/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that start a physical Kafka cluster through {@link IntegrationTestHarness},
 * create two logical clusters ("tenantA" and "tenantB") on it, and exercise produce/consume,
 * topic-creation policy, rack- and cell-aware replica placement, dynamic broker config
 * alteration, and ACL management from the tenant's and the internal admin's point of view.
 *
 * <p>The {@code setUp} overloads are not JUnit {@code @Before} hooks: every test calls one of
 * them explicitly so that it can choose the broker count, racks and cells it needs.
 */
@Category(value={IntegrationTest.class})
public class MultiTenantKafkaIntegrationTest {
    // Broker count for tests that do not request a specific cluster size.
    private static final int BROKER_COUNT = 2;
    // Listener named "external"; its config prefix is used to address listener-scoped settings.
    private final ListenerName externalListenerName = new ListenerName("external");
    private final String externalListenerPrefix = this.externalListenerName.configPrefix();
    // Created by setUp(...) and shut down in tearDown().
    private IntegrationTestHarness testHarness;
    // Logical clusters for "tenantA" and "tenantB", created once the physical cluster is started.
    private LogicalCluster logicalCluster1;
    private LogicalCluster logicalCluster2;
    private PhysicalCluster physicalCluster;
    // Its root directory is passed to the brokers as "multitenant.metadata.dir".
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Prepares a harness with the default broker count and no rack or cell labels.
     * Called explicitly by each test; this is not a JUnit {@code @Before} hook.
     */
    public void setUp() {
        // Use the class constant rather than repeating its value as a magic number.
        this.setUp(BROKER_COUNT, Collections.emptyList());
    }

    /**
     * Prepares a harness with the given broker count and rack labels, and no cell labels.
     *
     * @param brokerCount number of brokers handed to the harness
     * @param brokerRacks rack labels handed to the harness (may be empty)
     */
    public void setUp(int brokerCount, List<String> brokerRacks) {
        this.setUp(brokerCount, brokerRacks, Collections.emptyList());
    }

    /**
     * Prepares a harness with the given broker count, rack labels and cell labels.
     * Only constructs the harness; the cluster itself is started later by
     * {@code createPhysicalAndLogicalClusters}.
     *
     * @param brokerCount number of brokers handed to the harness
     * @param brokerRacks rack labels handed to the harness (may be empty)
     * @param brokerCells cell labels handed to the harness (may be empty)
     */
    public void setUp(int brokerCount, List<String> brokerRacks, List<String> brokerCells) {
        this.testHarness = new IntegrationTestHarness(brokerCount, brokerRacks, brokerCells);
    }

    /** Starts the physical cluster with the default {@link #brokerProps()} and creates both tenants. */
    private void createPhysicalAndLogicalClusters() {
        createPhysicalAndLogicalClusters(brokerProps());
    }

    /**
     * Starts the physical cluster with the supplied broker properties and creates the two
     * logical clusters "tenantA" and "tenantB" on it.
     *
     * @param brokerProperties broker configuration passed to the harness on start
     */
    private void createPhysicalAndLogicalClusters(Properties brokerProperties) {
        physicalCluster = testHarness.start(brokerProperties);
        // NOTE(review): the numeric arguments look like admin/user ids (tests later call
        // user(11), user(12), user(21), user(22)) - confirm against createLogicalCluster.
        logicalCluster1 = physicalCluster.createLogicalCluster("tenantA", 100, 9, 11, 12);
        logicalCluster2 = physicalCluster.createLogicalCluster("tenantB", 200, 9, 21, 22);
    }

