package org.apache.kafka.connect.mirror.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.class */
public class MirrorConnectorsIntegrationBaseTest {
    protected static final int NUM_RECORDS_PER_PARTITION = 10;
    protected static final int NUM_PARTITIONS = 10;
    protected static final int NUM_RECORDS_PRODUCED = 100;
    protected static final int OFFSET_LAG_MAX = 10;
    protected static final int RECORD_TRANSFER_DURATION_MS = 30000;
    protected static final int CHECKPOINT_DURATION_MS = 20000;
    private static final int RECORD_CONSUME_DURATION_MS = 20000;
    private static final int OFFSET_SYNC_DURATION_MS = 30000;
    private static final int TOPIC_SYNC_DURATION_MS = 60000;
    private static final int REQUEST_TIMEOUT_DURATION_MS = 60000;
    private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1000;
    private static final int NUM_WORKERS = 3;
    protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
    protected static final String BACKUP_CLUSTER_ALIAS = "backup";
    private volatile boolean shuttingDown;
    protected MirrorMakerConfig mm2Config;
    protected EmbeddedConnectCluster primary;
    protected EmbeddedConnectCluster backup;
    protected Exit.Procedure exitProcedure;
    private Exit.Procedure haltProcedure;
    private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
    protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500);
    protected static final List<Class<? extends Connector>> CONNECTOR_LIST = Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
    protected Map<String, String> mm2Props = new HashMap();
    protected Map<String, String> additionalPrimaryClusterClientsConfigs = new HashMap();
    protected Map<String, String> additionalBackupClusterClientsConfigs = new HashMap();
    protected boolean replicateBackupToPrimary = true;
    protected Boolean createReplicatedTopicsUpfront = false;
    protected Properties primaryBrokerProps = new Properties();
    protected Properties backupBrokerProps = new Properties();
    protected Map<String, String> primaryWorkerProps = new HashMap();
    protected Map<String, String> backupWorkerProps = new HashMap();

    @BeforeEach
    public void startClusters() throws Exception {
        startClusters(new HashMap<String, String>() { // from class: org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.1
            {
                put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
            }
        });
    }

    public void startClusters(Map<String, String> map) throws Exception {
        this.shuttingDown = false;
        this.exitProcedure = (i, str) -> {
            if (this.shuttingDown || i == 0) {
                return;
            }
            String str = "Abrupt service exit with code " + i + " and message " + str;
            log.warn(str);
            throw new UngracefulShutdownException(str);
        };
        this.haltProcedure = (i2, str2) -> {
            if (this.shuttingDown || i2 == 0) {
                return;
            }
            String str2 = "Abrupt service halt with code " + i2 + " and message " + str2;
            log.warn(str2);
            throw new UngracefulShutdownException(str2);
        };
        Exit.setExitProcedure(this.exitProcedure);
        Exit.setHaltProcedure(this.haltProcedure);
        this.primaryBrokerProps.put("auto.create.topics.enable", "false");
        this.backupBrokerProps.put("auto.create.topics.enable", "false");
        this.mm2Props.putAll(basicMM2Config());
        this.mm2Props.put("primary->backup.enabled", "true");
        this.mm2Props.put("backup->primary.enabled", Boolean.toString(this.replicateBackupToPrimary));
        this.mm2Props.putAll(map);
        this.mm2Props.put("config.properties.exclude", "delete\\.retention\\..*");
        this.mm2Props.put("sync.topic.configs.interval.seconds", "1");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        this.primaryWorkerProps = this.mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
        this.backupWorkerProps.putAll(this.mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
        this.primary = new EmbeddedConnectCluster.Builder().name("primary-connect-cluster").numWorkers(NUM_WORKERS).numBrokers(1).brokerProps(this.primaryBrokerProps).workerProps(this.primaryWorkerProps).maskExitProcedures(false).clientConfigs(this.additionalPrimaryClusterClientsConfigs).build();
        this.backup = new EmbeddedConnectCluster.Builder().name("backup-connect-cluster").numWorkers(NUM_WORKERS).numBrokers(1).brokerProps(this.backupBrokerProps).workerProps(this.backupWorkerProps).maskExitProcedures(false).clientConfigs(this.additionalBackupClusterClientsConfigs).build();
        this.primary.start();
        this.primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Workers of primary-connect-cluster did not start in time.");
        waitForTopicCreated(this.primary, "mm2-status.backup.internal");
        waitForTopicCreated(this.primary, "mm2-offsets.backup.internal");
        waitForTopicCreated(this.primary, "mm2-configs.backup.internal");
        this.backup.start();
        this.backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Workers of backup-connect-cluster did not start in time.");
        createTopics();
        waitForTopicCreated(this.backup, "mm2-status.primary.internal");
        waitForTopicCreated(this.backup, "mm2-offsets.primary.internal");
        waitForTopicCreated(this.backup, "mm2-configs.primary.internal");
        waitForTopicCreated(this.backup, "test-topic-1");
        waitForTopicCreated(this.primary, "test-topic-1");
        warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-dummy"));
        log.info("primary REST service: {}", this.primary.endpointForResource("connectors"));
        log.info("backup REST service: {}", this.backup.endpointForResource("connectors"));
        log.info("primary brokers: {}", this.primary.kafka().bootstrapServers());
        log.info("backup brokers: {}", this.backup.kafka().bootstrapServers());
        this.mm2Props.put("primary.bootstrap.servers", this.primary.kafka().bootstrapServers());
        this.mm2Props.put("backup.bootstrap.servers", this.backup.kafka().bootstrapServers());
    }

    @AfterEach
    public void shutdownClusters() throws Exception {
        try {
            Iterator it = this.primary.connectors().iterator();
            while (it.hasNext()) {
                this.primary.deleteConnector((String) it.next());
            }
            Iterator it2 = this.backup.connectors().iterator();
            while (it2.hasNext()) {
                this.backup.deleteConnector((String) it2.next());
            }
            deleteAllTopics(this.primary.kafka());
            deleteAllTopics(this.backup.kafka());
            this.shuttingDown = true;
            try {
                try {
                    this.primary.stop();
                    this.backup.stop();
                    Exit.resetExitProcedure();
                    Exit.resetHaltProcedure();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            this.shuttingDown = true;
            try {
                try {
                    this.primary.stop();
                    this.backup.stop();
                    Exit.resetExitProcedure();
                    Exit.resetHaltProcedure();
                    throw th;
                } finally {
                }
            } finally {
            }
        }
    }

    @Test
    public void testReplication() throws Exception {
        produceMessages(this.primary, "test-topic-1");
        String remoteTopicName = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        if (this.replicateBackupToPrimary) {
            produceMessages(this.backup, "test-topic-1");
        }
        String remoteTopicName2 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
        warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        waitUntilMirrorMakerIsRunning(this.primary, this.replicateBackupToPrimary ? CONNECTOR_LIST : Collections.singletonList(MirrorHeartbeatConnector.class), this.mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
        MirrorClient mirrorClient = new MirrorClient(this.mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
        MirrorClient mirrorClient2 = new MirrorClient(this.mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
        waitForTopicCreated(this.primary, remoteTopicName2);
        waitForTopicCreated(this.backup, remoteTopicName);
        waitForTopicCreated(this.primary, "mm2-offset-syncs.backup.internal");
        Assertions.assertEquals("compact", getTopicConfig(this.backup.kafka(), remoteTopicName, "cleanup.policy"), "topic config was not synced");
        createAndTestNewTopicWithConfigFilter();
        Assertions.assertEquals(NUM_RECORDS_PRODUCED, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{"test-topic-1"}).count(), "Records were not produced to primary cluster.");
        Assertions.assertEquals(NUM_RECORDS_PRODUCED, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{remoteTopicName}).count(), "Records were not replicated to backup cluster.");
        Assertions.assertEquals(NUM_RECORDS_PRODUCED, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{"test-topic-1"}).count(), "Records were not produced to backup cluster.");
        if (this.replicateBackupToPrimary) {
            Assertions.assertEquals(NUM_RECORDS_PRODUCED, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{remoteTopicName2}).count(), "Records were not replicated to primary cluster.");
            Assertions.assertEquals(200, this.primary.kafka().consume(200, 30000L, new String[]{remoteTopicName2, "test-topic-1"}).count(), "Primary cluster doesn't have all records from both clusters.");
            Assertions.assertEquals(200, this.backup.kafka().consume(200, 30000L, new String[]{remoteTopicName, "test-topic-1"}).count(), "Backup cluster doesn't have all records from both clusters.");
        }
        Assertions.assertTrue(this.primary.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0, "Heartbeats were not emitted to primary cluster.");
        Assertions.assertTrue(this.backup.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0, "Heartbeats were not emitted to backup cluster.");
        Assertions.assertTrue(this.backup.kafka().consume(1, 30000L, new String[]{"primary.heartbeats"}).count() > 0, "Heartbeats were not replicated downstream to backup cluster.");
        if (this.replicateBackupToPrimary) {
            Assertions.assertTrue(this.primary.kafka().consume(1, 30000L, new String[]{"backup.heartbeats"}).count() > 0, "Heartbeats were not replicated downstream to primary cluster.");
        }
        Assertions.assertTrue(mirrorClient2.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
        Assertions.assertEquals(1, mirrorClient2.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
        Assertions.assertTrue(this.backup.kafka().consume(1, 20000L, new String[]{"primary.checkpoints.internal"}).count() > 0, "Checkpoints were not emitted downstream to backup cluster.");
        if (this.replicateBackupToPrimary) {
            Assertions.assertTrue(mirrorClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
            Assertions.assertEquals(1, mirrorClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
            Assertions.assertTrue(this.primary.kafka().consume(1, 20000L, new String[]{"backup.checkpoints.internal"}).count() > 0, "Checkpoints were not emitted upstream to primary cluster.");
        }
        Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions = waitForCheckpointOnAllPartitions(mirrorClient2, "consumer-group-testReplication", PRIMARY_CLUSTER_ALIAS, remoteTopicName);
        KafkaConsumer createConsumer = this.backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
        Throwable th = null;
        try {
            createConsumer.assign(waitForCheckpointOnAllPartitions.keySet());
            createConsumer.getClass();
            waitForCheckpointOnAllPartitions.forEach(createConsumer::seek);
            createConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
            createConsumer.commitAsync();
            Assertions.assertTrue(createConsumer.position(new TopicPartition(remoteTopicName, 0)) > 0, "Consumer failedover to zero offset.");
            Assertions.assertTrue(createConsumer.position(new TopicPartition(remoteTopicName, 0)) <= 100, "Consumer failedover beyond expected offset.");
            if (createConsumer != null) {
                if (0 != 0) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createConsumer.close();
                }
            }
            assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
            mirrorClient.close();
            mirrorClient2.close();
            if (this.replicateBackupToPrimary) {
                Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions2 = waitForCheckpointOnAllPartitions(mirrorClient, "consumer-group-testReplication", BACKUP_CLUSTER_ALIAS, remoteTopicName2);
                KafkaConsumer createConsumer2 = this.primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
                Throwable th3 = null;
                try {
                    try {
                        createConsumer2.assign(waitForCheckpointOnAllPartitions2.keySet());
                        createConsumer2.getClass();
                        waitForCheckpointOnAllPartitions2.forEach(createConsumer2::seek);
                        createConsumer2.poll(CONSUMER_POLL_TIMEOUT_MS);
                        createConsumer2.commitAsync();
                        Assertions.assertTrue(createConsumer2.position(new TopicPartition(remoteTopicName2, 0)) > 0, "Consumer failedback to zero downstream offset.");
                        Assertions.assertTrue(createConsumer2.position(new TopicPartition(remoteTopicName2, 0)) <= 100, "Consumer failedback beyond expected downstream offset.");
                        if (createConsumer2 != null) {
                            if (0 != 0) {
                                try {
                                    createConsumer2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                createConsumer2.close();
                            }
                        }
                    } catch (Throwable th5) {
                        th3 = th5;
                        throw th5;
                    }
                } catch (Throwable th6) {
                    if (createConsumer2 != null) {
                        if (th3 != null) {
                            try {
                                createConsumer2.close();
                            } catch (Throwable th7) {
                                th3.addSuppressed(th7);
                            }
                        } else {
                            createConsumer2.close();
                        }
                    }
                    throw th6;
                }
            }
            this.primary.kafka().createTopic("test-topic-2", 10);
            String remoteTopicName3 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
            waitForTopicCreated(this.backup, remoteTopicName3);
            produceMessages(this.primary, "test-topic-2", 1);
            Assertions.assertEquals(10, this.primary.kafka().consume(10, 30000L, new String[]{"test-topic-2"}).count(), "Records were not produced to primary cluster.");
            Assertions.assertEquals(10, this.backup.kafka().consume(10, 60000L, new String[]{remoteTopicName3}).count(), "New topic was not replicated to backup cluster.");
            if (this.replicateBackupToPrimary) {
                this.backup.kafka().createTopic("test-topic-3", 10);
                String remoteTopicName4 = remoteTopicName("test-topic-3", BACKUP_CLUSTER_ALIAS);
                waitForTopicCreated(this.primary, remoteTopicName4);
                produceMessages(this.backup, "test-topic-3", 1);
                Assertions.assertEquals(10, this.backup.kafka().consume(10, 30000L, new String[]{"test-topic-3"}).count(), "Records were not produced to backup cluster.");
                Assertions.assertEquals(10, this.primary.kafka().consume(10, 60000L, new String[]{remoteTopicName4}).count(), "New topic was not replicated to primary cluster.");
            }
        } catch (Throwable th8) {
            if (createConsumer != null) {
                if (0 != 0) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void testReplicationWithEmptyPartition() throws Exception {
        String remoteTopicName;
        Throwable th;
        Map singletonMap = Collections.singletonMap("group.id", "consumer-group-testReplicationWithEmptyPartition");
        this.primary.kafka().createTopic("test-topic-with-empty-partition", 10);
        produceMessages(this.primary, "test-topic-with-empty-partition", 9);
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-with-empty-partition"});
        Throwable th2 = null;
        try {
            try {
                waitForConsumingAllRecords(createConsumerAndSubscribeTo, 90);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                this.mm2Props.put("backup->primary.enabled", "false");
                this.mm2Config = new MirrorMakerConfig(this.mm2Props);
                waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
                Thread.sleep(TimeUnit.SECONDS.toMillis(3L));
                remoteTopicName = remoteTopicName("test-topic-with-empty-partition", PRIMARY_CLUSTER_ALIAS);
                createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{remoteTopicName});
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    waitForConsumingAllRecords(createConsumerAndSubscribeTo, 90);
                    if (createConsumerAndSubscribeTo != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            createConsumerAndSubscribeTo.close();
                        }
                    }
                    Admin createAdminClient = this.backup.kafka().createAdminClient();
                    Throwable th6 = null;
                    try {
                        try {
                            OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) ((Map) createAdminClient.listConsumerGroupOffsets("consumer-group-testReplicationWithEmptyPartition").partitionsToOffsetAndMetadata().get()).get(new TopicPartition(remoteTopicName, 9));
                            Assertions.assertNotNull(offsetAndMetadata, "Offset of last partition was not replicated");
                            Assertions.assertEquals(0L, offsetAndMetadata.offset(), "Offset of last partition is not zero");
                            if (createAdminClient != null) {
                                if (0 == 0) {
                                    createAdminClient.close();
                                    return;
                                }
                                try {
                                    createAdminClient.close();
                                } catch (Throwable th7) {
                                    th6.addSuppressed(th7);
                                }
                            }
                        } catch (Throwable th8) {
                            th6 = th8;
                            throw th8;
                        }
                    } catch (Throwable th9) {
                        if (createAdminClient != null) {
                            if (th6 != null) {
                                try {
                                    createAdminClient.close();
                                } catch (Throwable th10) {
                                    th6.addSuppressed(th10);
                                }
                            } else {
                                createAdminClient.close();
                            }
                        }
                        throw th9;
                    }
                } catch (Throwable th11) {
                    th = th11;
                    throw th11;
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
        testOneWayReplicationWithOffsetSyncs(10);
    }

    @Test
    public void testOneWayReplicationWithFrequentOffsetSyncs() throws InterruptedException {
        testOneWayReplicationWithOffsetSyncs(0);
    }

    private void testOneWayReplicationWithOffsetSyncs(int i) throws InterruptedException {
        produceMessages(this.primary, "test-topic-1");
        String remoteTopicName = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        final String str = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
        HashMap<String, Object> hashMap = new HashMap<String, Object>() { // from class: org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.2
            {
                put("group.id", str);
                put("auto.offset.reset", "earliest");
            }
        };
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-1"});
        Throwable th = null;
        try {
            try {
                waitForConsumingAllRecords(createConsumerAndSubscribeTo, NUM_RECORDS_PRODUCED);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                this.mm2Props.put("sync.group.offsets.enabled", "true");
                this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
                this.mm2Props.put("offset.lag.max", Integer.toString(i));
                this.mm2Props.put("backup->primary.enabled", "false");
                this.mm2Config = new MirrorMakerConfig(this.mm2Props);
                waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
                String remoteTopicName2 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
                if (!"test-topic-1".equals(remoteTopicName2)) {
                    topicShouldNotBeCreated(this.primary, remoteTopicName2);
                }
                waitForTopicCreated(this.backup, remoteTopicName);
                KafkaConsumer createConsumerAndSubscribeTo2 = this.backup.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{remoteTopicName});
                Throwable th3 = null;
                try {
                    waitForConsumerGroupFullSync(this.backup, Collections.singletonList(remoteTopicName), "consumer-group-testOneWayReplicationWithAutoOffsetSync", NUM_RECORDS_PRODUCED, i);
                    assertDownstreamRedeliveriesBoundedByMaxLag(createConsumerAndSubscribeTo2, i);
                    if (createConsumerAndSubscribeTo2 != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo2.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            createConsumerAndSubscribeTo2.close();
                        }
                    }
                    this.primary.kafka().createTopic("test-topic-2", 10);
                    String remoteTopicName3 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
                    waitForTopicCreated(this.backup, remoteTopicName3);
                    produceMessages(this.primary, "test-topic-2");
                    createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", "consumer-group-testOneWayReplicationWithAutoOffsetSync"), new String[]{"test-topic-2"});
                    Throwable th5 = null;
                    try {
                        try {
                            waitForConsumingAllRecords(createConsumerAndSubscribeTo, NUM_RECORDS_PRODUCED);
                            if (createConsumerAndSubscribeTo != null) {
                                if (0 != 0) {
                                    try {
                                        createConsumerAndSubscribeTo.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    createConsumerAndSubscribeTo.close();
                                }
                            }
                            createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", "consumer-group-testOneWayReplicationWithAutoOffsetSync"), new String[]{remoteTopicName, remoteTopicName3});
                            Throwable th7 = null;
                            try {
                                try {
                                    waitForConsumerGroupFullSync(this.backup, Arrays.asList(remoteTopicName, remoteTopicName3), "consumer-group-testOneWayReplicationWithAutoOffsetSync", NUM_RECORDS_PRODUCED, i);
                                    assertDownstreamRedeliveriesBoundedByMaxLag(createConsumerAndSubscribeTo, i);
                                    if (createConsumerAndSubscribeTo != null) {
                                        if (0 != 0) {
                                            try {
                                                createConsumerAndSubscribeTo.close();
                                            } catch (Throwable th8) {
                                                th7.addSuppressed(th8);
                                            }
                                        } else {
                                            createConsumerAndSubscribeTo.close();
                                        }
                                    }
                                    assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
                                } finally {
                                }
                            } finally {
                                if (createConsumerAndSubscribeTo != null) {
                                    if (th7 != null) {
                                        try {
                                            createConsumerAndSubscribeTo.close();
                                        } catch (Throwable th9) {
                                            th7.addSuppressed(th9);
                                        }
                                    } else {
                                        createConsumerAndSubscribeTo.close();
                                    }
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (Throwable th10) {
                    if (createConsumerAndSubscribeTo2 != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo2.close();
                            } catch (Throwable th11) {
                                th3.addSuppressed(th11);
                            }
                        } else {
                            createConsumerAndSubscribeTo2.close();
                        }
                    }
                    throw th10;
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testOffsetSyncsTopicsOnTarget() throws Exception {
        this.mm2Props.put("primary->backup.offset-syncs.topic.location", "target");
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        waitForTopicCreated(this.backup, "mm2-offset-syncs.primary.internal");
        Map<String, Object> singletonMap = Collections.singletonMap("group.id", "consumer-group-syncs-on-target");
        produceMessages(this.primary, "test-topic-1");
        warmUpConsumer(singletonMap);
        String remoteTopicName = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        KafkaConsumer createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("auto.offset.reset", "earliest"), new String[]{"primary.checkpoints.internal"});
        TestUtils.waitForCondition(() -> {
            Iterator it = createConsumerAndSubscribeTo.poll(Duration.ofSeconds(1L)).iterator();
            while (it.hasNext()) {
                if (remoteTopicName.equals(Checkpoint.deserializeRecord((ConsumerRecord) it.next()).topicPartition().topic())) {
                    return true;
                }
            }
            return false;
        }, 30000L, "Unable to find checkpoints for primary.test-topic-1");
        assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
        Set set = (Set) this.primary.kafka().createAdminClient().listTopics().names().get();
        Assertions.assertFalse(set.contains("mm2-offset-syncs.primary.internal"));
        Assertions.assertFalse(set.contains("mm2-offset-syncs.backup.internal"));
    }

    @Test
    public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedException {
        String str = "consumer-group-no-checkpoints";
        Map<String, Object> singletonMap = Collections.singletonMap("group.id", "consumer-group-no-checkpoints");
        produceMessages(this.primary, "test-topic-1");
        warmUpConsumer(singletonMap);
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        waitForTopicCreated(this.backup, remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS));
        waitForTopicCreated(this.backup, remoteTopicName("test-topic-no-checkpoints", PRIMARY_CLUSTER_ALIAS));
        TopicPartition topicPartition = new TopicPartition("test-topic-1", 0);
        TopicPartition topicPartition2 = new TopicPartition("test-topic-no-checkpoints", 0);
        KafkaConsumer createConsumer = this.primary.kafka().createConsumer(singletonMap);
        Throwable th = null;
        try {
            try {
                createConsumer.commitSync((Map) createConsumer.endOffsets(Arrays.asList(topicPartition, topicPartition2)).entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry -> {
                    return new OffsetAndMetadata(((Long) entry.getValue()).longValue());
                })));
                if (createConsumer != null) {
                    if (0 != 0) {
                        try {
                            createConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumer.close();
                    }
                }
                MirrorClient mirrorClient = new MirrorClient(this.mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
                TestUtils.waitForCondition(() -> {
                    Map remoteConsumerOffsets = mirrorClient.remoteConsumerOffsets(str, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
                    return remoteConsumerOffsets.containsKey(remoteTopicPartition(topicPartition, PRIMARY_CLUSTER_ALIAS)) && !remoteConsumerOffsets.containsKey(remoteTopicPartition(topicPartition2, PRIMARY_CLUSTER_ALIAS));
                }, 30000L, "Checkpoints were not emitted correctly to backup cluster");
                produceMessages(this.primary, "test-topic-no-checkpoints");
                KafkaConsumer createConsumer2 = this.primary.kafka().createConsumer(singletonMap);
                Throwable th3 = null;
                try {
                    createConsumer2.commitSync((Map) createConsumer2.endOffsets(Arrays.asList(topicPartition, topicPartition2)).entrySet().stream().collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, entry2 -> {
                        return new OffsetAndMetadata(((Long) entry2.getValue()).longValue());
                    })));
                    if (createConsumer2 != null) {
                        if (0 != 0) {
                            try {
                                createConsumer2.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            createConsumer2.close();
                        }
                    }
                    TestUtils.waitForCondition(() -> {
                        Map remoteConsumerOffsets = mirrorClient.remoteConsumerOffsets(str, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
                        return remoteConsumerOffsets.containsKey(remoteTopicPartition(topicPartition, PRIMARY_CLUSTER_ALIAS)) && remoteConsumerOffsets.containsKey(remoteTopicPartition(topicPartition2, PRIMARY_CLUSTER_ALIAS));
                    }, 30000L, "Checkpoints were not emitted correctly to backup cluster");
                } catch (Throwable th5) {
                    if (createConsumer2 != null) {
                        if (0 != 0) {
                            try {
                                createConsumer2.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            createConsumer2.close();
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (Throwable th7) {
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th7;
        }
    }

    @Test
    public void testRestartReplication() throws InterruptedException {
        Throwable th;
        Map<String, Object> singletonMap = Collections.singletonMap("group.id", "consumer-group-restart");
        String remoteTopicName = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        warmUpConsumer(singletonMap);
        this.mm2Props.put("sync.group.offsets.enabled", "true");
        this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
        this.mm2Props.put("offset.lag.max", Integer.toString(10));
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        produceMessages(this.primary, "test-topic-1");
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-1"});
        Throwable th2 = null;
        try {
            try {
                waitForConsumingAllRecords(createConsumerAndSubscribeTo, NUM_RECORDS_PRODUCED);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                waitForConsumerGroupFullSync(this.backup, Collections.singletonList(remoteTopicName), "consumer-group-restart", NUM_RECORDS_PRODUCED, 10);
                restartMirrorMakerConnectors(this.backup, CONNECTOR_LIST);
                assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
                Thread.sleep(5000L);
                produceMessages(this.primary, "test-topic-1");
                createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-1"});
                th = null;
            } finally {
            }
            try {
                try {
                    waitForConsumingAllRecords(createConsumerAndSubscribeTo, NUM_RECORDS_PRODUCED);
                    if (createConsumerAndSubscribeTo != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            createConsumerAndSubscribeTo.close();
                        }
                    }
                    waitForConsumerGroupFullSync(this.backup, Collections.singletonList(remoteTopicName), "consumer-group-restart", 200, 10);
                    assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
        Map<String, Object> singletonMap = Collections.singletonMap("group.id", "consumer-group-lagging-behind");
        String remoteTopicName = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        warmUpConsumer(singletonMap);
        this.mm2Props.put("sync.group.offsets.enabled", "true");
        this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
        this.mm2Props.put("offset.lag.max", Integer.toString(10));
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            produceMessages(this.primary, "test-topic-1");
        }
        waitForTopicCreated(this.backup, remoteTopicName);
        Assertions.assertEquals(NUM_RECORDS_PRODUCED * NUM_RECORDS_PRODUCED, this.backup.kafka().consume(NUM_RECORDS_PRODUCED * NUM_RECORDS_PRODUCED, 30000L, new String[]{remoteTopicName}).count(), "Records were not replicated to backup cluster.");
        ConsumerRecords<byte[], byte[]> consume = this.primary.kafka().consume(NUM_RECORDS_PRODUCED * NUM_RECORDS_PRODUCED, 20000L, new String[]{"test-topic-1"});
        MirrorClient mirrorClient = new MirrorClient(this.mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
        Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions = waitForCheckpointOnAllPartitions(mirrorClient, "consumer-group-lagging-behind", PRIMARY_CLUSTER_ALIAS, remoteTopicName);
        log.info("Initial checkpoints: {}", waitForCheckpointOnAllPartitions);
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-1"});
        Throwable th = null;
        try {
            try {
                createConsumerAndSubscribeTo.poll(CONSUMER_POLL_TIMEOUT_MS);
                createConsumerAndSubscribeTo.commitSync(partialOffsets(consume, 0.8999999761581421d));
                Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions = waitForNewCheckpointOnAllPartitions(mirrorClient, "consumer-group-lagging-behind", PRIMARY_CLUSTER_ALIAS, remoteTopicName, waitForCheckpointOnAllPartitions);
                log.info("Partial checkpoints: {}", waitForNewCheckpointOnAllPartitions);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                for (TopicPartition topicPartition : waitForCheckpointOnAllPartitions.keySet()) {
                    Assertions.assertTrue(waitForCheckpointOnAllPartitions.get(topicPartition).offset() < waitForNewCheckpointOnAllPartitions.get(topicPartition).offset(), "Checkpoints should advance when the upstream consumer group advances");
                }
                assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
                createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-1"});
                Throwable th3 = null;
                try {
                    try {
                        createConsumerAndSubscribeTo.poll(CONSUMER_POLL_TIMEOUT_MS);
                        createConsumerAndSubscribeTo.commitSync(partialOffsets(consume, 0.10000000149011612d));
                        Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions2 = waitForNewCheckpointOnAllPartitions(mirrorClient, "consumer-group-lagging-behind", PRIMARY_CLUSTER_ALIAS, remoteTopicName, waitForNewCheckpointOnAllPartitions);
                        log.info("Final checkpoints: {}", waitForNewCheckpointOnAllPartitions2);
                        if (createConsumerAndSubscribeTo != null) {
                            if (0 != 0) {
                                try {
                                    createConsumerAndSubscribeTo.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                createConsumerAndSubscribeTo.close();
                            }
                        }
                        for (TopicPartition topicPartition2 : waitForNewCheckpointOnAllPartitions.keySet()) {
                            Assertions.assertTrue(waitForNewCheckpointOnAllPartitions2.get(topicPartition2).offset() < waitForNewCheckpointOnAllPartitions.get(topicPartition2).offset(), "Checkpoints should rewind when the upstream consumer group rewinds");
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> partialOffsets(ConsumerRecords<byte[], byte[]> consumerRecords, double d) {
        return (Map) consumerRecords.partitions().stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
            return new OffsetAndMetadata(((ConsumerRecord) consumerRecords.records(topicPartition).get((int) (r0.size() * d))).offset());
        }));
    }

    @Test
    public void testSyncTopicConfigs() throws InterruptedException {
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        HashMap hashMap = new HashMap();
        hashMap.put("delete.retention.ms", "1000");
        hashMap.put("retention.bytes", "1000");
        String remoteTopicName = remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, hashMap);
        waitForTopicCreated(this.backup, remoteTopicName);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, remoteTopicName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
        arrayList.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
        this.backup.kafka().incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList));
        TestUtils.waitForCondition(() -> {
            String topicConfig = getTopicConfig(this.primary.kafka(), "test-topic-with-config", "delete.retention.ms");
            String topicConfig2 = getTopicConfig(this.backup.kafka(), remoteTopicName, "delete.retention.ms");
            Assertions.assertNotEquals(topicConfig, topicConfig2, "`delete.retention.ms` should be different, because it's in exclude filter! ");
            Assertions.assertEquals("2000", topicConfig2, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
            Assertions.assertEquals(getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes"), getTopicConfig(this.backup.kafka(), remoteTopicName, "retention.bytes"), "`retention.bytes` should be the same, because it isn't in exclude filter! ");
            return true;
        }, 30000L, "Topic configurations were not synced");
    }

    @Test
    public void testReplicateSourceDefault() throws Exception {
        this.mm2Props.put("use.defaults.from", "source");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        String remoteTopicName = remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, new HashMap());
        waitForTopicCreated(this.backup, remoteTopicName);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, remoteTopicName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
        arrayList.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
        this.backup.kafka().incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList));
        TestUtils.waitForCondition(() -> {
            String topicConfig = getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
            String topicConfig2 = getTopicConfig(this.backup.kafka(), remoteTopicName, "retention.bytes");
            Assertions.assertEquals(topicConfig, topicConfig2, "`retention.bytes` should be the same, because the source cluster default is being used! ");
            Assertions.assertEquals("-1", topicConfig2, "`retention.bytes` should be synced with default value!");
            String topicConfig3 = getTopicConfig(this.primary.kafka(), "test-topic-with-config", "delete.retention.ms");
            String topicConfig4 = getTopicConfig(this.backup.kafka(), remoteTopicName, "delete.retention.ms");
            Assertions.assertNotEquals(topicConfig3, topicConfig4, "`delete.retention.ms` should be different, because it's in exclude filter! ");
            Assertions.assertEquals("2000", topicConfig4, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
            return true;
        }, 30000L, "Topic configurations were not synced");
    }

    @Test
    public void testReplicateTargetDefault() throws Exception {
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        HashMap hashMap = new HashMap();
        hashMap.put("retention.bytes", "1000");
        String remoteTopicName = remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, hashMap);
        waitForTopicCreated(this.backup, remoteTopicName);
        TestUtils.waitForCondition(() -> {
            String topicConfig = getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
            String topicConfig2 = getTopicConfig(this.backup.kafka(), remoteTopicName, "retention.bytes");
            Assertions.assertEquals(topicConfig, topicConfig2, "`retention.bytes` should be the same");
            Assertions.assertEquals("1000", topicConfig2, "`retention.bytes` should be synced with default value!");
            return true;
        }, 30000L, "Topic configurations were not synced");
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic-with-config");
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "1000"), AlterConfigOp.OpType.DELETE));
        this.primary.kafka().incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList));
        TestUtils.waitForCondition(() -> {
            Assertions.assertEquals("-1", getTopicConfig(this.backup.kafka(), remoteTopicName, "retention.bytes"), "`retention.bytes` should be synced with target's default value!");
            return true;
        }, 30000L, "Topic configurations were not synced");
    }

    private TopicPartition remoteTopicPartition(TopicPartition topicPartition, String str) {
        return new TopicPartition(remoteTopicName(topicPartition.topic(), str), topicPartition.partition());
    }

    void createAndTestNewTopicWithConfigFilter() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("delete.retention.ms", "1000");
        hashMap.put("retention.bytes", "1000");
        String remoteTopicName = remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, hashMap);
        waitForTopicCreated(this.backup, remoteTopicName);
        Assertions.assertNotEquals(getTopicConfig(this.primary.kafka(), "test-topic-with-config", "delete.retention.ms"), getTopicConfig(this.backup.kafka(), remoteTopicName, "delete.retention.ms"), "`delete.retention.ms` should be different, because it's in exclude filter! ");
        String topicConfig = getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
        String topicConfig2 = getTopicConfig(this.backup.kafka(), remoteTopicName, "retention.bytes");
        Assertions.assertEquals(topicConfig, topicConfig2, "`retention.bytes` should be the same, because it isn't in exclude filter! ");
        Assertions.assertEquals("1000", topicConfig2, "`retention.bytes` should be the same, because it's explicitly defined! ");
    }

    String remoteTopicName(String str, String str2) {
        return str2 + "." + str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster embeddedConnectCluster, List<Class<? extends Connector>> list, MirrorMakerConfig mirrorMakerConfig, String str, String str2) throws InterruptedException {
        for (Class<? extends Connector> cls : list) {
            embeddedConnectCluster.configureConnector(cls.getSimpleName(), mirrorMakerConfig.connectorBaseConfig(new SourceAndTarget(str, str2), cls));
        }
        for (Class<? extends Connector> cls2 : list) {
            embeddedConnectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(cls2.getSimpleName(), 1, "Connector " + cls2.getSimpleName() + " tasks did not start in time on cluster: " + embeddedConnectCluster.getName());
        }
    }

    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster embeddedConnectCluster, List<Class<? extends Connector>> list) {
        Iterator<Class<? extends Connector>> it = list.iterator();
        while (it.hasNext()) {
            embeddedConnectCluster.restartConnectorAndTasks(it.next().getSimpleName(), false, true, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void waitForTopicCreated(EmbeddedConnectCluster embeddedConnectCluster, String str) throws InterruptedException {
        Admin createAdminClient = embeddedConnectCluster.kafka().createAdminClient();
        Throwable th = null;
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    return ((Set) createAdminClient.listTopics().names().get(60000L, TimeUnit.MILLISECONDS)).contains(str);
                }, 30000L, "Topic: " + str + " didn't get created in the cluster");
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }

    private static void deleteAllTopics(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        Admin createAdminClient = embeddedKafkaCluster.createAdminClient();
        Throwable th = null;
        try {
            Set set = (Set) createAdminClient.listTopics().names().get();
            log.debug("Deleting topics: {} ", set);
            createAdminClient.deleteTopics(set).all().get();
            if (createAdminClient != null) {
                if (0 == 0) {
                    createAdminClient.close();
                    return;
                }
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createAdminClient != null) {
                if (0 != 0) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getTopicConfig(EmbeddedKafkaCluster embeddedKafkaCluster, String str, String str2) throws Exception {
        Admin createAdminClient = embeddedKafkaCluster.createAdminClient();
        Throwable th = null;
        try {
            try {
                String value = ((Config) ((Map) createAdminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, str))).all().get()).values().toArray()[0]).get(str2).value();
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                return value;
            } finally {
            }
        } catch (Throwable th3) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void produceMessages(EmbeddedConnectCluster embeddedConnectCluster, String str) {
        for (Map.Entry<String, String> entry : org.apache.kafka.connect.mirror.TestUtils.generateRecords(NUM_RECORDS_PRODUCED).entrySet()) {
            produce(embeddedConnectCluster.kafka(), str, null, entry.getKey(), entry.getValue());
        }
    }

    private void produceMessages(EmbeddedConnectCluster embeddedConnectCluster, String str, int i) {
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            for (int i4 = 0; i4 < i; i4++) {
                int i5 = i2;
                i2++;
                produce(embeddedConnectCluster.kafka(), str, Integer.valueOf(i4), "key", "value-" + i5);
            }
        }
    }

    protected void produce(EmbeddedKafkaCluster embeddedKafkaCluster, String str, Integer num, String str2, String str3) {
        embeddedKafkaCluster.produce(str, num, str2, str3);
    }

    private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(MirrorClient mirrorClient, String str, String str2, String str3) throws InterruptedException {
        return waitForNewCheckpointOnAllPartitions(mirrorClient, str, str2, str3, Collections.emptyMap());
    }

    protected static Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions(MirrorClient mirrorClient, String str, String str2, String str3, Map<TopicPartition, OffsetAndMetadata> map) throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            Map remoteConsumerOffsets = mirrorClient.remoteConsumerOffsets(str, str2, Duration.ofMillis(3000L));
            for (int i = 0; i < 10; i++) {
                TopicPartition topicPartition = new TopicPartition(str3, i);
                if (!remoteConsumerOffsets.containsKey(topicPartition)) {
                    log.info("Checkpoint is missing for {}: {}-{}", new Object[]{str, str3, Integer.valueOf(i)});
                    return false;
                }
                if (map.containsKey(topicPartition) && ((OffsetAndMetadata) map.get(topicPartition)).equals(remoteConsumerOffsets.get(topicPartition))) {
                    log.info("Checkpoint is the same as previous checkpoint");
                    return false;
                }
            }
            atomicReference.set(remoteConsumerOffsets);
            return true;
        }, 20000L, String.format("Offsets for consumer group %s not translated from %s for topic %s", str, str2, str3));
        return (Map) atomicReference.get();
    }

    private static <T> void waitForConsumerGroupFullSync(EmbeddedConnectCluster embeddedConnectCluster, List<String> list, String str, int i, int i2) throws InterruptedException {
        Admin createAdminClient;
        Throwable th;
        int size = i * list.size();
        HashMap hashMap = new HashMap();
        hashMap.put("isolation.level", "read_committed");
        hashMap.put("auto.offset.reset", "earliest");
        HashMap hashMap2 = new HashMap();
        KafkaConsumer createConsumerAndSubscribeTo = embeddedConnectCluster.kafka().createConsumerAndSubscribeTo(hashMap, (String[]) list.toArray(new String[0]));
        Throwable th2 = null;
        try {
            try {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                TestUtils.waitForCondition(() -> {
                    ConsumerRecords poll = createConsumerAndSubscribeTo.poll(CONSUMER_POLL_TIMEOUT_MS);
                    poll.forEach(consumerRecord -> {
                    });
                    return size == atomicInteger.addAndGet(poll.count());
                }, 20000L, "Consumer cannot consume all records in time");
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                createAdminClient = embeddedConnectCluster.kafka().createAdminClient();
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    ArrayList arrayList = new ArrayList(10 * list.size());
                    for (int i3 = 0; i3 < 10; i3++) {
                        Iterator<String> it = list.iterator();
                        while (it.hasNext()) {
                            arrayList.add(new TopicPartition(it.next(), i3));
                        }
                    }
                    TestUtils.waitForCondition(() -> {
                        Map map = (Map) createAdminClient.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
                        Map map2 = (Map) createAdminClient.listOffsets((Map) arrayList.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                            return OffsetSpec.latest();
                        }))).all().get();
                        Iterator it2 = arrayList.iterator();
                        while (it2.hasNext()) {
                            TopicPartition topicPartition2 = (TopicPartition) it2.next();
                            Assertions.assertTrue(map.containsKey(topicPartition2), "TopicPartition " + topicPartition2 + " does not have translated offsets");
                            long offset = ((OffsetAndMetadata) map.get(topicPartition2)).offset();
                            Assertions.assertTrue(offset > ((Long) hashMap2.get(topicPartition2)).longValue() - ((long) i2), "TopicPartition " + topicPartition2 + " does not have fully-translated offsets: " + offset + " is not close enough to " + hashMap2.get(topicPartition2) + " (strictly more than " + (((Long) hashMap2.get(topicPartition2)).longValue() - i2) + ")");
                            Assertions.assertTrue(offset <= ((ListOffsetsResult.ListOffsetsResultInfo) map2.get(topicPartition2)).offset(), "TopicPartition " + topicPartition2 + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
                        }
                        return true;
                    }, 30000L, "Consumer group offset sync is not complete in time");
                    if (createAdminClient != null) {
                        if (0 == 0) {
                            createAdminClient.close();
                            return;
                        }
                        try {
                            createAdminClient.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (createAdminClient != null) {
                    if (th != null) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th8) {
                            th.addSuppressed(th8);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (createConsumerAndSubscribeTo != null) {
                if (th2 != null) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th10) {
                        th2.addSuppressed(th10);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            throw th9;
        }
    }

    private static void assertMonotonicCheckpoints(EmbeddedConnectCluster embeddedConnectCluster, String str) {
        TopicPartition topicPartition = new TopicPartition(str, 0);
        KafkaConsumer createConsumerAndSubscribeTo = embeddedConnectCluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("auto.offset.reset", "earliest"), new String[]{str});
        Throwable th = null;
        try {
            try {
                HashMap hashMap = new HashMap();
                long currentTimeMillis = System.currentTimeMillis() + 20000;
                do {
                    Iterator it = createConsumerAndSubscribeTo.poll(Duration.ofSeconds(1L)).iterator();
                    while (it.hasNext()) {
                        Checkpoint deserializeRecord = Checkpoint.deserializeRecord((ConsumerRecord) it.next());
                        Map map = (Map) hashMap.computeIfAbsent(deserializeRecord.consumerGroupId(), str2 -> {
                            return new HashMap();
                        });
                        Assertions.assertTrue(deserializeRecord.downstreamOffset() >= ((Checkpoint) map.getOrDefault(deserializeRecord.topicPartition(), deserializeRecord)).downstreamOffset(), "Checkpoint was non-monotonic for " + deserializeRecord.consumerGroupId() + ": " + deserializeRecord.topicPartition());
                        map.put(deserializeRecord.topicPartition(), deserializeRecord);
                    }
                    if (createConsumerAndSubscribeTo.currentLag(topicPartition).orElse(1L) <= 0) {
                        break;
                    }
                } while (System.currentTimeMillis() < currentTimeMillis);
                Assertions.assertEquals(0L, createConsumerAndSubscribeTo.currentLag(topicPartition).orElse(1L), "Unable to read all checkpoints within 20000ms");
                if (createConsumerAndSubscribeTo != null) {
                    if (0 == 0) {
                        createConsumerAndSubscribeTo.close();
                        return;
                    }
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createConsumerAndSubscribeTo != null) {
                if (th != null) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            throw th4;
        }
    }

    private static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> consumer, int i) {
        ConsumerRecords poll = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
        for (TopicPartition topicPartition : poll.partitions()) {
            Assertions.assertTrue(poll.records(topicPartition).size() < i, "downstream consumer is re-reading more than " + i + " records from" + topicPartition);
        }
    }

    private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int i) throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        TestUtils.waitForCondition(() -> {
            return i == atomicInteger.addAndGet(consumer.poll(CONSUMER_POLL_TIMEOUT_MS).count());
        }, 20000L, "Consumer cannot consume all records in time");
        consumer.commitSync();
    }

    private static Map<String, String> basicMM2Config() {
        HashMap hashMap = new HashMap();
        hashMap.put("clusters", "primary, backup");
        hashMap.put("max.tasks", "10");
        hashMap.put("groups", "consumer-group-.*");
        hashMap.put("sync.topic.acls.enabled", "false");
        hashMap.put("emit.checkpoints.interval.seconds", String.valueOf(1));
        hashMap.put("emit.heartbeats.interval.seconds", "1");
        hashMap.put("refresh.topics.interval.seconds", "1");
        hashMap.put("refresh.groups.interval.seconds", "1");
        hashMap.put("checkpoints.topic.replication.factor", "1");
        hashMap.put("heartbeats.topic.replication.factor", "1");
        hashMap.put("offset-syncs.topic.replication.factor", "1");
        hashMap.put("config.storage.replication.factor", "1");
        hashMap.put("offset.storage.replication.factor", "1");
        hashMap.put("status.storage.replication.factor", "1");
        hashMap.put("replication.factor", "1");
        hashMap.put("primary.offset.flush.interval.ms", "5000");
        hashMap.put("backup.offset.flush.interval.ms", "5000");
        return hashMap;
    }

    private void createTopics() {
        Map singletonMap = Collections.singletonMap("cleanup.policy", "compact");
        Map emptyMap = Collections.emptyMap();
        Map singletonMap2 = Collections.singletonMap("request.timeout.ms", 60000);
        this.primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1, emptyMap, singletonMap2);
        this.primary.kafka().createTopic("test-topic-1", 10, 1, singletonMap, singletonMap2);
        this.backup.kafka().createTopic("test-topic-1", 10, 1, emptyMap, singletonMap2);
        this.primary.kafka().createTopic("heartbeats", 1, 1, emptyMap, singletonMap2);
        this.backup.kafka().createTopic("heartbeats", 1, 1, emptyMap, singletonMap2);
        if (this.createReplicatedTopicsUpfront.booleanValue()) {
            this.primary.kafka().createTopic(remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS), 1, 1, emptyMap, singletonMap2);
            this.backup.kafka().createTopic(remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS), 1, 1, emptyMap, singletonMap2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void warmUpConsumer(Map<String, Object> map) {
        Throwable th;
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(map, new String[]{"test-topic-1"});
        Throwable th2 = null;
        try {
            try {
                createConsumerAndSubscribeTo.poll(CONSUMER_POLL_TIMEOUT_MS);
                createConsumerAndSubscribeTo.commitSync();
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(map, new String[]{"test-topic-1"});
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    createConsumerAndSubscribeTo.poll(CONSUMER_POLL_TIMEOUT_MS);
                    createConsumerAndSubscribeTo.commitSync();
                    if (createConsumerAndSubscribeTo != null) {
                        if (0 == 0) {
                            createConsumerAndSubscribeTo.close();
                            return;
                        }
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    private static void topicShouldNotBeCreated(EmbeddedConnectCluster embeddedConnectCluster, String str) throws InterruptedException {
        Admin createAdminClient = embeddedConnectCluster.kafka().createAdminClient();
        Throwable th = null;
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    return !((Set) createAdminClient.listTopics().names().get(60000L, TimeUnit.MILLISECONDS)).contains(str);
                }, 60000L, "Topic: " + str + " get created on cluster: " + embeddedConnectCluster.getName());
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void waitForTopicPartitionCreated(EmbeddedConnectCluster embeddedConnectCluster, String str, int i) throws InterruptedException {
        Admin createAdminClient = embeddedConnectCluster.kafka().createAdminClient();
        Throwable th = null;
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    return ((TopicDescription) ((Map) createAdminClient.describeTopics(Collections.singleton(str)).allTopicNames().get()).get(str)).partitions().size() == i;
                }, 60000L, "Topic: " + str + "'s partitions didn't get created on cluster: " + embeddedConnectCluster.getName());
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }
}
