package io.confluent.kafka.server.plugins.auth.oauth;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/kafka/server/plugins/auth/oauth/OAuthSaslAuthenticatorTest.class */
/**
 * Exercises SASL_SSL authentication with the OAUTHBEARER mechanism between a client
 * {@link Selector} and a {@link NioEchoServer}.
 *
 * <p>Each test creates a JWS through {@link OAuthUtils#setUpJws}, configures the client with the
 * resulting token and the server with the path of the matching public key file, starts the echo
 * server, opens one client connection, and finally checks the server's authentication metrics.
 */
public class OAuthSaslAuthenticatorTest {
    private static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
    private static final SecurityProtocol SECURITY_PROTOCOL = SecurityProtocol.SASL_SSL;
    /** Id under which the single client connection of every test is registered with the selector. */
    private static final String NODE_ID = "0";
    /** Issuer used by the test that is expected to authenticate successfully. */
    private static final String ACCEPTED_ISSUER = "Confluent";
    private static final String ALLOWED_CLUSTER = "audi";

    private final String[] allowedClusters = {ALLOWED_CLUSTER};

    private Selector selector;
    private NioEchoServer server;
    private OAuthUtils.JwsContainer jwsContainer;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private MockTime mockTime;
    private CredentialCache credentialCache;

    /**
     * Closes all cached login managers and builds fresh client/server configs from a pair of
     * cert stores that are set up to trust each other.
     */
    @Before
    public void setup() throws Exception {
        LoginManager.closeAll();
        CertStores serverCertStores = new CertStores(true, "localhost");
        CertStores clientCertStores = new CertStores(false, "localhost");
        this.saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
        this.saslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
        this.credentialCache = new CredentialCache();
        this.mockTime = new MockTime(1L);
    }

    /**
     * Closes the echo server and the client selector if they are still open.
     *
     * <p>The selector is closed in a {@code finally} block so that a failure while closing the
     * server cannot leak the client selector into subsequent tests.
     */
    @After
    public void teardown() throws Exception {
        try {
            if (this.server != null) {
                this.server.close();
            }
        } finally {
            if (this.selector != null) {
                this.selector.close();
            }
        }
    }

    /** A token created with the accepted issuer and the untouched public key authenticates. */
    @Test
    public void testValidTokenAuthorizes() throws Exception {
        this.jwsContainer = createJws(ACCEPTED_ISSUER);
        configureMechanisms(OAUTHBEARER_MECHANISM, Collections.singletonList(OAUTHBEARER_MECHANISM));
        this.server = createEchoServer(SECURITY_PROTOCOL);
        createAndCheckClientConnection(SECURITY_PROTOCOL, NODE_ID);
        this.server.verifyAuthenticationMetrics(1, 0);
    }