    /**
     * Shuts the harness down after each test.
     *
     * <p>The harness is created by an explicit {@code setUp(...)} call inside each test rather
     * than by a {@code @Before} hook, so it can still be {@code null} here if that call failed
     * or was never reached. Guarding against that avoids a secondary NullPointerException.
     */
    @After
    public void tearDown() {
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Builds the baseline broker configuration shared by the tests: multi-tenant authorizer,
     * metadata stored under the temporary folder, the alter-config and create-topic policies,
     * a policy replication factor of 1, and auto topic creation turned off.
     *
     * @return a fresh, mutable {@link Properties} instance that callers may extend
     */
    private Properties brokerProps() {
        Properties defaults = new Properties();

        // Authorization.
        defaults.setProperty(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        defaults.setProperty("confluent.max.acls.per.tenant", "100");
        defaults.setProperty(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp(), "true");

        // Tenant metadata location.
        defaults.setProperty("multitenant.metadata.class", "io.confluent.kafka.multitenant.PhysicalClusterMetadata");
        defaults.setProperty("multitenant.metadata.dir", tempFolder.getRoot().getAbsolutePath());

        // Policies.
        defaults.setProperty(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        defaults.setProperty(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        defaults.setProperty("confluent.plugins.topic.policy.replication.factor", "1");

        defaults.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        return defaults;
    }

    /**
     * Checks that every broker has its own session UUID and that a
     * {@link PhysicalClusterMetadata} instance is registered for each of them.
     */
    @Test
    public void testMultiTenantMetadataInstances() {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        // A broker without the property maps to "", so missing UUIDs collapse into one entry
        // and make the distinct-count assertion below fail.
        List<String> brokerSessionUuids = this.physicalCluster.kafkaCluster().brokers().stream()
            .map(broker -> {
                Object cfgVal = broker.config().values().get(KafkaConfig.BrokerSessionUuidProp());
                return cfgVal == null ? "" : cfgVal.toString();
            })
            .distinct()
            .collect(Collectors.toList());
        // The no-arg setUp() starts the default number of brokers; tie the expectation to the
        // same constant instead of a separate literal.
        Assert.assertEquals("Expect each broker to have unique session UUID.", BROKER_COUNT, brokerSessionUuids.size());
        brokerSessionUuids.forEach(uuid -> Assert.assertNotNull(
            "Expect valid instance of PhysicalClusterMetadata for broker session UUID " + uuid,
            PhysicalClusterMetadata.getInstance(uuid)));
    }

    /**
     * Runs a produce/consume round trip for each tenant using the same topic and group names,
     * with one user producing and another consuming.
     */
    @Test
    public void testProduceConsume() throws Throwable {
        final String topic = "testtopic";
        final String consumerGroup = "group1";

        setUp();
        createPhysicalAndLogicalClusters();

        // NOTE(review): the trailing int (0 / 1000) is presumably a starting message index - confirm.
        testHarness.produceConsume(logicalCluster1.user(11), logicalCluster1.user(12), topic, consumerGroup, 0);
        testHarness.produceConsume(logicalCluster2.user(21), logicalCluster2.user(22), topic, consumerGroup, 1000);
    }

    /**
     * Creating a topic named "." through the tenant admin client must fail with a
     * {@link PolicyViolationException} carrying the message "Invalid topic name specified.".
     */
    @Test
    public void testInvalidTopicCreation() {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        // The replication factor must be a short: an int literal does not match
        // NewTopic(String, int, short) in a method-invocation context.
        CreateTopicsResult createTopicsResult =
            tenantAdminClient.createTopics(Collections.singletonList(new NewTopic(".", 2, (short) 1)));
        ExecutionException exception =
            Assert.assertThrows(ExecutionException.class, () -> createTopicsResult.all().get());
        Throwable cause = exception.getCause();
        // JUnit's contract is assertEquals(expected, actual); keep that order so a failure
        // message reports the right side as "expected".
        Assert.assertEquals(PolicyViolationException.class, cause.getClass());
        Assert.assertEquals("Invalid topic name specified.", cause.getMessage());
    }

    /** Validates replica placement when no broker has a rack label. */
    @Test
    public void testTopicCreationWithNoRacks() throws Exception {
        List<String> noRacks = Collections.emptyList();
        validateRackAwareAssignment(noRacks);
    }

    /** Validates replica placement with six brokers spread evenly over racks "0", "1" and "2". */
    @Test
    public void testRackAwareTopicCreation() throws Exception {
        List<String> evenRacks = Arrays.asList("0", "0", "2", "1", "1", "2");
        validateRackAwareAssignment(evenRacks);
    }

    /** Validates replica placement when all six brokers share rack "0". */
    @Test
    public void testRackAwareTopicCreationWithOneRack() throws Exception {
        List<String> singleRack = Arrays.asList("0", "0", "0", "0", "0", "0");
        validateRackAwareAssignment(singleRack);
    }

    /** Validates replica placement when racks hold 3, 2 and 1 brokers respectively. */
    @Test
    public void testRackAwareTopicCreationUnbalancedRacks() throws Exception {
        List<String> unevenRacks = Arrays.asList("0", "0", "2", "1", "1", "0");
        validateRackAwareAssignment(unevenRacks);
    }

    /**
     * Starts a six-broker cluster with the given rack labels, creates a six-partition topic
     * with replication factor 3 as tenant A's admin, and checks the rack distribution of every
     * partition's replicas.
     *
     * @param brokerRacks rack labels handed to the harness (may be empty)
     */
    private void validateRackAwareAssignment(List<String> brokerRacks) throws InterruptedException, ExecutionException {
        final String topic = "test_topic";
        final short replicas = 3;
        setUp(6, brokerRacks);

        Properties overrides = brokerProps();
        overrides.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        // Stored as a boxed Short, not as a String.
        overrides.put("confluent.plugins.topic.policy.replication.factor", replicas);
        createPhysicalAndLogicalClusters(overrides);

        AdminClient admin = testHarness.createAdminClient(logicalCluster1.adminUser());
        admin.createTopics(Collections.singletonList(new NewTopic(topic, 6, replicas))).all().get();

        Map<String, TopicDescription> described = admin.describeTopics(Collections.singleton(topic)).all().get();
        Assert.assertTrue(described.toString(), described.containsKey(topic));
        described.get(topic).partitions().forEach(partition -> assertRackDistribution(replicas, partition));
    }

    /** Validates cell-aware placement with two cells of three brokers and no rack labels. */
    @Test
    public void testCellAwareTopicCreationWithoutRack() throws Exception {
        List<String> racks = Collections.emptyList();
        List<String> cells = Arrays.asList("0", "0", "0", "1", "1", "1");
        validateCellAwareAssignment(racks, cells);
    }

    /** Validates cell-aware placement with two cells of three brokers, all in rack "0". */
    @Test
    public void testCellAwareTopicCreationWithOneRack() throws Exception {
        List<String> racks = Arrays.asList("0", "0", "0", "0", "0", "0");
        List<String> cells = Arrays.asList("0", "0", "0", "1", "1", "1");
        validateCellAwareAssignment(racks, cells);
    }

    /** Validates cell-aware placement with two cells, each containing one broker per rack. */
    @Test
    public void testCellAwareTopicCreationWithThreeRack() throws Exception {
        List<String> racks = Arrays.asList("0", "1", "2", "0", "1", "2");
        List<String> cells = Arrays.asList("0", "0", "0", "1", "1", "1");
        validateCellAwareAssignment(racks, cells);
    }

    /**
     * Validates cell-aware placement when both the rack labels and the cell labels are
     * assigned to brokers in an independently shuffled order.
     */
    @Test
    public void testCellAwareTopicCreationWithThreeRackRandomlyDistributed() throws Exception {
        List<String> racks = shuffle(Arrays.asList("0", "1", "2", "0", "1", "2"));
        List<String> cells = shuffle(Arrays.asList("0", "0", "0", "1", "1", "1"));
        validateCellAwareAssignment(racks, cells);
    }

    /**
     * Shuffles the given list in place and returns that same list so the call can be nested
     * inside an argument list.
     *
     * @param list list to reorder; it is modified, so it must support {@code set}
     * @return the same {@code list} instance, now in random order
     */
    private <T> List<T> shuffle(List<T> list) {
        // Unseeded shuffle: the resulting order differs from run to run.
        Collections.shuffle(list);
        return list;
    }

    /**
     * Starts a six-broker cluster with the given rack and cell labels, creates a six-partition
     * topic with replication factor 3 as tenant A's admin, and checks both the cell and the
     * rack distribution of every partition's replicas.
     *
     * @param brokerRacks rack labels handed to the harness (may be empty)
     * @param brokerCells cell labels handed to the harness; also used to look up each replica's cell
     */
    private void validateCellAwareAssignment(List<String> brokerRacks, List<String> brokerCells) throws InterruptedException, ExecutionException {
        final String topic = "test_topic";
        final short replicas = 3;
        setUp(6, brokerRacks, brokerCells);

        Properties overrides = brokerProps();
        overrides.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        // Stored as a boxed Short, not as a String.
        overrides.put("confluent.plugins.topic.policy.replication.factor", replicas);
        createPhysicalAndLogicalClusters(overrides);

        AdminClient admin = testHarness.createAdminClient(logicalCluster1.adminUser());
        admin.createTopics(Collections.singletonList(new NewTopic(topic, 6, replicas))).all().get();

        Map<String, TopicDescription> described = admin.describeTopics(Collections.singleton(topic)).all().get();
        Assert.assertTrue(described.toString(), described.containsKey(topic));
        described.get(topic).partitions().forEach(partition -> {
            assertCellDistribution(brokerCells, partition);
            assertRackDistribution(replicas, partition);
        });
    }

    /**
     * Asserts that all replicas of the partition map to a single cell.
     *
     * @param brokerCells cell labels indexed by broker id
     * @param partitionInfo partition whose replicas are checked
     */
    private void assertCellDistribution(List<String> brokerCells, TopicPartitionInfo partitionInfo) {
        Set<String> distinctCells = new HashSet<>();
        for (Node replica : partitionInfo.replicas()) {
            // The broker id is used directly as an index into the cell list.
            distinctCells.add(brokerCells.get(replica.id()));
        }
        Assert.assertEquals(1L, distinctCells.size());
    }

    /**
     * Asserts that the partition's replicas are spread as evenly as possible over the racks
     * that host them: the least-loaded rack holds {@code replicationFactor / rackCount}
     * replicas and the most-loaded rack holds at most one more.
     *
     * @param replicationFactor expected number of replicas per partition
     * @param partitionInfo partition whose replicas are checked
     */
    private void assertRackDistribution(short replicationFactor, TopicPartitionInfo partitionInfo) {
        // Replica count per rack label; a null rack (no label) is counted under the null key.
        Map<String, Integer> replicasPerRack = new HashMap<>();
        partitionInfo.replicas().forEach(replica -> replicasPerRack.merge(replica.rack(), 1, Integer::sum));

        int most = replicasPerRack.values().stream().mapToInt(Integer::intValue).max().getAsInt();
        int fewest = replicasPerRack.values().stream().mapToInt(Integer::intValue).min().getAsInt();
        int rackCount = replicasPerRack.size();

        Assert.assertEquals(replicasPerRack.toString(), replicationFactor / rackCount, fewest);

        // When the replicas do not divide evenly, exactly one extra replica per rack is allowed.
        int expectedMost = replicationFactor % rackCount == 0 ? fewest : fewest + 1;
        Assert.assertEquals(replicasPerRack.toString(), expectedMost, most);
    }

    /**
     * On a six-broker cluster without rack labels, a six-partition topic with replication
     * factor 3 must place exactly three replicas on every broker that hosts any.
     */
    @Test
    public void testRackUnAwareTopicCreation() throws Exception {
        final int brokerCount = 6;
        final String topic = "test_topic";
        final short replicas = 3;
        setUp(brokerCount, Collections.emptyList());

        Properties overrides = brokerProps();
        overrides.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        overrides.put("confluent.plugins.topic.policy.replication.factor", "3");
        createPhysicalAndLogicalClusters(overrides);

        AdminClient admin = testHarness.createAdminClient(logicalCluster1.adminUser());
        admin.createTopics(Collections.singletonList(new NewTopic(topic, 6, replicas))).all().get();

        Map<Integer, Integer> replicasPerBroker = new HashMap<>(brokerCount);
        Map<String, TopicDescription> described = admin.describeTopics(Collections.singleton(topic)).all().get();
        Assert.assertTrue(described.toString(), described.containsKey(topic));
        TopicDescription topicDescription = described.get(topic);

        // Tally how many replicas each broker id hosts across all partitions.
        for (TopicPartitionInfo partition : topicDescription.partitions()) {
            for (Node replica : partition.replicas()) {
                replicasPerBroker.merge(replica.id(), 1, Integer::sum);
            }
        }

        replicasPerBroker.forEach((brokerId, numReplicas) -> Assert.assertEquals(
            "Broker " + brokerId + " has " + numReplicas + " replicas. Topic description: " + topicDescription,
            3L,
            numReplicas.longValue()));
    }

    /**
     * With auto topic creation enabled and a cluster-level CREATE ACL granted to user 11,
     * producing to the invalid topic name "." must still fail with an
     * {@link AuthorizationException}.
     */
    @Test
    public void testInvalidTopicCreationWithAutoTopicCreation() throws Throwable {
        setUp();
        Properties overrides = brokerProps();
        overrides.put(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
        createPhysicalAndLogicalClusters(overrides);

        // Allow user 11 to CREATE on the cluster resource from any host.
        ResourcePattern clusterResource = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
        AccessControlEntry allowCreate = new AccessControlEntry(
            logicalCluster1.user(11).unprefixedKafkaPrincipal().toString(),
            "*",
            AclOperation.CREATE,
            AclPermissionType.ALLOW);
        Set<AclBinding> acls = new HashSet<>();
        acls.add(new AclBinding(clusterResource, allowCreate));

        AdminClient admin = testHarness.createAdminClient(logicalCluster1.adminUser());
        admin.createAcls(acls).all().get();
        Assert.assertEquals(acls, describeAllAcls(admin));

        try (KafkaProducer<String, String> producer =
                 testHarness.createProducer(logicalCluster1.user(11), SecurityProtocol.SASL_PLAINTEXT)) {
            Assert.assertThrows(AuthorizationException.class, () -> KafkaTestUtils.sendRecords(producer, ".", 0, 10));
        }
    }

    /**
     * With default broker properties, a tenant's attempt to alter configs of broker "0" must be
     * rejected while the internal admin client can still alter them.
     */
    @Test
    public void testAlterBrokerConfigsWhenConfigDisabled() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        ConfigResource brokerZero = new ConfigResource(ConfigResource.Type.BROKER, "0");
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        AdminClient superAdmin = physicalCluster.superAdminClient();
        expectAlterBrokerConfigsViaExternalListenerRejected(tenantAdmin, superAdmin, brokerZero);
    }

    /**
     * With default broker properties, a tenant's attempt to alter configs of the BROKER
     * resource with an empty name must be rejected while the internal admin client can still
     * alter them.
     */
    @Test
    public void testAlterClusterConfigsWhenConfigDisabled() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        // Empty broker name, as opposed to a specific broker id.
        ConfigResource unnamedBroker = new ConfigResource(ConfigResource.Type.BROKER, "");
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        AdminClient superAdmin = physicalCluster.superAdminClient();
        expectAlterBrokerConfigsViaExternalListenerRejected(tenantAdmin, superAdmin, unnamedBroker);
    }

    /**
     * Verifies that broker config changes requested through the tenant admin client are
     * rejected, while the same kind of change through the internal admin client is applied.
     *
     * <p>Steps, in order: tenant {@code alterConfigs} and {@code incrementalAlterConfigs} must
     * fail with {@link PolicyViolationException} and leave broker 0 unchanged; the internal
     * client's {@code alterConfigs} and {@code incrementalAlterConfigs} must take effect on
     * broker 0; finally, altering {@code broker.id} through the internal client must fail with
     * {@link InvalidRequestException}.
     *
     * @param tenantAdminClient admin client of a tenant; its alter requests are expected to be rejected
     * @param internalAdminClient admin client whose alter requests are expected to succeed
     * @param configResource the BROKER resource to alter
     */
    private void expectAlterBrokerConfigsViaExternalListenerRejected(AdminClient tenantAdminClient, AdminClient internalAdminClient, ConfigResource configResource) throws Exception {
        KafkaServer broker0 = this.physicalCluster.kafkaCluster().kafkas().get(0).kafkaServer();
        // Snapshot the values that must survive the rejected tenant requests.
        List defaultCipherSuites = broker0.config().getList("ssl.cipher.suites");
        int defaultMaxMessageBytes = broker0.config().messageMaxBytes();
        // 1) Legacy alterConfigs from the tenant: rejected, nothing changes.
        Map<ConfigResource, Config> newConfigs = Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "10000"), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry("ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"))));
        Throwable exceptionCause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)tenantAdminClient.alterConfigs(newConfigs).all().get();
        })).getCause();
        Assert.assertEquals(PolicyViolationException.class, exceptionCause.getClass());
        Assert.assertEquals((long)defaultMaxMessageBytes, (long)broker0.config().messageMaxBytes().intValue());
        // Auto topic creation is expected to still be off, as set by brokerProps().
        Assert.assertEquals((Object)false, (Object)broker0.config().autoCreateTopicsEnable());
        Assert.assertEquals((Object)defaultCipherSuites, (Object)broker0.config().get("ssl.cipher.suites"));
        // 2) incrementalAlterConfigs from the tenant: rejected as well.
        Map<ConfigResource, List<AlterConfigOp>> newConfigs2 = Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "15000"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET)));
        exceptionCause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)tenantAdminClient.incrementalAlterConfigs(newConfigs2).all().get();
        })).getCause();
        Assert.assertEquals(PolicyViolationException.class, exceptionCause.getClass());
        Assert.assertEquals((long)defaultMaxMessageBytes, (long)broker0.config().messageMaxBytes().intValue());
        Assert.assertEquals((Object)false, (Object)broker0.config().autoCreateTopicsEnable());
        Assert.assertEquals((Object)defaultCipherSuites, (Object)broker0.config().get("ssl.cipher.suites"));
        // 3) Legacy alterConfigs from the internal client: accepted. The cipher suites are set
        //    under the external listener's config prefix here.
        Map<ConfigResource, Config> newConfigsInternal = Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "10000"), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry(this.externalListenerPrefix + "ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"))));
        internalAdminClient.alterConfigs(newConfigsInternal).all().get();
        // The test polls the broker config rather than asserting immediately after the call returns.
        TestUtils.waitForCondition(() -> broker0.config().messageMaxBytes() == 10000, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().autoCreateTopicsEnable(), (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256".equals(this.sslCipherSuitesFromConfig(broker0.config(), this.externalListenerName)), (String)"Dynamic config not updated");
        // 4) incrementalAlterConfigs from the internal client: accepted.
        Map<ConfigResource, List<AlterConfigOp>> newConfigs2Internal = Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "15000"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(this.externalListenerPrefix + "ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET)));
        internalAdminClient.incrementalAlterConfigs(newConfigs2Internal).all().get();
        TestUtils.waitForCondition(() -> broker0.config().messageMaxBytes() == 15000, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().autoCreateTopicsEnable(), (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384".equals(this.sslCipherSuitesFromConfig(broker0.config(), this.externalListenerName)), (String)"Dynamic config not updated");
        // 5) Altering broker.id is expected to fail even for the internal client, and broker 0 keeps id 0.
        Map<ConfigResource, Config> invalidConfigs = Collections.singletonMap(configResource, new Config(Collections.singleton(new ConfigEntry(KafkaConfig.BrokerIdProp(), "20"))));
        exceptionCause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)internalAdminClient.alterConfigs(invalidConfigs).all().get();
        })).getCause();
        Assert.assertEquals(InvalidRequestException.class, exceptionCause.getClass());
        Assert.assertEquals((long)0L, (long)broker0.config().brokerId());
    }

    /**
     * Even with "confluent.alter.cluster.configs.enable" set to "true", a tenant's attempt to
     * alter configs of the specific broker "0" must still be rejected.
     */
    @Test
    public void testAlterBrokerConfigsWhenConfigEnabled() throws Exception {
        setUp();
        Properties overrides = brokerProps();
        overrides.put("confluent.alter.cluster.configs.enable", "true");
        createPhysicalAndLogicalClusters(overrides);

        ConfigResource brokerZero = new ConfigResource(ConfigResource.Type.BROKER, "0");
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        AdminClient superAdmin = physicalCluster.superAdminClient();
        expectAlterBrokerConfigsViaExternalListenerRejected(tenantAdmin, superAdmin, brokerZero);
    }

    /**
     * With "confluent.alter.cluster.configs.enable" set to "true", checks which changes to the
     * BROKER resource with an empty name a tenant may make.
     *
     * <p>Tenant and internal clients each apply one {@code alterConfigs} and one
     * {@code incrementalAlterConfigs} request that must take effect on broker 0. Afterwards two
     * tenant requests (a log retention of 2599999 ms and a change of max message bytes) must be
     * rejected with {@link PolicyViolationException} and leave the broker unchanged.
     */
    @Test
    public void testAlterClusterConfigsWhenConfigEnabled() throws Exception {
        this.setUp();
        Properties brokerProps = this.brokerProps();
        brokerProps.put("confluent.alter.cluster.configs.enable", "true");
        this.createPhysicalAndLogicalClusters(brokerProps);
        AdminClient tenantAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient internalAdminClient = this.physicalCluster.superAdminClient();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        String sslCipherSuitesConfig = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256";
        KafkaServer broker0 = this.physicalCluster.kafkaCluster().kafkas().get(0).kafkaServer();
        long retentionMs0 = 3600000L;
        // Integer.MAX_VALUE - 1
        long maxCompactionLagMs0 = 0x7FFFFFFEL;
        // 1) Tenant alterConfigs: accepted. The tenant sends the unprefixed cipher-suites key,
        //    and the check below reads it back under the external listener's prefix.
        Map<ConfigResource, Config> newConfigs = Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(retentionMs0)), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry(KafkaConfig.SslCipherSuitesProp(), sslCipherSuitesConfig), new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(maxCompactionLagMs0)))));
        tenantAdminClient.alterConfigs(newConfigs).all().get();
        TestUtils.waitForCondition(() -> broker0.config().logRetentionTimeMillis() == retentionMs0, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().autoCreateTopicsEnable(), (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> sslCipherSuitesConfig.equals(this.sslCipherSuitesFromConfig(broker0.config(), this.externalListenerName)), (String)"Dynamic config for ssl-cipher-suites not updated.");
        String sslCipherSuitesConfig2 = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
        long retentionMs1 = 3600001L;
        int numPartitions = 2;
        long maxCompactionLagMs1 = Integer.MAX_VALUE;
        // 2) Tenant incrementalAlterConfigs: accepted.
        Map<ConfigResource, List<AlterConfigOp>> newConfigs2 = Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(retentionMs1)), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.NumPartitionsProp(), String.valueOf(numPartitions)), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "false"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.SslCipherSuitesProp(), sslCipherSuitesConfig2), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(maxCompactionLagMs1)), AlterConfigOp.OpType.SET)));
        tenantAdminClient.incrementalAlterConfigs(newConfigs2).all().get();
        TestUtils.waitForCondition(() -> broker0.config().logRetentionTimeMillis() == retentionMs1, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().numPartitions() == numPartitions, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().autoCreateTopicsEnable() == false, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> sslCipherSuitesConfig2.equals(this.sslCipherSuitesFromConfig(broker0.config(), this.externalListenerName)), (String)"Dynamic config for ssl-cipher-suites not updated.");
        // 3) Internal alterConfigs: accepted. The internal client uses the listener-prefixed key itself.
        Map<ConfigResource, Config> newConfigsInternal = Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(retentionMs0)), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry(this.externalListenerPrefix + KafkaConfig.SslCipherSuitesProp(), "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"), new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(maxCompactionLagMs0)))));
        internalAdminClient.alterConfigs(newConfigsInternal).all().get();
        TestUtils.waitForCondition(() -> broker0.config().logRetentionTimeMillis() == retentionMs0, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().autoCreateTopicsEnable(), (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> sslCipherSuitesConfig.equals(this.sslCipherSuitesFromConfig(broker0.config(), this.externalListenerName)), (String)"Dynamic config for ssl-cipher-suites not updated.");
        // 4) Internal incrementalAlterConfigs: accepted.
        Map<ConfigResource, List<AlterConfigOp>> newConfigs2Internal = Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(retentionMs1)), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "false"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(this.externalListenerPrefix + KafkaConfig.SslCipherSuitesProp(), "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(maxCompactionLagMs1)), AlterConfigOp.OpType.SET)));
        internalAdminClient.incrementalAlterConfigs(newConfigs2Internal).all().get();
        TestUtils.waitForCondition(() -> broker0.config().logRetentionTimeMillis() == retentionMs1, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> broker0.config().autoCreateTopicsEnable() == false, (String)"Dynamic config not updated");
        TestUtils.waitForCondition(() -> sslCipherSuitesConfig2.equals(this.sslCipherSuitesFromConfig(broker0.config(), this.externalListenerName)), (String)"Dynamic config for ssl-cipher-suites not updated.");
        // 5) Tenant request with a retention of 2599999 ms: rejected, retention stays at retentionMs1.
        // NOTE(review): presumably below a policy minimum - confirm against AlterConfigPolicy.
        long retentionMs2 = 2599999L;
        Map<ConfigResource, List<AlterConfigOp>> newConfigs3 = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(retentionMs2)), AlterConfigOp.OpType.SET)));
        Throwable exceptionCause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)tenantAdminClient.incrementalAlterConfigs(newConfigs3).all().get();
        })).getCause();
        Assert.assertEquals(PolicyViolationException.class, exceptionCause.getClass());
        Assert.assertEquals((long)retentionMs1, (long)broker0.config().logRetentionTimeMillis());
        // 6) Tenant request changing max message bytes: rejected, value stays as it was.
        int defaultMaxMessageBytes = broker0.config().messageMaxBytes();
        Map<ConfigResource, List<AlterConfigOp>> newConfigs4 = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "500000"), AlterConfigOp.OpType.SET)));
        exceptionCause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)tenantAdminClient.incrementalAlterConfigs(newConfigs4).all().get();
        })).getCause();
        Assert.assertEquals(PolicyViolationException.class, exceptionCause.getClass());
        Assert.assertEquals((long)defaultMaxMessageBytes, (long)broker0.config().messageMaxBytes().intValue());
    }

    /**
     * Reads the listener-scoped SSL cipher suites setting from the broker's original configs.
     *
     * @param kafkaConfig broker configuration to read from
     * @param listenerName listener whose config prefix is prepended to the cipher-suites key
     * @return the stored value, or {@code null} when the key is absent
     */
    private String sslCipherSuitesFromConfig(KafkaConfig kafkaConfig, ListenerName listenerName) {
        String listenerScopedKey = listenerName.configPrefix() + KafkaConfig.SslCipherSuitesProp();
        Object rawValue = kafkaConfig.originals().get(listenerScopedKey);
        return (String) rawValue;
    }

    /**
     * Creates the same set of ACLs in both logical clusters and checks that each tenant sees
     * only its own: tenant B sees none until it creates them itself. Finishes by deleting
     * tenant B's ACLs that match the "test" prefix filter.
     */
    @Test
    public void testAcls() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        AdminClient tenantBAdmin = testHarness.createAdminClient(logicalCluster2.adminUser());
        Assert.assertEquals(Collections.emptySet(), describeAllAcls(tenantAAdmin));

        List<ResourceType> resourceTypes = Arrays.asList(ResourceType.TOPIC, ResourceType.GROUP, ResourceType.TRANSACTIONAL_ID);
        Set<AclBinding> acls = new HashSet<>();
        // WRITE for user 11 on the literal resource "test.resource".
        for (ResourceType resourceType : resourceTypes) {
            String principal = logicalCluster1.user(11).unprefixedKafkaPrincipal().toString();
            acls.add(new AclBinding(
                new ResourcePattern(resourceType, "test.resource", PatternType.LITERAL),
                new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW)));
        }
        // READ for user 12 on everything prefixed with "test.".
        for (ResourceType resourceType : resourceTypes) {
            String principal = logicalCluster1.user(12).unprefixedKafkaPrincipal().toString();
            acls.add(new AclBinding(
                new ResourcePattern(resourceType, "test.", PatternType.PREFIXED),
                new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)));
        }
        // DESCRIBE for user 11 on the literal wildcard "*".
        for (ResourceType resourceType : resourceTypes) {
            String principal = logicalCluster1.user(11).unprefixedKafkaPrincipal().toString();
            acls.add(new AclBinding(
                new ResourcePattern(resourceType, "*", PatternType.LITERAL),
                new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
        }

        tenantAAdmin.createAcls(acls).all().get();
        Assert.assertEquals(acls, describeAllAcls(tenantAAdmin));
        // Tenant A's ACLs must not be visible to tenant B.
        Assert.assertEquals(Collections.emptySet(), describeAllAcls(tenantBAdmin));

        tenantBAdmin.createAcls(acls).all().get();
        Assert.assertEquals(acls, describeAllAcls(tenantBAdmin));

        AclBindingFilter testPrefixFilter = new AclBindingFilter(
            new ResourcePatternFilter(ResourceType.ANY, "test", PatternType.PREFIXED),
            new AccessControlEntryFilter("User:*", "*", AclOperation.ANY, AclPermissionType.ANY));
        tenantBAdmin.deleteAcls(Collections.singletonList(testPrefixFilter)).all().get();
    }

    /**
     * Fetches every ACL binding visible to the given admin client, using a filter that matches
     * any resource and any access control entry.
     *
     * @param adminClient client used for the describe request
     * @return a new mutable set holding the returned bindings
     */
    private Set<AclBinding> describeAllAcls(AdminClient adminClient) throws Exception {
        ResourcePatternFilter anyResource = new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY);
        AccessControlEntryFilter anyEntry = new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY);
        Collection<AclBinding> found = adminClient.describeAcls(new AclBindingFilter(anyResource, anyEntry)).values().get();
        return new HashSet<>(found);
    }

    /**
     * A 513-partition topic is first rejected because it exceeds the 512-partition limit named
     * in the expected error message. After the internal admin client raises
     * "confluent.plugins.topic.policy.max.partitions.per.tenant" to 1024, the same request
     * must eventually succeed.
     */
    @Test
    public void testCreateTopicPolicyMaxPartitionPerTenantIsDynamicallyUpdated() throws Exception {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient internalAdminClient = this.physicalCluster.superAdminClient();

        // The replication factor must be a short: an int literal does not match
        // NewTopic(String, int, short) in a method-invocation context.
        CreateTopicsResult createTopicsResult =
            tenantAdminClient.createTopics(Collections.singletonList(new NewTopic("test", 513, (short) 1)));
        TestUtils.assertFutureThrows(
            (Future) createTopicsResult.all(),
            PolicyViolationException.class,
            String.format("You may not create more than the maximum number of partitions (%d).", 512));

        ConfigResource clusterResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp raiseLimit = new AlterConfigOp(
            new ConfigEntry("confluent.plugins.topic.policy.max.partitions.per.tenant", String.valueOf(1024)),
            AlterConfigOp.OpType.SET);
        internalAdminClient.incrementalAlterConfigs(
            Collections.singletonMap(clusterResource, Collections.singletonList(raiseLimit))).all().get();

        // Retry until the creation stops failing, since the test does not wait for the new
        // limit to be applied before trying again.
        TestUtils.retryOnExceptionWithTimeout(() -> {
            tenantAdminClient.createTopics(Collections.singletonList(new NewTopic("test", 513, (short) 1))).all().get();
        });
    }
}

