/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.server.plugins.auth.oauth;

import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEvent;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.server.audit.AuthenticationEvent;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests SASL/OAUTHBEARER authentication over {@code SASL_SSL} against a {@link NioEchoServer}.
 *
 * <p>Each test builds a token with {@link OAuthUtils.Builder}, wires it into the client and server
 * configs, attempts a connection, and then checks both the server's authentication metrics and the
 * {@link AuthenticationEvent} recorded by {@link TestAuditLogProvider}.
 */
public class OAuthSaslAuthenticatorTest {
    private static final String OAUTHBEARER = "OAUTHBEARER";
    private static final String NODE = "0";
    private static final SecurityProtocol SECURITY_PROTOCOL = SecurityProtocol.SASL_SSL;

    private final Path tempDir = TestUtils.tempDirectory().toPath();
    private final String allowedCluster = Utils.LC_META_ABC.logicalClusterId();
    private final String orgId = Utils.LC_META_ABC.organizationId();
    private final TestAuditLogProvider auditLogProvider = new TestAuditLogProvider();
    private Selector selector;
    private NioEchoServer server;
    private OAuthUtils.JwsContainer jwsContainer;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private PhysicalClusterMetadata metadata;
    private Map<String, Object> configs;
    private String brokerUUID;
    private Time time = null;
    private CredentialCache credentialCache;

    /**
     * Resets login state, builds mutually trusting client/server SSL configs for {@code localhost}
     * and initialises the physical cluster metadata.
     */
    @BeforeEach
    public void setUp() throws Exception {
        time = Time.SYSTEM;
        LoginManager.closeAll();
        CertStores serverCertStores = new CertStores(true, "localhost");
        CertStores clientCertStores = new CertStores(false, "localhost");
        saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
        saslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
        credentialCache = new CredentialCache();
        setUpMetadata();
    }

