package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.class */
public class MirrorConnectorsIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);
    private static final int NUM_RECORDS_PRODUCED = 100;
    private static final int NUM_PARTITIONS = 10;
    private static final int RECORD_TRANSFER_DURATION_MS = 10000;
    private static final int CHECKPOINT_DURATION_MS = 20000;
    private MirrorMakerConfig mm2Config;
    private EmbeddedConnectCluster primary;
    private EmbeddedConnectCluster backup;

    @Before
    public void setup() throws InterruptedException {
        Properties properties = new Properties();
        properties.put("auto.create.topics.enable", "false");
        HashMap hashMap = new HashMap();
        hashMap.put("clusters", "primary, backup");
        hashMap.put("max.tasks", "10");
        hashMap.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
        hashMap.put("groups", "consumer-group-.*");
        hashMap.put("primary->backup.enabled", "true");
        hashMap.put("backup->primary.enabled", "true");
        hashMap.put("sync.topic.acls.enabled", "false");
        hashMap.put("emit.checkpoints.interval.seconds", "1");
        hashMap.put("emit.heartbeats.interval.seconds", "1");
        hashMap.put("refresh.topics.interval.seconds", "1");
        hashMap.put("refresh.groups.interval.seconds", "1");
        hashMap.put("checkpoints.topic.replication.factor", "1");
        hashMap.put("heartbeats.topic.replication.factor", "1");
        hashMap.put("offset-syncs.topic.replication.factor", "1");
        hashMap.put("config.storage.topic.replication.factor", "1");
        hashMap.put("offset.stoage.topic.replication.factor", "1");
        hashMap.put("status.stoage.topic.replication.factor", "1");
        hashMap.put("replication.factor", "1");
        this.mm2Config = new MirrorMakerConfig(hashMap);
        Map workerConfig = this.mm2Config.workerConfig(new SourceAndTarget("backup", "primary"));
        Map workerConfig2 = this.mm2Config.workerConfig(new SourceAndTarget("primary", "backup"));
        this.primary = new EmbeddedConnectCluster.Builder().name("primary-connect-cluster").numWorkers(3).numBrokers(1).brokerProps(properties).workerProps(workerConfig).build();
        this.backup = new EmbeddedConnectCluster.Builder().name("backup-connect-cluster").numWorkers(3).numBrokers(1).brokerProps(properties).workerProps(workerConfig2).build();
        this.primary.start();
        this.primary.assertions().assertAtLeastNumWorkersAreUp(3, "Workers of primary-connect-cluster did not start in time.");
        this.backup.start();
        this.primary.assertions().assertAtLeastNumWorkersAreUp(3, "Workers of backup-connect-cluster did not start in time.");
        this.primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
        this.primary.kafka().createTopic("backup.test-topic-1", 1);
        this.primary.kafka().createTopic("heartbeats", 1);
        this.backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
        this.backup.kafka().createTopic("primary.test-topic-1", 1);
        this.backup.kafka().createTopic("heartbeats", 1);
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            this.primary.kafka().produce("test-topic-1", Integer.valueOf(i % NUM_PARTITIONS), "key", "message-1-" + i);
            this.backup.kafka().produce("test-topic-1", Integer.valueOf(i % NUM_PARTITIONS), "key", "message-2-" + i);
        }
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", "consumer-group-1"), new String[]{"test-topic-1", "backup.test-topic-1"});
        createConsumerAndSubscribeTo.poll(Duration.ofMillis(500L));
        createConsumerAndSubscribeTo.commitSync();
        createConsumerAndSubscribeTo.close();
        KafkaConsumer createConsumerAndSubscribeTo2 = this.backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", "consumer-group-1"), new String[]{"test-topic-1", "primary.test-topic-1"});
        createConsumerAndSubscribeTo2.poll(Duration.ofMillis(500L));
        createConsumerAndSubscribeTo2.commitSync();
        createConsumerAndSubscribeTo2.close();
        log.info("primary REST service: {}", this.primary.endpointForResource("connectors"));
        log.info("backup REST service: {}", this.backup.endpointForResource("connectors"));
        log.info("primary brokers: {}", this.primary.kafka().bootstrapServers());
        log.info("backup brokers: {}", this.backup.kafka().bootstrapServers());
        hashMap.put("primary.bootstrap.servers", this.primary.kafka().bootstrapServers());
        hashMap.put("backup.bootstrap.servers", this.backup.kafka().bootstrapServers());
        this.mm2Config = new MirrorMakerConfig(hashMap);
        HashSet hashSet = new HashSet(Arrays.asList("MirrorSourceConnector", "MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
        this.backup.configureConnector("MirrorSourceConnector", this.mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"), MirrorSourceConnector.class));
        this.backup.configureConnector("MirrorCheckpointConnector", this.mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"), MirrorCheckpointConnector.class));
        this.backup.configureConnector("MirrorHeartbeatConnector", this.mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"), MirrorHeartbeatConnector.class));
        waitUntilMirrorMakerIsRunning(this.backup, hashSet);
        this.primary.configureConnector("MirrorSourceConnector", this.mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"), MirrorSourceConnector.class));
        this.primary.configureConnector("MirrorCheckpointConnector", this.mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"), MirrorCheckpointConnector.class));
        this.primary.configureConnector("MirrorHeartbeatConnector", this.mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"), MirrorHeartbeatConnector.class));
        waitUntilMirrorMakerIsRunning(this.primary, hashSet);
    }

    private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster embeddedConnectCluster, Set<String> set) throws InterruptedException {
        for (String str : set) {
            embeddedConnectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(str, 1, "Connector " + str + " tasks did not start in time on cluster: " + embeddedConnectCluster);
        }
    }

    @After
    public void close() {
        Iterator it = this.primary.connectors().iterator();
        while (it.hasNext()) {
            this.primary.deleteConnector((String) it.next());
        }
        Iterator it2 = this.backup.connectors().iterator();
        while (it2.hasNext()) {
            this.backup.deleteConnector((String) it2.next());
        }
        deleteAllTopics(this.primary.kafka());
        deleteAllTopics(this.backup.kafka());
        this.primary.stop();
        this.backup.stop();
    }

    @Test
    public void testReplication() throws InterruptedException {
        MirrorClient mirrorClient = new MirrorClient(this.mm2Config.clientConfig("primary"));
        MirrorClient mirrorClient2 = new MirrorClient(this.mm2Config.clientConfig("backup"));
        Assert.assertEquals("Records were not produced to primary cluster.", 100L, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 10000L, new String[]{"test-topic-1"}).count());
        Assert.assertEquals("Records were not replicated to backup cluster.", 100L, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 10000L, new String[]{"primary.test-topic-1"}).count());
        Assert.assertEquals("Records were not produced to backup cluster.", 100L, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 10000L, new String[]{"test-topic-1"}).count());
        Assert.assertEquals("Records were not replicated to primary cluster.", 100L, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 10000L, new String[]{"backup.test-topic-1"}).count());
        Assert.assertEquals("Primary cluster doesn't have all records from both clusters.", 200L, this.primary.kafka().consume(200, 10000L, new String[]{"backup.test-topic-1", "test-topic-1"}).count());
        Assert.assertEquals("Backup cluster doesn't have all records from both clusters.", 200L, this.backup.kafka().consume(200, 10000L, new String[]{"primary.test-topic-1", "test-topic-1"}).count());
        Assert.assertTrue("Heartbeats were not emitted to primary cluster.", this.primary.kafka().consume(1, 10000L, new String[]{"heartbeats"}).count() > 0);
        Assert.assertTrue("Heartbeats were not emitted to backup cluster.", this.backup.kafka().consume(1, 10000L, new String[]{"heartbeats"}).count() > 0);
        Assert.assertTrue("Heartbeats were not replicated downstream to backup cluster.", this.backup.kafka().consume(1, 10000L, new String[]{"primary.heartbeats"}).count() > 0);
        Assert.assertTrue("Heartbeats were not replicated downstream to primary cluster.", this.primary.kafka().consume(1, 10000L, new String[]{"backup.heartbeats"}).count() > 0);
        Assert.assertTrue("Did not find upstream primary cluster.", mirrorClient2.upstreamClusters().contains("primary"));
        Assert.assertEquals("Did not calculate replication hops correctly.", 1L, mirrorClient2.replicationHops("primary"));
        Assert.assertTrue("Did not find upstream backup cluster.", mirrorClient.upstreamClusters().contains("backup"));
        Assert.assertEquals("Did not calculate replication hops correctly.", 1L, mirrorClient.replicationHops("backup"));
        Assert.assertTrue("Checkpoints were not emitted downstream to backup cluster.", this.backup.kafka().consume(1, 20000L, new String[]{"primary.checkpoints.internal"}).count() > 0);
        Map remoteConsumerOffsets = mirrorClient2.remoteConsumerOffsets("consumer-group-1", "primary", Duration.ofMillis(20000L));
        Assert.assertTrue("Offsets not translated downstream to backup cluster. Found: " + remoteConsumerOffsets, remoteConsumerOffsets.containsKey(new TopicPartition("primary.test-topic-1", 0)));
        KafkaConsumer createConsumer = this.backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
        createConsumer.assign(remoteConsumerOffsets.keySet());
        createConsumer.getClass();
        remoteConsumerOffsets.forEach(createConsumer::seek);
        createConsumer.poll(Duration.ofMillis(500L));
        createConsumer.commitSync();
        Assert.assertTrue("Consumer failedover to zero offset.", createConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
        Assert.assertTrue("Consumer failedover beyond expected offset.", createConsumer.position(new TopicPartition("primary.test-topic-1", 0)) <= 100);
        Assert.assertTrue("Checkpoints were not emitted upstream to primary cluster.", this.primary.kafka().consume(1, 20000L, new String[]{"backup.checkpoints.internal"}).count() > 0);
        createConsumer.close();
        TestUtils.waitForCondition(() -> {
            try {
                return mirrorClient.remoteConsumerOffsets("consumer-group-1", "backup", Duration.ofMillis(20000L)).containsKey(new TopicPartition("backup.test-topic-1", 0));
            } catch (Throwable th) {
                return false;
            }
        }, 20000L, "Offsets not translated downstream to primary cluster.");
        TestUtils.waitForCondition(() -> {
            try {
                return mirrorClient.remoteConsumerOffsets("consumer-group-1", "backup", Duration.ofMillis(20000L)).containsKey(new TopicPartition("test-topic-1", 0));
            } catch (Throwable th) {
                return false;
            }
        }, 20000L, "Offsets not translated upstream to primary cluster.");
        Map remoteConsumerOffsets2 = mirrorClient.remoteConsumerOffsets("consumer-group-1", "backup", Duration.ofMillis(20000L));
        KafkaConsumer createConsumer2 = this.primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
        createConsumer2.assign(remoteConsumerOffsets2.keySet());
        createConsumer2.getClass();
        remoteConsumerOffsets2.forEach(createConsumer2::seek);
        createConsumer2.poll(Duration.ofMillis(500L));
        Assert.assertTrue("Consumer failedback to zero upstream offset.", createConsumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
        Assert.assertTrue("Consumer failedback to zero downstream offset.", createConsumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
        Assert.assertTrue("Consumer failedback beyond expected upstream offset.", createConsumer2.position(new TopicPartition("test-topic-1", 0)) <= 100);
        Assert.assertTrue("Consumer failedback beyond expected downstream offset.", createConsumer2.position(new TopicPartition("backup.test-topic-1", 0)) <= 100);
        createConsumer2.close();
        this.primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
        this.backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            this.primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
            this.backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
        }
        Assert.assertEquals("Records were not produced to primary cluster.", 100L, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 10000L, new String[]{"test-topic-2"}).count());
        Assert.assertEquals("Records were not produced to backup cluster.", 100L, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 10000L, new String[]{"test-topic-3"}).count());
        Assert.assertEquals("New topic was not replicated to primary cluster.", 100L, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 20000L, new String[]{"backup.test-topic-3"}).count());
        Assert.assertEquals("New topic was not replicated to backup cluster.", 100L, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 20000L, new String[]{"primary.test-topic-2"}).count());
    }

    private void deleteAllTopics(EmbeddedKafkaCluster embeddedKafkaCluster) {
        Admin createAdminClient = embeddedKafkaCluster.createAdminClient();
        try {
            createAdminClient.deleteTopics((Collection) createAdminClient.listTopics().names().get());
        } catch (Throwable th) {
        }
    }
}