    /** A token created with a different issuer is rejected by the server. */
    @Test
    public void testInvalidIssuerFailsAuthorization() throws Exception {
        this.jwsContainer = createJws("SomebodyElse");
        configureMechanisms(OAUTHBEARER_MECHANISM, Collections.singletonList(OAUTHBEARER_MECHANISM));
        this.server = createEchoServer(SECURITY_PROTOCOL);
        createAndCheckClientConnectionFailure(SECURITY_PROTOCOL, NODE_ID);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    /**
     * A token is rejected when the public key file the server is pointed at holds a key from an
     * unrelated, freshly generated key pair.
     */
    @Test
    public void testPublicKeyFailsAuthorization() throws Exception {
        // Use the same issuer as the passing test so that the overwritten public key is the only
        // difference from the success case; with a foreign issuer this test would still pass even
        // if the server never looked at the key.
        this.jwsContainer = createJws(ACCEPTED_ISSUER);
        // Overwrite the key file before the server is configured and started.
        OAuthUtils.writePemFile(this.jwsContainer.getPublicKeyFile(), OAuthUtils.generateKeyPair().getPublic());
        configureMechanisms(OAUTHBEARER_MECHANISM, Collections.singletonList(OAUTHBEARER_MECHANISM));
        this.server = createEchoServer(SECURITY_PROTOCOL);
        createAndCheckClientConnectionFailure(SECURITY_PROTOCOL, NODE_ID);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    /**
     * Creates the JWS container for a test, varying only the issuer.
     *
     * @param issuer issuer value handed to {@link OAuthUtils#setUpJws}
     */
    private OAuthUtils.JwsContainer createJws(String issuer) throws Exception {
        // NOTE(review): 100000 is presumably the token lifetime and the second "Confluent" the
        // subject; every test passes the same values, confirm against OAuthUtils.setUpJws.
        return OAuthUtils.setUpJws(100000, issuer, "Confluent", this.allowedClusters);
    }

    /**
     * Fills in the SASL settings of the client and server configs and installs a test JAAS
     * configuration. Requires {@link #jwsContainer} to be set, because the client JAAS entry embeds
     * its token and the server JAAS entry embeds the path of its public key file.
     *
     * @param clientMechanism mechanism stored under {@code sasl.mechanism} for the client
     * @param serverMechanisms mechanisms stored under {@code sasl.enabled.mechanisms} for the server
     */
    private void configureMechanisms(String clientMechanism, List<String> serverMechanisms) {
        String clientJaasConfig = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required token=\""
                + this.jwsContainer.getJwsToken() + "\" cluster=\"" + ALLOWED_CLUSTER + "\";";
        this.saslClientConfigs.put("sasl.mechanism", clientMechanism);
        this.saslClientConfigs.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
        this.saslClientConfigs.put("sasl.jaas.config", clientJaasConfig);

        String serverJaasConfig = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\""
                + this.jwsContainer.getPublicKeyFile().toPath() + "\";";
        this.saslServerConfigs.put("sasl.enabled.mechanisms", serverMechanisms);
        this.saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class", OAuthBearerServerLoginCallbackHandler.class.getName());
        this.saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class", OAuthBearerValidatorCallbackHandler.class.getName());
        this.saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer.sasl.jaas.config", serverJaasConfig);

        TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms);
    }

    /**
     * Connects to the echo server, runs {@link NetworkTestUtils#checkClientConnection} on the
     * connection, then closes and clears the selector.
     */
    private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        createClientConnection(securityProtocol, node);
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
        this.selector.close();
        this.selector = null;
    }

    /**
     * Connects to the echo server, waits for the channel to close in state
     * {@link ChannelState.State#AUTHENTICATION_FAILED}, then closes and clears the selector.
     */
    private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
        createClientConnection(securityProtocol, node);
        NetworkTestUtils.waitForChannelClose(this.selector, node, ChannelState.State.AUTHENTICATION_FAILED);
        this.selector.close();
        this.selector = null;
    }

    /** Creates a fresh client selector and starts a connection to the echo server's port on localhost. */
    private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect(node, serverAddress, 4096, 4096);
    }

    /**
     * Replaces {@link #selector} with a new client selector, closing any previous one first.
     *
     * @param securityProtocol protocol for the client channel builder
     * @param clientConfigs configs wrapped in the {@link TestSecurityConfig} given to the channel
     *     builder; the SASL mechanism itself is read from {@link #saslClientConfigs}
     */
    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
        String saslMechanism = (String) this.saslClientConfigs.get("sasl.mechanism");
        this.selector = new Selector(
                25000L,
                new Metrics(),
                this.mockTime,
                "MetricGroup",
                ChannelBuilders.clientChannelBuilder(
                        securityProtocol,
                        JaasContext.Type.CLIENT,
                        new TestSecurityConfig(clientConfigs),
                        (ListenerName) null,
                        saslMechanism,
                        true),
                new LogContext());
    }

    /** Starts an echo server for the given protocol using the current server configs. */
    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(
                ListenerName.forSecurityProtocol(securityProtocol),
                securityProtocol,
                new TestSecurityConfig(this.saslServerConfigs),
                this.credentialCache,
                this.mockTime);
    }
}
