/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class MultiTenantAuthorizerTest {
    private IntegrationTestHarness testHarness;
    private final String topic = "test.topic";
    private final String consumerGroup = "test.consumer.group";
    private PhysicalCluster physicalCluster;
    private LogicalCluster logicalCluster;
    private LogicalClusterUser user1;
    private LogicalClusterUser user2;

    @Before
    public void setUp() throws Exception {
        this.startTestHarness(this.brokerProps());
    }

    @After
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private void startTestHarness(Properties brokerOverrideProps) throws Exception {
        this.testHarness = new IntegrationTestHarness();
        this.physicalCluster = this.testHarness.start(brokerOverrideProps);
        this.logicalCluster = this.physicalCluster.createLogicalCluster("tenantA", 100, 1, 2);
        this.user1 = this.logicalCluster.user(1);
        this.user2 = this.logicalCluster.user(2);
    }

    @Test
    public void testLiteralAcls() throws Throwable {
        this.addProducerAcls(this.user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAcls(this.user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
        this.verifyTopicAuthorizationFailure(this.user1, "sometopic");
        this.verifyConsumerGroupAuthorizationFailure(this.user1, "test.topic", "somegroup");
        SecurityTestUtils.verifyConfluentLicense(this.physicalCluster.kafkaCluster(), null);
    }

    @Test
    public void testPrefixedAcls() throws Throwable {
        this.addProducerAcls(this.user1, "test", PatternType.PREFIXED);
        this.addConsumerAcls(this.user2, "test", "test", PatternType.PREFIXED);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
        this.verifyTopicAuthorizationFailure(this.user1, "sometopic");
        this.verifyConsumerGroupAuthorizationFailure(this.user1, "test.topic", "somegroup");
    }

    @Test
    public void testWildcardAcls() throws Throwable {
        AclCommand.main((String[])SecurityTestUtils.produceAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), "*", PatternType.LITERAL));
        AclCommand.main((String[])SecurityTestUtils.consumeAclArgs(this.testHarness.zkConnect(), this.user2.prefixedKafkaPrincipal(), "*", "*", PatternType.LITERAL));
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
    }

    @Test
    public void testSuperUsers() throws Throwable {
        this.testHarness.produceConsume(this.logicalCluster.adminUser(), this.logicalCluster.adminUser(), "test.topic", "test.consumer.group", 0);
    }

    @Test
    public void testAclUpdateInZooKeeper() throws Throwable {
        String topic = "test.topic";
        String consumerGroup = "test.group";
        this.physicalCluster.kafkaCluster().createTopic(this.user1.withPrefix(topic), 3, 1);
        try (KafkaConsumer<String, String> consumer = this.testHarness.createConsumer(this.user1, consumerGroup, SecurityProtocol.SASL_PLAINTEXT);){
            Assert.assertFalse((boolean)this.checkAuthorized(consumer, topic));
            AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix(topic), AclOperation.DESCRIBE, PatternType.LITERAL));
            TestUtils.waitForCondition(() -> this.checkAuthorized(consumer, topic), (String)"ACL not applied within timeout");
        }
    }

    @Test
    public void testLogicalClusterScope() throws Throwable {
        this.addProducerAcls(this.user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAcls(this.user1, "test.topic", "test.consumer.group", PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user1, "test.topic", "test.consumer.group", 0);
        int userId = this.user1.userMetadata.userId();
        LogicalCluster cluster2 = this.physicalCluster.createLogicalCluster("anotherCluster", 100, userId);
        LogicalClusterUser cluster2user1 = cluster2.user(userId);
        this.verifyTopicAuthorizationFailure(cluster2user1, "sometopic");
        this.verifyConsumerGroupAuthorizationFailure(cluster2user1, "test.topic", "somegroup");
        this.addProducerAcls(cluster2user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAcls(cluster2user1, "test.topic", "test.consumer.group", PatternType.LITERAL);
        this.testHarness.produceConsume(cluster2user1, cluster2user1, "test.topic", "test.consumer.group", 0);
    }

    @Test
    public void testLiteralAclsUsingAdminClient() throws Throwable {
        this.addProducerAclsUsingAdminClient(this.user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAclsUsingAdminClient(this.user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
        this.verifyTopicAuthorizationFailure(this.user1, "sometopic");
        this.verifyConsumerGroupAuthorizationFailure(this.user1, "test.topic", "somegroup");
    }

    @Test
    public void testPrefixAclsUsingAdminClient() throws Throwable {
        this.addProducerAclsUsingAdminClient(this.user1, "test.", PatternType.PREFIXED);
        this.addConsumerAclsUsingAdminClient(this.user2, "test.", "test.", PatternType.PREFIXED);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
        this.verifyTopicAuthorizationFailure(this.user1, "sometopic");
        this.verifyConsumerGroupAuthorizationFailure(this.user1, "test.topic", "somegroup");
    }

    @Test
    public void testWildcardAclsUsingAdminClient() throws Throwable {
        this.addProducerAclsUsingAdminClient(this.user1, "*", PatternType.LITERAL);
        this.addConsumerAclsUsingAdminClient(this.user2, "*", "*", PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
    }

    @Test
    public void testAclCreateDescribeDeleteUsingAdminClient() throws Throwable {
        AclCommand.main((String[])SecurityTestUtils.topicBrokerReadAclArgs(this.testHarness.zkConnect(), PhysicalCluster.BROKER_PRINCIPAL));
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        LogicalCluster logicalClusterB = this.physicalCluster.createLogicalCluster("tenantB", 100, 11);
        LogicalClusterUser userB1 = logicalClusterB.user(11);
        AdminClient adminClientA = this.testHarness.createAdminClient(this.logicalCluster.adminUser());
        AdminClient adminClientB = this.testHarness.createAdminClient(logicalClusterB.adminUser());
        ConsumerAcls aclsA = new ConsumerAcls(adminClientA, true);
        ConsumerAcls aclsB = new ConsumerAcls(adminClientB, true);
        aclsA.addAcls(this.user1, "test1.topic", "test1.group", PatternType.LITERAL);
        aclsA.addAcls(this.user2, "prefixed.test2", "prefixed.test2", PatternType.PREFIXED);
        aclsB.addAcls(userB1, "*", "*", PatternType.LITERAL);
        this.physicalCluster.kafkaCluster().createTopic("tenantA_test1.topic", 1, 1);
        this.physicalCluster.kafkaCluster().createTopic("tenantA_prefixed.test2.topic", 2, 1);
        this.physicalCluster.kafkaCluster().createTopic("tenantB_test1.topic", 1, 1);
        KafkaConsumer<String, String> consumer1 = this.testHarness.createConsumer(this.user1, "test1.group", SecurityProtocol.SASL_PLAINTEXT);
        KafkaConsumer<String, String> consumer2 = this.testHarness.createConsumer(this.user2, "test2.group", SecurityProtocol.SASL_PLAINTEXT);
        KafkaConsumer<String, String> consumerB1 = this.testHarness.createConsumer(userB1, "test1.group", SecurityProtocol.SASL_PLAINTEXT);
        Assert.assertTrue((boolean)this.checkAuthorized(consumer1, "test1.topic"));
        Assert.assertFalse((boolean)this.checkAuthorized(consumer2, "test1.topic"));
        Assert.assertTrue((boolean)this.checkAuthorized(consumer2, "prefixed.test2.topic"));
        Assert.assertFalse((boolean)this.checkAuthorized(consumer1, "prefixed.test2.topic"));
        Assert.assertTrue((boolean)this.checkAuthorized(consumerB1, "test1.topic"));
        ConsumerAcls baseAcls = new ConsumerAcls(superAdminClient, false);
        baseAcls.verifyAllAcls(null, PatternType.ANY);
        baseAcls.verifyAllAcls(null, PatternType.MATCH);
        baseAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, null, new String[]{"*", "tenantA_test1.topic"});
        baseAcls.verifyAcls(ResourceType.GROUP, null, PatternType.PREFIXED, null, new String[]{"tenantA_prefixed.test2", "tenantB_"});
        baseAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, new String[]{"*", "tenantA_test1.topic", "tenantA_test1.group", "tenantA_kafka-cluster", "tenantB_kafka-cluster"});
        baseAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.ANY, null, new String[]{"*", "tenantA_test1.topic", "tenantA_prefixed.test2", "tenantB_"});
        aclsA.verifyAllAcls(null, PatternType.ANY);
        aclsA.verifyAllAcls(null, PatternType.MATCH);
        aclsA.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, null, new String[]{"test1.topic"});
        aclsA.verifyAcls(ResourceType.GROUP, null, PatternType.PREFIXED, null, new String[]{"prefixed.test2"});
        aclsA.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, new String[]{"test1.topic", "test1.group", "kafka-cluster"});
        aclsA.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, this.user1, new String[]{"test1.topic", "test1.group", "kafka-cluster"});
        aclsA.verifyAcls(ResourceType.TOPIC, null, PatternType.ANY, null, new String[]{"test1.topic", "prefixed.test2"});
        aclsB.verifyAllAcls(null, PatternType.ANY);
        aclsB.verifyAcls(ResourceType.TOPIC, "*", PatternType.LITERAL, userB1, new String[]{"*"});
        aclsB.verifyAcls(ResourceType.TOPIC, "*", PatternType.ANY, userB1, new String[]{"*"});
        aclsB.verifyAcls(ResourceType.TOPIC, "*", PatternType.PREFIXED, userB1, new String[0]);
        aclsB.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, userB1, new String[]{"*"});
        aclsB.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, new String[]{"*", "kafka-cluster"});
        aclsB.verifyAcls(ResourceType.TOPIC, null, PatternType.PREFIXED, userB1, new String[0]);
        aclsB.verifyAcls(ResourceType.ANY, "kafka-cluster", PatternType.LITERAL, userB1, new String[]{"kafka-cluster"});
        baseAcls.verifyAcls(ResourceType.ANY, "tenantA_prefixed.test2.topic", PatternType.MATCH, null, new String[]{"tenantA_prefixed.test2", "*"});
        baseAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, null, new String[]{"tenantA_test1.topic", "tenantA_prefixed.test2", "tenantB_", "*"});
        aclsA.verifyAcls(ResourceType.ANY, "prefixed.test2.topic", PatternType.MATCH, null, new String[]{"prefixed.test2"});
        aclsA.verifyAcls(ResourceType.ANY, "prefixed.test2.topic", PatternType.MATCH, this.user2, new String[]{"prefixed.test2"});
        aclsA.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, this.user1, new String[]{"test1.topic"});
        aclsB.verifyAcls(ResourceType.TOPIC, "test", PatternType.MATCH, userB1, new String[]{"*"});
        aclsB.verifyAcls(ResourceType.TOPIC, "*", PatternType.MATCH, userB1, new String[]{"*"});
        aclsB.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, null, new String[]{"*"});
        Assert.assertTrue((boolean)this.checkAuthorized(consumer1, "test1.topic"));
        aclsA.deleteAcls(ResourceType.TOPIC, "test1.topic", PatternType.LITERAL, null);
        Assert.assertFalse((boolean)this.checkAuthorized(consumer1, "test1.topic"));
        aclsA.verifyAllAcls(null, PatternType.ANY);
        Assert.assertTrue((boolean)this.checkAuthorized(consumer2, "prefixed.test2.topic"));
        aclsA.deleteAcls(ResourceType.TOPIC, "prefixed.test2.topic", PatternType.MATCH, this.user2);
        Assert.assertFalse((boolean)this.checkAuthorized(consumer2, "prefixed.test2.topic"));
        aclsA.verifyAllAcls(null, PatternType.ANY);
        Assert.assertTrue((boolean)this.checkAuthorized(consumerB1, "test1.topic"));
        aclsB.deleteAcls(ResourceType.TOPIC, null, PatternType.MATCH, userB1);
        Assert.assertFalse((boolean)this.checkAuthorized(consumerB1, "test1.topic"));
        aclsB.verifyAllAcls(null, PatternType.MATCH);
    }

    @Test
    public void testAclLimit() throws Throwable {
        this.addProducerAcls(this.user1, "test.topic", PatternType.LITERAL);
        this.addConsumerAcls(this.user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        Function<String, AclBinding> topicAcl = topic -> new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), new AccessControlEntry(this.user1.unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            int aclCount = ((Collection)adminClient.describeAcls(new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).values().get()).size();
            for (int i = 0; i < 100 - aclCount; ++i) {
                adminClient.createAcls(Collections.singleton(topicAcl.apply("topic" + i))).all().get();
            }
            try {
                adminClient.createAcls(Collections.singleton(topicAcl.apply("othertopic"))).all().get();
            }
            catch (ExecutionException e) {
                Assert.assertEquals(InvalidRequestException.class, e.getCause().getClass());
            }
        }
        LogicalCluster logicalCluster2 = this.physicalCluster.createLogicalCluster("anotherCluster", 100, new Integer[0]);
        try (AdminClient adminClient = this.testHarness.createAdminClient(logicalCluster2.adminUser());){
            adminClient.createAcls(Collections.singleton(topicAcl.apply("sometopic"))).all().get();
        }
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
    }

    @Test
    public void testAuthorizerDisabledUsingAclLimit() throws Throwable {
        this.testHarness.shutdown();
        Properties brokerProps = this.brokerProps();
        brokerProps.put("confluent.max.acls.per.tenant", "0");
        this.startTestHarness(brokerProps);
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL), new AccessControlEntry(this.user1.unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBindingFilter topicFilter = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            try {
                adminClient.createAcls(Collections.singleton(topicAcl)).all().get();
            }
            catch (ExecutionException e) {
                this.verifyAclsDisabledException(e);
            }
            try {
                adminClient.describeAcls(topicFilter).values().get();
            }
            catch (ExecutionException e) {
                this.verifyAclsDisabledException(e);
            }
            try {
                adminClient.deleteAcls(Collections.singleton(topicFilter)).all().get();
            }
            catch (ExecutionException e) {
                this.verifyAclsDisabledException(e);
            }
        }
        this.testHarness.produceConsume(this.user1, this.user2, "test.topic", "test.consumer.group", 0);
    }

    private void verifyAclsDisabledException(ExecutionException e) {
        Throwable cause = e.getCause();
        Assert.assertTrue((String)("Unexpected exception: " + cause), (boolean)(cause instanceof InvalidRequestException));
    }

    @Test
    public void testInvalidAcl() throws Throwable {
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            List<String> invalidPrincipals = Arrays.asList("", "userWithoutPrincipalType");
            invalidPrincipals.forEach(principal -> {
                AclBinding acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
                try {
                    adminClient.createAcls(Collections.singleton(acl)).all().get();
                    Assert.fail((String)"createAcls didn't fail with invalid principal");
                }
                catch (Exception e) {
                    Assert.assertTrue((String)("Invalid exception: " + e), (e instanceof ExecutionException && e.getCause() instanceof InvalidRequestException ? 1 : 0) != 0);
                }
            });
        }
    }

    @Test
    public void testAuthorizeByResourceType() throws Throwable {
        this.physicalCluster.kafkaCluster().createTopic(this.user1.withPrefix("test.topic"), 1, 1);
        AdminClient adminClient = this.testHarness.createAdminClient(this.user1);
        AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix("test.topic"), AclOperation.DESCRIBE, PatternType.LITERAL));
        TestUtils.waitForCondition(() -> this.checkDescribeAuthorized(adminClient, "test.topic"), (String)"ACL not applied within timeout");
        this.verifyIdempotentProducer(this.user1, "test.topic", ClusterAuthorizationException.class);
        AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix("prefix"), AclOperation.ALL, PatternType.PREFIXED));
        TestUtils.waitForCondition(() -> this.checkDescribeAuthorized(adminClient, "prefix.topic1"), (String)"ACL not applied within timeout");
        AclCommand.main((String[])SecurityTestUtils.addTopicDenyAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix("prefix"), AclOperation.ALL, PatternType.PREFIXED));
        TestUtils.waitForCondition(() -> !this.checkDescribeAuthorized(adminClient, "prefix.topic2"), (String)"ACL not applied within timeout");
        this.verifyIdempotentProducer(this.user1, "test.topic", ClusterAuthorizationException.class);
        AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), this.user2.prefixedKafkaPrincipal(), "*", AclOperation.ALL, PatternType.LITERAL));
        this.verifyIdempotentProducer(this.user1, "test.topic", ClusterAuthorizationException.class);
        AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix("another"), AclOperation.ALL, PatternType.PREFIXED));
        TestUtils.waitForCondition(() -> this.checkDescribeAuthorized(adminClient, "another.topic"), (String)"ACL not applied within timeout");
        this.verifyIdempotentProducer(this.user1, "test.topic", TopicAuthorizationException.class);
        AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix("test.topic"), AclOperation.WRITE, PatternType.LITERAL));
        this.verifyIdempotentProducer(this.user1, "test.topic", null);
    }

    private Properties brokerProps() {
        Properties props = new Properties();
        props.put(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.max.acls.per.tenant", "100");
        return props;
    }

    private void addProducerAcls(LogicalClusterUser user, String topic, PatternType patternType) {
        AclCommand.main((String[])SecurityTestUtils.produceAclArgs(this.testHarness.zkConnect(), user.prefixedKafkaPrincipal(), user.withPrefix(topic), patternType));
    }

    private void addConsumerAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) {
        AclCommand.main((String[])SecurityTestUtils.consumeAclArgs(this.testHarness.zkConnect(), user.prefixedKafkaPrincipal(), user.withPrefix(topic), user.withPrefix(consumerGroup), patternType));
    }

    private void addProducerAclsUsingAdminClient(LogicalClusterUser user, String topic, PatternType patternType) throws Exception {
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            this.addProducerAcls(adminClient, user, topic, patternType);
        }
    }

    private void addProducerAcls(AdminClient adminClient, LogicalClusterUser user, String topic, PatternType patternType) throws Exception {
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, patternType), new AccessControlEntry(user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", patternType), new AccessControlEntry(user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        adminClient.createAcls(Arrays.asList(topicAcl, clusterAcl)).all().get();
    }

    private void addConsumerAclsUsingAdminClient(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) throws Exception {
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            this.addConsumerAcls(adminClient, user, topic, consumerGroup, patternType);
        }
    }

    private void addConsumerAcls(AdminClient adminClient, LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) throws Exception {
        List<AclBinding> acls = this.consumerAcls(user, topic, consumerGroup, patternType);
        adminClient.createAcls(acls).all().get();
    }

    private List<AclBinding> consumerAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) {
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, patternType), new AccessControlEntry(user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding consumerGroupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, consumerGroup, patternType), new AccessControlEntry(user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", patternType), new AccessControlEntry(user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        return Arrays.asList(topicAcl, consumerGroupAcl, clusterAcl);
    }

    private void verifyTopicAuthorizationFailure(LogicalClusterUser user, String topic) {
        try (KafkaProducer<String, String> producer = this.testHarness.createProducer(user, SecurityProtocol.SASL_PLAINTEXT);){
            try {
                producer.partitionsFor(topic);
                Assert.fail((String)"Authorization should have failed");
            }
            catch (AuthorizationException authorizationException) {
                // empty catch block
            }
        }
    }

    private void verifyConsumerGroupAuthorizationFailure(LogicalClusterUser user, String topic, String group) {
        try (KafkaConsumer<String, String> consumer = this.testHarness.createConsumer(user, group, SecurityProtocol.SASL_PLAINTEXT);){
            consumer.subscribe(Collections.singleton(topic));
            consumer.poll(Duration.ofSeconds(5L));
            Assert.fail((String)"Authorization should have failed");
        }
        catch (AuthorizationException authorizationException) {
            // empty catch block
        }
    }

    private boolean checkAuthorized(KafkaConsumer<?, ?> consumer, String topic) {
        try {
            consumer.partitionsFor(topic).size();
            return true;
        }
        catch (AuthorizationException e) {
            return false;
        }
    }

    private boolean checkDescribeAuthorized(AdminClient adminClient, String topic) {
        try {
            try {
                adminClient.describeTopics(Collections.singleton(topic)).all().get(15L, TimeUnit.SECONDS);
            }
            catch (ExecutionException e) {
                throw e.getCause();
            }
            return true;
        }
        catch (AuthorizationException e) {
            return false;
        }
        catch (UnknownTopicOrPartitionException e) {
            return true;
        }
        catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    private void verifyIdempotentProducer(LogicalClusterUser user, String topic, Class<? extends AuthorizationException> exceptionClass) throws Throwable {
        Properties producerProps = KafkaTestUtils.producerProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.user1.saslJaasConfig());
        producerProps.setProperty("enable.idempotence", "true");
        try (KafkaProducer producer = new KafkaProducer(producerProps);){
            if (exceptionClass != null) {
                Assert.assertThrows(exceptionClass, () -> KafkaTestUtils.sendRecords((KafkaProducer<String, String>)producer, topic, 0, 1));
            } else {
                KafkaTestUtils.sendRecords((KafkaProducer<String, String>)producer, topic, 0, 1);
            }
        }
    }

    private class ConsumerAcls {
        private final AdminClient adminClient;
        private final Set<AclBinding> acls;
        private final boolean tenantOnly;

        ConsumerAcls(AdminClient adminClient, boolean tenantOnly) throws Exception {
            this.adminClient = adminClient;
            this.tenantOnly = tenantOnly;
            this.acls = new HashSet<AclBinding>();
            this.acls.addAll(this.describeAcls(null, PatternType.ANY));
        }

        private void addAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) throws Exception {
            List consumerAcls = MultiTenantAuthorizerTest.this.consumerAcls(user, topic, consumerGroup, patternType);
            this.adminClient.createAcls((Collection)consumerAcls).all().get();
            this.acls.addAll(consumerAcls);
        }

        private void deleteAcls(ResourceType resourceType, String resourceName, PatternType patternType, LogicalClusterUser user) throws Exception {
            String principal = user == null ? null : user.unprefixedKafkaPrincipal().toString();
            Collection deletedAcls = (Collection)this.adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType), new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY)))).all().get();
            this.acls.removeAll(deletedAcls);
        }

        private Set<AclBinding> describeAcls(String resourceName, PatternType patternType) throws Exception {
            Collection describedAcls = (Collection)this.adminClient.describeAcls(new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, resourceName, patternType), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).values().get();
            return new HashSet<AclBinding>(describedAcls);
        }

        private void verifyAllAcls(String resourceName, PatternType patternType) throws Exception {
            Set<AclBinding> describedAcls = this.describeAcls(resourceName, patternType);
            Assert.assertEquals(this.acls, describedAcls);
        }

        private void verifyAcls(ResourceType resourceType, String resourceName, PatternType patternType, LogicalClusterUser user, String ... expectedResources) throws Exception {
            String principal = user == null ? null : user.unprefixedKafkaPrincipal().toString();
            Collection acls = (Collection)this.adminClient.describeAcls(new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType), new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY))).values().get();
            Set aclResources = acls.stream().map(acl -> acl.pattern().name()).collect(Collectors.toSet());
            Assert.assertEquals((Object)Utils.mkSet((Object[])expectedResources), aclResources);
            if (this.tenantOnly) {
                acls.forEach(acl -> Assert.assertFalse((String)("Unexpected acl " + acl), (boolean)acl.entry().principal().contains(PhysicalCluster.BROKER_PRINCIPAL.getName())));
            }
        }
    }
}

