/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import io.confluent.security.authorizer.AclAccessRule;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.Scope;
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.security.sasl.SaslServer;
import kafka.admin.AclCommand;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.audit.AuditEvent;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.server.audit.AuthenticationEvent;
import org.apache.kafka.server.audit.DefaultAuthenticationEvent;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category(value={IntegrationTest.class})
public class MultiTenantAuditLogTest {
    private IntegrationTestHarness testHarness;
    private final String topic = "test.topic";
    private final String consumerGroup = "test.consumer.group";
    private final String logicalClusterId = "lkc-1234";
    private PhysicalCluster physicalCluster;
    private LogicalCluster logicalCluster;
    private LogicalClusterUser user1;
    private LogicalClusterUser user2;

    @Before
    public void setUp() throws Exception {
        MockAuditLogProvider.reset();
    }

    @After
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private void startTestHarness(Properties brokerOverrideProps) throws Exception {
        this.testHarness = new IntegrationTestHarness();
        this.physicalCluster = this.testHarness.start(brokerOverrideProps);
        this.logicalCluster = this.physicalCluster.createLogicalCluster("lkc-1234", 100, 1, 2);
        this.user1 = this.logicalCluster.user(1);
        this.user2 = this.logicalCluster.user(2);
        TestUtils.waitForCondition(this::auditLoggerReady, (long)10000L, (String)"Audit Logger Ready");
    }

    private boolean auditLoggerReady() {
        try {
            if (this.physicalCluster.kafkaCluster().kafkas().isEmpty()) {
                return false;
            }
            for (EmbeddedKafka broker : this.physicalCluster.kafkaCluster().kafkas()) {
                MultiTenantAuthorizer authorizer = (MultiTenantAuthorizer)broker.kafkaServer().authorizer().get();
                if (authorizer.isAuditLogEnabled()) continue;
                return true;
            }
            return true;
        }
        catch (ClassCastException e) {
            return false;
        }
    }