    /**
     * Creates the physical cluster metadata rooted at {@link #tempDir}, writes the logical cluster
     * file for {@code Utils.LC_META_ABC} and waits until that cluster is visible in the metadata.
     */
    private void setUpMetadata() throws IOException, InterruptedException {
        brokerUUID = "uuid";
        configs = new HashMap<>();
        configs.put("broker.session.uuid", brokerUUID);
        saslServerConfigs.put("broker.session.uuid", brokerUUID);
        configs.put("multitenant.metadata.dir", tempDir.toRealPath().toString());
        metadata = Utils.initiatePhysicalClusterMetadata(configs);
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, tempDir);
        TestUtils.waitForCondition(
            () -> metadata.metadata(Utils.LC_META_ABC.logicalClusterId()) != null,
            "Expected metadata of new logical cluster to be present in metadata cache");
    }

    /**
     * Releases the server, the selector and the cluster metadata.
     *
     * <p>The closes are chained with {@code finally} so that a failure in one does not leak the
     * others, and each resource is null-checked because {@link #setUp()} may have failed before
     * assigning it.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            try {
                closeSelector();
            } finally {
                if (metadata != null) {
                    metadata.close(brokerUUID);
                }
            }
        }
    }

    /** A token with the expected issuer for a hosted logical cluster authenticates successfully. */
    @Test
    public void testValidTokenAuthorizes() throws Exception {
        jwsContainer = new OAuthUtils.Builder(100000, "Confluent", "Confluent", orgId).build();
        configureMechanisms(OAUTHBEARER, Collections.singletonList(OAUTHBEARER), allowedCluster);
        server = createEchoServer(SECURITY_PROTOCOL);
        createAndCheckClientConnection(SECURITY_PROTOCOL, NODE);
        server.verifyAuthenticationMetrics(1, 0);

        AuthenticationEvent authenticationEvent = awaitFirstAuthenticationEvent();
        Assertions.assertEquals(AuditEventStatus.SUCCESS, authenticationEvent.status());
        Assertions.assertTrue(authenticationEvent.principal().isPresent());
        Assertions.assertEquals("Confluent", ((KafkaPrincipal) authenticationEvent.principal().get()).getName());
    }

    /** A token built with the issuer {@code "SomebodyElse"} is rejected. */
    @Test
    public void testInvalidIssuerFailsAuthorization() throws Exception {
        jwsContainer = new OAuthUtils.Builder(100000, "SomebodyElse", "Confluent", orgId).build();
        configureMechanisms(OAUTHBEARER, Collections.singletonList(OAUTHBEARER), allowedCluster);
        connectExpectingFailure(AuditEventStatus.UNKNOWN_USER_DENIED);
    }

    /** A token is rejected after the public key file is overwritten with a freshly generated key. */
    @Test
    public void testPublicKeyFailsAuthorization() throws Exception {
        // NOTE(review): the issuer here is "SomebodyElse", the same value used by
        // testInvalidIssuerFailsAuthorization, so this test may fail authentication on the issuer
        // rather than on the replaced public key. Confirm whether "Confluent" was intended.
        jwsContainer = new OAuthUtils.Builder(100000, "SomebodyElse", "Confluent", orgId).build();
        OAuthUtils.writePemFile(jwsContainer.getPublicKeyFile(), OAuthUtils.generateKeyPair().getPublic());
        configureMechanisms(OAUTHBEARER, Collections.singletonList(OAUTHBEARER), allowedCluster);
        connectExpectingFailure(AuditEventStatus.UNKNOWN_USER_DENIED);
    }

    /**
     * Connecting to logical cluster {@code cp10}, for which no logical cluster file was created,
     * fails and the audit event carries the identifier, the requested cluster and an explanatory
     * message.
     */
    @Test
    public void testTokenWhenLogicalClusterIsNotHostedOnBroker() throws Exception {
        String unhostedCluster = "cp10";
        jwsContainer = new OAuthUtils.Builder(100000, "Confluent", "Confluent", orgId).build();
        configureMechanisms(OAUTHBEARER, Collections.singletonList(OAUTHBEARER), unhostedCluster);
        AuthenticationEvent authenticationEvent = connectExpectingFailure(AuditEventStatus.UNAUTHENTICATED);

        Assertions.assertTrue(authenticationEvent.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo =
            ((AuthenticationException) authenticationEvent.authenticationException().get()).errorInfo();
        Assertions.assertEquals("Confluent", errorInfo.identifier());
        Assertions.assertEquals(unhostedCluster, errorInfo.saslExtensions().get("logicalCluster"));
        Assertions.assertTrue(errorInfo.errorMessage().contains("logical cluster cp10 is not hosted on this broker"));
    }

    /**
     * Starts the echo server, verifies that the client connection fails authentication, and checks
     * the metrics and the recorded audit event.
     *
     * @param expectedStatus status the first recorded authentication event must carry
     * @return the first recorded authentication event, for further assertions
     */
    private AuthenticationEvent connectExpectingFailure(AuditEventStatus expectedStatus) throws Exception {
        server = createEchoServer(SECURITY_PROTOCOL);
        createAndCheckClientConnectionFailure(SECURITY_PROTOCOL, NODE);
        server.verifyAuthenticationMetrics(0, 1);

        AuthenticationEvent authenticationEvent = awaitFirstAuthenticationEvent();
        Assertions.assertEquals(expectedStatus, authenticationEvent.status());
        Assertions.assertFalse(authenticationEvent.principal().isPresent());
        return authenticationEvent;
    }

    /**
     * Waits until at least one authentication event has been recorded and returns the first one.
     *
     * <p>Waiting (rather than reading index 0 directly) turns a late or missing event into a
     * descriptive failure instead of an {@link IndexOutOfBoundsException}.
     */
    private AuthenticationEvent awaitFirstAuthenticationEvent() throws InterruptedException {
        List<AuthenticationEvent> recorded = auditLogProvider.authenticationEvents;
        TestUtils.waitForCondition(
            () -> !recorded.isEmpty(),
            "Expected an authentication audit event to be recorded");
        return recorded.get(0);
    }

    /**
     * Attaches the client mechanism and server OAuth settings to the SASL configs and installs the
     * matching test JAAS configuration.
     *
     * @param clientMechanism  SASL mechanism the client uses
     * @param serverMechanisms SASL mechanisms enabled on the server
     * @param clusterToConnect logical cluster the client asks for
     */
    private void configureMechanisms(String clientMechanism, List<String> serverMechanisms, String clusterToConnect) {
        SecurityTestUtils.attachMechanisms(saslClientConfigs, clientMechanism, jwsContainer, clusterToConnect);
        SecurityTestUtils.attachServerOAuthConfigs(saslServerConfigs, serverMechanisms, "listener.name.sasl_ssl", jwsContainer);
        TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms);
    }

    /** Connects to the echo server, verifies the connection works, then closes the selector. */
    private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        createClientConnection(securityProtocol, node);
        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
        closeSelector();
    }

    /**
     * Connects to the echo server, waits for the channel to close in the
     * {@code AUTHENTICATION_FAILED} state, then closes the selector.
     */
    private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
        createClientConnection(securityProtocol, node);
        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
        closeSelector();
    }

    /** Creates a fresh selector from the client configs and connects it to the echo server's port. */
    private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        createSelector(securityProtocol, saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
        selector.connect(node, addr, 4096, 4096);
    }

    /**
     * Replaces any existing selector with a new one built from {@code clientConfigs}.
     *
     * @param securityProtocol protocol for the client channel builder
     * @param clientConfigs    client configuration; its {@code sasl.mechanism} entry selects the mechanism
     */
    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
        closeSelector();
        // Read the mechanism from the configs we were handed, not from the field, so the channel
        // builder and the mechanism always come from the same map.
        String saslMechanism = (String) clientConfigs.get("sasl.mechanism");
        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder(
            securityProtocol,
            JaasContext.Type.CLIENT,
            new TestSecurityConfig(clientConfigs),
            (ListenerName) null,
            saslMechanism,
            time,
            true,
            new LogContext());
        selector = new Selector(25000L, new Metrics(), time, "MetricGroup", channelBuilder, new LogContext());
    }

    /** Closes and forgets the current selector, if there is one. */
    private void closeSelector() {
        if (selector != null) {
            selector.close();
            selector = null;
        }
    }

    /** Creates an echo server for the given protocol that reports to {@link #auditLogProvider}. */
    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(
            ListenerName.forSecurityProtocol(securityProtocol),
            securityProtocol,
            new TestSecurityConfig(saslServerConfigs),
            credentialCache,
            time,
            Optional.of(auditLogProvider));
    }

    /**
     * {@link AuditLogProvider} that records every {@link AuthenticationEvent} it is given and
     * treats all other operations as no-ops.
     */
    private static class TestAuditLogProvider
    implements AuditLogProvider {
        // Synchronized because events are added by the code under test, presumably on the echo
        // server's thread, while the test thread reads them.
        public final List<AuthenticationEvent> authenticationEvents =
            Collections.synchronizedList(new ArrayList<>());

        private TestAuditLogProvider() {
        }

        public boolean providerConfigured(Map<String, ?> configs) {
            return false;
        }

        /** Records the event if it is an {@link AuthenticationEvent}; other events are ignored. */
        public void logEvent(AuditEvent auditEvent) {
            if (auditEvent instanceof AuthenticationEvent) {
                authenticationEvents.add((AuthenticationEvent) auditEvent);
            }
        }

        public void setSanitizer(UnaryOperator<AuditEvent> sanitizer) {
        }

        public boolean usesMetadataFromThisKafkaCluster() {
            return false;
        }

        public void close(String brokerSessionUuid) throws Exception {
            close();
        }

        public void close() throws Exception {
        }

        /** Returns an empty set rather than {@code null}: nothing is reconfigurable. */
        public Set<String> reconfigurableConfigs() {
            return Collections.emptySet();
        }

        public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
        }

        public void reconfigure(Map<String, ?> configs) {
        }

        public void configure(Map<String, ?> configs) {
        }
    }
}

