package org.apache.kafka.connect.mirror.integration;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.class */
public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
    private static final int TOPIC_ACL_SYNC_DURATION_MS = 30000;
    private static final int FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS = 60000;

    protected static void enableAclAuthorizer(Properties properties) {
        properties.put("listener.security.protocol.map", "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
        properties.put("authorizer.class.name", "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
        properties.put("sasl.enabled.mechanisms", "PLAIN");
        properties.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
        properties.put("sasl.mechanism.controller.protocol", "PLAIN");
        properties.put("listener.name.external.plain.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\" user_connector=\"connector_pwd\" user_super=\"super_pwd\";");
        properties.put("listener.name.controller.plain.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\" user_connector=\"connector_pwd\" user_super=\"super_pwd\";");
        properties.put("super.users", "User:super");
    }

    protected static Map<String, String> superUserConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("sasl.mechanism", "PLAIN");
        hashMap.put("security.protocol", "SASL_PLAINTEXT");
        hashMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\";");
        return hashMap;
    }

    protected static Map<String, String> connectorUserConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("sasl.mechanism", "PLAIN");
        hashMap.put("security.protocol", "SASL_PLAINTEXT");
        hashMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"connector\" password=\"connector_pwd\";");
        return hashMap;
    }

    private static void deleteAllACLs(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        Admin createAdminClient = embeddedKafkaCluster.createAdminClient();
        try {
            createAdminClient.deleteAcls((List) ((Set) createAdminClient.listTopics().names().get()).stream().map(str -> {
                return new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, str, PatternType.ANY), AccessControlEntryFilter.ANY);
            }).collect(Collectors.toList()));
            if (createAdminClient != null) {
                createAdminClient.close();
            }
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected static Collection<AclBinding> getAclBindings(EmbeddedKafkaCluster embeddedKafkaCluster, String str) throws Exception {
        Admin createAdminClient = embeddedKafkaCluster.createAdminClient();
        try {
            Collection<AclBinding> collection = (Collection) createAdminClient.describeAcls(new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, str, PatternType.ANY), AccessControlEntryFilter.ANY)).values().get();
            if (createAdminClient != null) {
                createAdminClient.close();
            }
            return collection;
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @BeforeEach
    public void startClusters() throws Exception {
        enableAclAuthorizer(this.primaryBrokerProps);
        this.additionalPrimaryClusterClientsConfigs.putAll(superUserConfig());
        this.primaryWorkerProps.putAll(superUserConfig());
        enableAclAuthorizer(this.backupBrokerProps);
        this.additionalBackupClusterClientsConfigs.putAll(superUserConfig());
        this.backupWorkerProps.putAll(superUserConfig());
        HashMap<String, String> hashMap = new HashMap<String, String>(superUserConfig()) { // from class: org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.1
            {
                put("forwarding.admin.class", FakeForwardingAdminWithLocalMetadata.class.getName());
            }
        };
        superUserConfig().forEach((str, str2) -> {
            hashMap.put("consumer.override." + str, str2);
            hashMap.put("producer.override." + str, str2);
            hashMap.put("consumer." + str, str2);
            hashMap.put("producer." + str, str2);
        });
        connectorUserConfig().forEach((str3, str4) -> {
            hashMap.put("admin." + str3, str4);
            hashMap.put("admin.override." + str3, str4);
        });
        startClusters(hashMap);
        Admin createAdminClient = this.primary.kafka().createAdminClient();
        try {
            createAdminClient.createAcls(Collections.singletonList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)))).all().get();
            if (createAdminClient != null) {
                createAdminClient.close();
            }
            createAdminClient = this.backup.kafka().createAdminClient();
            try {
                createAdminClient.createAcls(Collections.singletonList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)))).all().get();
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @AfterEach
    public void shutdownClusters() throws Exception {
        deleteAllACLs(this.primary.kafka());
        deleteAllACLs(this.backup.kafka());
        FakeLocalMetadataStore.clear();
        super.shutdownClusters();
    }

    @Test
    public void testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() throws Exception {
        produceMessages(this.primaryProducer, "test-topic-1");
        produceMessages(this.backupProducer, "test-topic-1");
        warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        waitForTopicCreated(this.primary, "backup.test-topic-1");
        waitForTopicCreated(this.backup, "primary.test-topic-1");
        waitForTopicCreated(this.primary, "mm2-offset-syncs.backup.internal");
        waitForTopicCreated(this.primary, "backup.checkpoints.internal");
        waitForTopicCreated(this.primary, "backup.heartbeats");
        waitForTopicCreated(this.backup, "mm2-offset-syncs.primary.internal");
        waitForTopicCreated(this.backup, "primary.checkpoints.internal");
        waitForTopicCreated(this.backup, "primary.heartbeats");
        waitForTopicToPersistInFakeLocalMetadataStore("backup.test-topic-1");
        waitForTopicToPersistInFakeLocalMetadataStore("primary.test-topic-1");
        waitForTopicToPersistInFakeLocalMetadataStore("mm2-offset-syncs.backup.internal");
        waitForTopicToPersistInFakeLocalMetadataStore("backup.checkpoints.internal");
        waitForTopicToPersistInFakeLocalMetadataStore("backup.heartbeats");
        waitForTopicToPersistInFakeLocalMetadataStore("mm2-offset-syncs.primary.internal");
        waitForTopicToPersistInFakeLocalMetadataStore("primary.checkpoints.internal");
        waitForTopicToPersistInFakeLocalMetadataStore("primary.heartbeats");
        waitForTopicToPersistInFakeLocalMetadataStore("heartbeats");
    }

    @Test
    public void testCreatePartitionsUseProvidedForwardingAdmin() throws Exception {
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        produceMessages(this.backupProducer, "test-topic-1");
        produceMessages(this.primaryProducer, "test-topic-1");
        warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        waitForTopicCreated(this.primary, "backup.test-topic-1");
        waitForTopicCreated(this.backup, "primary.test-topic-1");
        waitForTopicToPersistInFakeLocalMetadataStore("backup.test-topic-1");
        waitForTopicToPersistInFakeLocalMetadataStore("primary.test-topic-1");
        Map singletonMap = Collections.singletonMap("test-topic-1", NewPartitions.increaseTo(11));
        Admin createAdminClient = this.primary.kafka().createAdminClient();
        try {
            createAdminClient.createPartitions(singletonMap).all().get();
            if (createAdminClient != null) {
                createAdminClient.close();
            }
            waitForTopicPartitionCreated(this.backup, "primary.test-topic-1", 11);
            waitForTopicConfigPersistInFakeLocalMetaDataStore("primary.test-topic-1", "partitions", String.valueOf(11));
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testSyncTopicConfigUseProvidedForwardingAdmin() throws Exception {
        this.mm2Props.put("sync.topic.configs.enabled", "true");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        produceMessages(this.backupProducer, "test-topic-1");
        produceMessages(this.primaryProducer, "test-topic-1");
        warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
        waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        waitForTopicCreated(this.primary, "backup.test-topic-1");
        waitForTopicCreated(this.backup, "primary.test-topic-1");
        Assertions.assertEquals("compact", getTopicConfig(this.backup.kafka(), "primary.test-topic-1", "cleanup.policy"), "topic config was synced");
        waitForTopicToPersistInFakeLocalMetadataStore("backup.test-topic-1");
        waitForTopicToPersistInFakeLocalMetadataStore("primary.test-topic-1");
        waitForTopicConfigPersistInFakeLocalMetaDataStore("primary.test-topic-1", "cleanup.policy", "compact");
    }

    @Test
    public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception {
        this.mm2Props.put("sync.topic.acls.enabled", "true");
        this.mm2Props.put("sync.topic.acls.interval.seconds", "1");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        List singletonList = Collections.singletonList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
        Admin createAdminClient = this.primary.kafka().createAdminClient();
        try {
            createAdminClient.createAcls(singletonList).all().get();
            if (createAdminClient != null) {
                createAdminClient.close();
            }
            createAdminClient = this.backup.kafka().createAdminClient();
            try {
                createAdminClient.createAcls(singletonList).all().get();
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
                waitUntilMirrorMakerIsRunning(this.primary, CONNECTOR_LIST, this.mm2Config, "backup", "primary");
                waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
                waitForTopicCreated(this.primary, "backup.test-topic-1");
                waitForTopicCreated(this.backup, "primary.test-topic-1");
                AclBinding aclBinding = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "primary.test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
                AclBinding aclBinding2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "backup.test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
                TestUtils.waitForCondition(() -> {
                    Assertions.assertTrue(getAclBindings(this.backup.kafka(), "primary.test-topic-1").contains(aclBinding), "topic ACLs are not synced on backup cluster");
                    Assertions.assertTrue(getAclBindings(this.primary.kafka(), "backup.test-topic-1").contains(aclBinding2), "topic ACLs are not synced on primary cluster");
                    return true;
                }, 30000L, "Topic ACLs were not synced in time");
                Assertions.assertTrue(FakeLocalMetadataStore.aclBindings("dummy").containsAll(Arrays.asList(aclBinding, aclBinding2)));
            } finally {
            }
        } finally {
        }
    }

    void waitForTopicToPersistInFakeLocalMetadataStore(String str) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return FakeLocalMetadataStore.containsTopic(str).booleanValue();
        }, 60000L, "Topic: " + str + " didn't get created in the FakeLocalMetadataStore");
    }

    void waitForTopicConfigPersistInFakeLocalMetaDataStore(String str, String str2, String str3) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return FakeLocalMetadataStore.topicConfig(str).getOrDefault(str2, "").equals(str3);
        }, 60000L, "Topic: " + str + "'s configs don't have " + str2 + ":" + str3);
    }
}