    @Test
    public void testDisabled() throws Throwable {
        this.startTestHarness(this.brokerProps(false));
        this.addProducerAcls(this.user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAcls(this.user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
        Assert.assertTrue((boolean)MockAuditLogProvider.instance.authorizationLog.isEmpty());
    }

    @Test
    public void testLiteralAcls() throws Throwable {
        this.startTestHarness(this.brokerProps(true));
        this.addProducerAcls(this.user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAcls(this.user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
        List user1s = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:1")).collect(Collectors.toList());
        List produces = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:1") && e.action().resourceName().equals("test.topic") && e.action().operation().name().equals("Write") && e.authorizePolicy() instanceof AclAccessRule && ((AclAccessRule)e.authorizePolicy()).resourcePattern().name().equals("test.topic")).collect(Collectors.toList());
        Assert.assertEquals((long)1L, (long)produces.size());
        List topicReads = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:2") && e.action().resourceName().equals("test.topic") && e.action().operation().name().equals("Read") && e.authorizePolicy() instanceof AclAccessRule && ((AclAccessRule)e.authorizePolicy()).resourcePattern().name().equals("test.topic")).collect(Collectors.toList());
        Assert.assertFalse((boolean)topicReads.isEmpty());
        List groupReads = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:2") && e.action().resourceName().equals("test.consumer.group") && e.action().operation().name().equals("Read") && e.authorizePolicy() instanceof AclAccessRule && ((AclAccessRule)e.authorizePolicy()).resourcePattern().name().equals("test.consumer.group")).collect(Collectors.toList());
        Assert.assertFalse((boolean)groupReads.isEmpty());
        Assert.assertFalse((boolean)MockAuditLogProvider.instance.authorizationLog.stream().anyMatch(e -> e.requestContext().principal().toString().contains("TenantUser:")));
        List tenantUserEntries = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:1") || e.requestContext().principal().toString().equals("User:2")).collect(Collectors.toList());
        Assert.assertTrue((boolean)tenantUserEntries.stream().allMatch(e -> e.sourceScope().equals((Object)Scope.kafkaClusterScope((String)"lkc-1234"))));
        Assert.assertTrue((boolean)tenantUserEntries.stream().allMatch(e -> e.action().scope().equals((Object)Scope.kafkaClusterScope((String)"lkc-1234"))));
        Assert.assertTrue((boolean)tenantUserEntries.stream().allMatch(e -> e.authorizePolicy() instanceof AclAccessRule && !((AclAccessRule)e.authorizePolicy()).resourcePattern().name().contains("lkc-1234") && !((AclAccessRule)e.authorizePolicy()).aclBinding().entry().principal().startsWith("TenantUser:")));
    }

    @Test
    public void testClusterResource() throws Throwable {
        block2: {
            this.startTestHarness(this.brokerProps(true));
            AdminClient adminClient = this.testHarness.createAdminClient(this.user1);
            try {
                adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
            }
            catch (ExecutionException e2) {
                if (e2.getCause() instanceof ClusterAuthorizationException) break block2;
                throw e2;
            }
        }
        List user1s = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:1")).collect(Collectors.toList());
        List describes = MockAuditLogProvider.instance.authorizationLog.stream().filter(e -> e.requestContext().principal().toString().equals("User:1") && e.action().resourceName().equals("kafka-cluster") && e.action().operation().name().equals("DescribeConfigs") && e.authorizeResult() == AuthorizeResult.DENIED).collect(Collectors.toList());
        Assert.assertEquals((long)1L, (long)describes.size());
    }

    private Properties brokerProps(boolean auditLoggerEnable) {
        Properties props = new Properties();
        props.put(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.max.acls.per.tenant", "100");
        if (auditLoggerEnable) {
            props.put("confluent.security.event.logger.multitenant.enable", "true");
        }
        props.put("ce.broker.plugins.test.audit.provider.config", "TEST");
        return props;
    }

    private void addProducerAcls(LogicalClusterUser user, String topic, PatternType patternType) {
        AclCommand.main((String[])SecurityTestUtils.produceAclArgs(this.testHarness.zkConnect(), user.prefixedKafkaPrincipal(), user.withPrefix(topic), patternType));
    }

    private void addConsumerAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) {
        AclCommand.main((String[])SecurityTestUtils.consumeAclArgs(this.testHarness.zkConnect(), user.prefixedKafkaPrincipal(), user.withPrefix(topic), user.withPrefix(consumerGroup), patternType));
    }

    @Test
    public void testAuthenticationEvent() throws Throwable {
        this.startTestHarness(this.brokerProps(true));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.instance;
        MultiTenantPrincipal principal = new MultiTenantPrincipal("0", new TenantMetadata("lkc-12345", "lkc-12345"));
        Assert.assertEquals((Object)"TenantUser", (Object)principal.getPrincipalType());
        Assert.assertTrue((boolean)principal.toString().contains("tenantMetadata"));
        SaslServer server = (SaslServer)Mockito.mock(SaslServer.class);
        SaslAuthenticationContext authenticationContext = new SaslAuthenticationContext(server, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_SSL.name());
        Scope scope = Scope.kafkaClusterScope((String)"ABC123");
        DefaultAuthenticationEvent authenticationEvent = new DefaultAuthenticationEvent((KafkaPrincipal)principal, (AuthenticationContext)authenticationContext, AuditEventStatus.SUCCESS);
        ConfluentAuthenticationEvent confluentAuthenticationEvent = new ConfluentAuthenticationEvent((AuthenticationEvent)authenticationEvent, scope);
        auditLogProvider.logEvent((AuditEvent)confluentAuthenticationEvent);
        ConfluentAuthenticationEvent sanitizedEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assert.assertEquals((Object)"User:0", (Object)((KafkaPrincipal)sanitizedEvent.principal().get()).toString());
        Assert.assertEquals((Object)"User", (Object)((KafkaPrincipal)sanitizedEvent.principal().get()).getPrincipalType());
        Assert.assertFalse((boolean)((KafkaPrincipal)sanitizedEvent.principal().get()).toString().contains("tenantMetadata"));
        Assert.assertTrue((boolean)sanitizedEvent.getScope().toString().contains("kafka-cluster=lkc-12345"));
        Assert.assertFalse((boolean)sanitizedEvent.getScope().toString().contains("ABC123"));
        SslAuthenticationException authenticationException = new SslAuthenticationException("Ssl handshake failed");
        DefaultAuthenticationEvent failureEvent = new DefaultAuthenticationEvent(null, (AuthenticationContext)authenticationContext, AuditEventStatus.UNKNOWN_USER_DENIED, (AuthenticationException)authenticationException);
        ConfluentAuthenticationEvent confluentFailureEvent = new ConfluentAuthenticationEvent((AuthenticationEvent)failureEvent, scope);
        auditLogProvider.logEvent((AuditEvent)confluentFailureEvent);
        sanitizedEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assert.assertFalse((boolean)sanitizedEvent.principal().isPresent());
        Assert.assertTrue((boolean)sanitizedEvent.getScope().toString().contains("ABC123"));
        authenticationException = new SslAuthenticationException("username not specified", AuthenticationErrorInfo.UNKNOWN_USER_ERROR);
        failureEvent = new DefaultAuthenticationEvent(null, (AuthenticationContext)authenticationContext, AuditEventStatus.UNKNOWN_USER_DENIED, (AuthenticationException)authenticationException);
        confluentFailureEvent = new ConfluentAuthenticationEvent((AuthenticationEvent)failureEvent, scope);
        auditLogProvider.logEvent((AuditEvent)confluentFailureEvent);
        sanitizedEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assert.assertFalse((boolean)sanitizedEvent.principal().isPresent());
        Assert.assertTrue((boolean)sanitizedEvent.getScope().toString().contains("ABC123"));
        AuthenticationErrorInfo errorInfo = new AuthenticationErrorInfo(AuditEventStatus.UNAUTHENTICATED, "", "APIKEY123", "lkc123");
        authenticationException = new SaslAuthenticationException("Bad password for user", errorInfo);
        failureEvent = new DefaultAuthenticationEvent(null, (AuthenticationContext)authenticationContext, AuditEventStatus.UNAUTHENTICATED, (AuthenticationException)authenticationException);
        confluentFailureEvent = new ConfluentAuthenticationEvent((AuthenticationEvent)failureEvent, scope);
        auditLogProvider.logEvent((AuditEvent)confluentFailureEvent);
        sanitizedEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assert.assertFalse((boolean)sanitizedEvent.principal().isPresent());
        Assert.assertTrue((boolean)sanitizedEvent.getScope().toString().contains("lkc123"));
        Assert.assertFalse((boolean)sanitizedEvent.getScope().toString().contains("ABC123"));
    }
}

