package org.apache.kafka.common.security.authenticator;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DelayableValidateCallbackHandler;
import org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest;
import org.apache.kafka.common.security.authenticator.TestDigestLoginModule;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorAsyncTest.class */
public class SaslAuthenticatorAsyncTest {
    private static final int BUFFER_SIZE = 4096;
    private static final long CONNECTIONS_MAX_REAUTH_MS_VALUE = 100000;
    private static Time time = Time.SYSTEM;
    private NioEchoServer server;
    private CertStores serverCertStores;
    private CertStores clientCertStores;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private CredentialCache credentialCache;

    @BeforeEach
    public void setup() throws Exception {
        LoginManager.closeAll();
        time = Time.SYSTEM;
        this.serverCertStores = new CertStores(true, "localhost");
        this.clientCertStores = new CertStores(false, "localhost");
        this.saslServerConfigs = this.serverCertStores.getTrustingConfig(this.clientCertStores);
        this.saslClientConfigs = this.clientCertStores.getTrustingConfig(this.serverCertStores);
        this.saslServerConfigs.put("ssl.engine.factory.class", DefaultSslEngineFactory.class);
        this.saslClientConfigs.put("ssl.engine.factory.class", DefaultSslEngineFactory.class);
        this.credentialCache = new CredentialCache();
        SaslAuthenticatorTest.TestLogin.loginCount.set(0);
    }

    private TestJaasConfig configureMechanisms(String str, List<String> list) {
        this.saslClientConfigs.put("sasl.mechanism", str);
        this.saslServerConfigs.put("sasl.enabled.mechanisms", list);
        this.saslServerConfigs.put("connections.max.reauth.ms", Long.valueOf(CONNECTIONS_MAX_REAUTH_MS_VALUE));
        if (list.contains("DIGEST-MD5")) {
            this.saslServerConfigs.put("digest-md5.sasl.server.callback.handler.class", TestDigestLoginModule.DigestServerCallbackHandler.class.getName());
        }
        return TestJaasConfig.createConfiguration(str, list);
    }

    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
    }

    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(this.saslServerConfigs), this.credentialCache, time);
    }

    private Selector createClientConnection(SecurityProtocol securityProtocol, String str) throws Exception {
        Selector createSelector = createSelector(securityProtocol, this.saslClientConfigs);
        createSelector.connect(str, new InetSocketAddress("localhost", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
        return createSelector;
    }

    private Selector createSelector(SecurityProtocol securityProtocol, Map<String, Object> map) {
        return NetworkTestUtils.createSelector(ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, new TestSecurityConfig(map), (ListenerName) null, (String) this.saslClientConfigs.get("sasl.mechanism"), time, true, new LogContext()), time);
    }

    @Test
    public void testAsyncAuthInitialAuthTimeouts() throws Exception {
        int i = 2;
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("OAUTHBEARER", Collections.singletonList("OAUTHBEARER"));
        this.saslServerConfigs.put(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("OAUTHBEARER") + "sasl.server.callback.handler.class", DelayableValidateCallbackHandler.class);
        this.saslServerConfigs.put("sasl.server.authn.async.enable", true);
        this.saslServerConfigs.put("sasl.server.authn.async.timeout.ms", 250L);
        this.saslServerConfigs.put("delayMs", "500");
        this.saslServerConfigs.put("numDelayConnections", Integer.toString(2));
        this.saslServerConfigs.put("interruptBehavior", DelayableValidateCallbackHandler.InterruptBehavior.error.toString());
        this.server = createEchoServer(securityProtocol);
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 2 + 4; i2++) {
            int i3 = i2;
            if (i2 == 2) {
                TestUtils.waitForCondition(() -> {
                    return NetworkTestUtils.numAsyncInFlightTasks(this.server.selector()) == i;
                }, "All delayed tasks are inFlight");
            }
            Thread thread = new Thread(() -> {
                Selector selector = null;
                try {
                    try {
                        String num = Integer.toString(i3);
                        Selector createClientConnection = createClientConnection(securityProtocol, num);
                        if (i3 < i) {
                            NetworkTestUtils.waitForChannelClose(createClientConnection, num, ChannelState.State.AUTHENTICATION_FAILED);
                        } else {
                            NetworkTestUtils.checkClientConnection(createClientConnection, num, 100, 10);
                        }
                        createClientConnection.close();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                } catch (Throwable th) {
                    selector.close();
                    throw th;
                }
            });
            arrayList.add(thread);
            thread.start();
        }
        arrayList.forEach(thread2 -> {
            try {
                thread2.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        this.server.verifyAuthenticationMetrics(4, 2);
        this.server.verifyAuthenticationMetricsForMechanism(4, 2, "OAUTHBEARER");
        this.server.waitForMetrics("timed-out-authentication", 2, EnumSet.of(NioEchoServer.MetricType.TOTAL, NioEchoServer.MetricType.RATE));
    }
}
