/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.TestUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="integration")
public class MirrorConnectorsIntegrationBaseTest {
    private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
    protected static final int NUM_RECORDS_PER_PARTITION = 10;
    protected static final int NUM_PARTITIONS = 10;
    protected static final int NUM_RECORDS_PRODUCED = 100;
    protected static final int OFFSET_LAG_MAX = 10;
    protected static final int RECORD_PRODUCE_DURATION_MS = 20000;
    protected static final int RECORD_TRANSFER_DURATION_MS = 30000;
    protected static final int CHECKPOINT_DURATION_MS = 20000;
    private static final int RECORD_CONSUME_DURATION_MS = 20000;
    private static final int OFFSET_SYNC_DURATION_MS = 30000;
    private static final int TOPIC_SYNC_DURATION_MS = 60000;
    private static final int REQUEST_TIMEOUT_DURATION_MS = 60000;
    private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1000;
    private static final int NUM_WORKERS = 3;
    protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
    protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
    protected static final String BACKUP_CLUSTER_ALIAS = "backup";
    protected static final List<Class<? extends Connector>> CONNECTOR_LIST = Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
    private volatile boolean shuttingDown;
    protected Map<String, String> mm2Props = new HashMap<String, String>();
    protected MirrorMakerConfig mm2Config;
    protected EmbeddedConnectCluster primary;
    protected EmbeddedConnectCluster backup;
    protected Producer<byte[], byte[]> primaryProducer;
    protected Producer<byte[], byte[]> backupProducer;
    protected Map<String, String> additionalPrimaryClusterClientsConfigs = new HashMap<String, String>();
    protected Map<String, String> additionalBackupClusterClientsConfigs = new HashMap<String, String>();
    protected boolean replicateBackupToPrimary = true;
    protected Boolean createReplicatedTopicsUpfront = false;
    protected Exit.Procedure exitProcedure;
    private Exit.Procedure haltProcedure;
    protected Properties primaryBrokerProps = new Properties();
    protected Properties backupBrokerProps = new Properties();
    protected Map<String, String> primaryWorkerProps = new HashMap<String, String>();
    protected Map<String, String> backupWorkerProps = new HashMap<String, String>();

    @BeforeEach
    public void startClusters() throws Exception {
        this.startClusters((Map<String, String>)new HashMap<String, String>(){
            {
                this.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
            }
        });
    }

    public void startClusters(Map<String, String> additionalMM2Config) throws Exception {
        this.shuttingDown = false;
        this.exitProcedure = (code, message) -> {
            if (this.shuttingDown) {
                return;
            }
            if (code != 0) {
                String exitMessage = "Abrupt service exit with code " + code + " and message " + message;
                log.warn(exitMessage);
                throw new UngracefulShutdownException(exitMessage);
            }
        };
        this.haltProcedure = (code, message) -> {
            if (this.shuttingDown) {
                return;
            }
            if (code != 0) {
                String haltMessage = "Abrupt service halt with code " + code + " and message " + message;
                log.warn(haltMessage);
                throw new UngracefulShutdownException(haltMessage);
            }
        };
        Exit.setExitProcedure((Exit.Procedure)this.exitProcedure);
        Exit.setHaltProcedure((Exit.Procedure)this.haltProcedure);
        this.primaryBrokerProps.put("auto.create.topics.enable", "false");
        this.backupBrokerProps.put("auto.create.topics.enable", "false");
        this.mm2Props.putAll(MirrorConnectorsIntegrationBaseTest.basicMM2Config());
        this.mm2Props.put("primary->backup.enabled", "true");
        this.mm2Props.put("backup->primary.enabled", Boolean.toString(this.replicateBackupToPrimary));
        this.mm2Props.putAll(additionalMM2Config);
        this.mm2Props.put("config.properties.exclude", "delete\\.retention\\..*");
        this.mm2Props.put("sync.topic.configs.interval.seconds", "1");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        this.primaryWorkerProps = this.mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
        this.backupWorkerProps.putAll(this.mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
        this.primary = (EmbeddedConnectCluster)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)new EmbeddedConnectCluster.Builder().name("primary-connect-cluster").numWorkers(3).numBrokers(1)).brokerProps(this.primaryBrokerProps)).workerProps(this.primaryWorkerProps)).maskExitProcedures(false)).clientProps(this.additionalPrimaryClusterClientsConfigs)).build();
        this.backup = (EmbeddedConnectCluster)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)((EmbeddedConnectCluster.Builder)new EmbeddedConnectCluster.Builder().name("backup-connect-cluster").numWorkers(3).numBrokers(1)).brokerProps(this.backupBrokerProps)).workerProps(this.backupWorkerProps)).maskExitProcedures(false)).clientProps(this.additionalBackupClusterClientsConfigs)).build();
        this.primary.start();
        this.primary.assertions().assertAtLeastNumWorkersAreUp(3, "Workers of primary-connect-cluster did not start in time.");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, "mm2-status.backup.internal");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, "mm2-offsets.backup.internal");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, "mm2-configs.backup.internal");
        this.backup.start();
        this.backup.assertions().assertAtLeastNumWorkersAreUp(3, "Workers of backup-connect-cluster did not start in time.");
        this.primaryProducer = this.initializeProducer(this.primary);
        this.backupProducer = this.initializeProducer(this.backup);
        this.createTopics();
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, "mm2-status.primary.internal");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, "mm2-offsets.primary.internal");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, "mm2-configs.primary.internal");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, "test-topic-1");
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, "test-topic-1");
        this.warmUpConsumer(Collections.singletonMap("group.id", "consumer-group-dummy"));
        log.info("primary REST service: {}", (Object)this.primary.endpointForResource("connectors"));
        log.info("backup REST service: {}", (Object)this.backup.endpointForResource("connectors"));
        log.info("primary brokers: {}", (Object)this.primary.kafka().bootstrapServers());
        log.info("backup brokers: {}", (Object)this.backup.kafka().bootstrapServers());
        this.mm2Props.put("primary.bootstrap.servers", this.primary.kafka().bootstrapServers());
        this.mm2Props.put("backup.bootstrap.servers", this.backup.kafka().bootstrapServers());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @AfterEach
    public void shutdownClusters() throws Exception {
        try {
            Utils.closeQuietly(this.primaryProducer, (String)"primary producer");
            Utils.closeQuietly(this.backupProducer, (String)"backup producer");
            for (String x : this.primary.connectors()) {
                this.primary.deleteConnector(x);
            }
            for (String x : this.backup.connectors()) {
                this.backup.deleteConnector(x);
            }
        }
        finally {
            this.shuttingDown = true;
            try {
                try {
                    this.primary.stop();
                }
                finally {
                    this.backup.stop();
                }
            }
            finally {
                Exit.resetExitProcedure();
                Exit.resetHaltProcedure();
            }
        }
    }

    @Test
    public void testReplication() throws Exception {
        this.produceMessages(this.primaryProducer, "test-topic-1");
        String backupTopic1 = this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        if (this.replicateBackupToPrimary) {
            this.produceMessages(this.backupProducer, "test-topic-1");
        }
        String reverseTopic1 = this.remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
        String consumerGroupName = "consumer-group-testReplication";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        this.warmUpConsumer(consumerProps);
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        List<Class<? extends Connector>> primaryConnectors = this.replicateBackupToPrimary ? CONNECTOR_LIST : Collections.singletonList(MirrorHeartbeatConnector.class);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.primary, primaryConnectors, this.mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
        MirrorClient primaryClient = new MirrorClient(this.mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
        MirrorClient backupClient = new MirrorClient(this.mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, reverseTopic1);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic1);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, "mm2-offset-syncs.backup.internal");
        Assertions.assertEquals((Object)"compact", (Object)MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic1, "cleanup.policy"), (String)"topic config was not synced");
        this.createAndTestNewTopicWithConfigFilter();
        Assertions.assertEquals((int)100, (int)this.primary.kafka().consume(100, 30000L, new String[]{"test-topic-1"}).count(), (String)"Records were not produced to primary cluster.");
        Assertions.assertEquals((int)100, (int)this.backup.kafka().consume(100, 30000L, new String[]{backupTopic1}).count(), (String)"Records were not replicated to backup cluster.");
        Assertions.assertEquals((int)100, (int)this.backup.kafka().consume(100, 30000L, new String[]{"test-topic-1"}).count(), (String)"Records were not produced to backup cluster.");
        if (this.replicateBackupToPrimary) {
            Assertions.assertEquals((int)100, (int)this.primary.kafka().consume(100, 30000L, new String[]{reverseTopic1}).count(), (String)"Records were not replicated to primary cluster.");
            Assertions.assertEquals((int)200, (int)this.primary.kafka().consume(200, 30000L, new String[]{reverseTopic1, "test-topic-1"}).count(), (String)"Primary cluster doesn't have all records from both clusters.");
            Assertions.assertEquals((int)200, (int)this.backup.kafka().consume(200, 30000L, new String[]{backupTopic1, "test-topic-1"}).count(), (String)"Backup cluster doesn't have all records from both clusters.");
        }
        Assertions.assertTrue((this.primary.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0 ? 1 : 0) != 0, (String)"Heartbeats were not emitted to primary cluster.");
        Assertions.assertTrue((this.backup.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0 ? 1 : 0) != 0, (String)"Heartbeats were not emitted to backup cluster.");
        Assertions.assertTrue((this.backup.kafka().consume(1, 30000L, new String[]{"primary.heartbeats"}).count() > 0 ? 1 : 0) != 0, (String)"Heartbeats were not replicated downstream to backup cluster.");
        if (this.replicateBackupToPrimary) {
            Assertions.assertTrue((this.primary.kafka().consume(1, 30000L, new String[]{"backup.heartbeats"}).count() > 0 ? 1 : 0) != 0, (String)"Heartbeats were not replicated downstream to primary cluster.");
        }
        Assertions.assertTrue((boolean)backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), (String)"Did not find upstream primary cluster.");
        Assertions.assertEquals((int)1, (int)backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), (String)"Did not calculate replication hops correctly.");
        Assertions.assertTrue((this.backup.kafka().consume(1, 20000L, new String[]{"primary.checkpoints.internal"}).count() > 0 ? 1 : 0) != 0, (String)"Checkpoints were not emitted downstream to backup cluster.");
        if (this.replicateBackupToPrimary) {
            Assertions.assertTrue((boolean)primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), (String)"Did not find upstream backup cluster.");
            Assertions.assertEquals((int)1, (int)primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), (String)"Did not calculate replication hops correctly.");
            Assertions.assertTrue((this.primary.kafka().consume(1, 20000L, new String[]{"backup.checkpoints.internal"}).count() > 0 ? 1 : 0) != 0, (String)"Checkpoints were not emitted upstream to primary cluster.");
        }
        Map<TopicPartition, OffsetAndMetadata> backupOffsets = MirrorConnectorsIntegrationBaseTest.waitForCheckpointOnAllPartitions(backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, backupTopic1);
        try (KafkaConsumer primaryConsumer = this.backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName));){
            primaryConsumer.assign(backupOffsets.keySet());
            backupOffsets.forEach((arg_0, arg_1) -> ((Consumer)primaryConsumer).seek(arg_0, arg_1));
            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
            primaryConsumer.commitAsync();
            Assertions.assertTrue((primaryConsumer.position(new TopicPartition(backupTopic1, 0)) > 0L ? 1 : 0) != 0, (String)"Consumer failedover to zero offset.");
            Assertions.assertTrue((primaryConsumer.position(new TopicPartition(backupTopic1, 0)) <= 100L ? 1 : 0) != 0, (String)"Consumer failedover beyond expected offset.");
        }
        MirrorConnectorsIntegrationBaseTest.assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
        primaryClient.close();
        backupClient.close();
        if (this.replicateBackupToPrimary) {
            Map<TopicPartition, OffsetAndMetadata> primaryOffsets = MirrorConnectorsIntegrationBaseTest.waitForCheckpointOnAllPartitions(primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, reverseTopic1);
            try (KafkaConsumer primaryConsumer = this.primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName));){
                primaryConsumer.assign(primaryOffsets.keySet());
                primaryOffsets.forEach((arg_0, arg_1) -> ((Consumer)primaryConsumer).seek(arg_0, arg_1));
                primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
                primaryConsumer.commitAsync();
                Assertions.assertTrue((primaryConsumer.position(new TopicPartition(reverseTopic1, 0)) > 0L ? 1 : 0) != 0, (String)"Consumer failedback to zero downstream offset.");
                Assertions.assertTrue((primaryConsumer.position(new TopicPartition(reverseTopic1, 0)) <= 100L ? 1 : 0) != 0, (String)"Consumer failedback beyond expected downstream offset.");
            }
        }
        this.primary.kafka().createTopic("test-topic-2", 10);
        String backupTopic2 = this.remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic2);
        this.produceMessages(this.primaryProducer, "test-topic-2", 1);
        Assertions.assertEquals((int)10, (int)this.primary.kafka().consume(10, 30000L, new String[]{"test-topic-2"}).count(), (String)"Records were not produced to primary cluster.");
        Assertions.assertEquals((int)10, (int)this.backup.kafka().consume(10, 60000L, new String[]{backupTopic2}).count(), (String)"New topic was not replicated to backup cluster.");
        if (this.replicateBackupToPrimary) {
            this.backup.kafka().createTopic("test-topic-3", 10);
            String reverseTopic3 = this.remoteTopicName("test-topic-3", BACKUP_CLUSTER_ALIAS);
            MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.primary, reverseTopic3);
            this.produceMessages(this.backupProducer, "test-topic-3", 1);
            Assertions.assertEquals((int)10, (int)this.backup.kafka().consume(10, 30000L, new String[]{"test-topic-3"}).count(), (String)"Records were not produced to backup cluster.");
            Assertions.assertEquals((int)10, (int)this.primary.kafka().consume(10, 60000L, new String[]{reverseTopic3}).count(), (String)"New topic was not replicated to primary cluster.");
        }
    }

    @Test
    public void testReplicationWithEmptyPartition() throws Exception {
        String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
        Map<String, String> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        String topic = "test-topic-with-empty-partition";
        this.primary.kafka().createTopic(topic, 10);
        this.produceMessages(this.primaryProducer, topic, 9);
        int expectedRecords = 90;
        try (KafkaConsumer primaryConsumer = this.primary.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{topic});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumingAllRecords(primaryConsumer, expectedRecords);
        }
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        Thread.sleep(TimeUnit.SECONDS.toMillis(3L));
        String backupTopic = this.remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
        try (KafkaConsumer backupConsumer = this.backup.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{backupTopic});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumingAllRecords(backupConsumer, expectedRecords);
        }
        var7_9 = null;
        try (Admin backupClient = this.backup.kafka().createAdminClient();){
            Map remoteOffsets = (Map)backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
            OffsetAndMetadata offset = (OffsetAndMetadata)remoteOffsets.get(new TopicPartition(backupTopic, 9));
            Assertions.assertNotNull((Object)offset, (String)"Offset of last partition was not replicated");
            Assertions.assertEquals((long)0L, (long)offset.offset(), (String)"Offset of last partition is not zero");
        }
        catch (Throwable throwable) {
            var7_9 = throwable;
            throw throwable;
        }
    }

    @Test
    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
        this.testOneWayReplicationWithOffsetSyncs(10);
    }

    @Test
    public void testOneWayReplicationWithFrequentOffsetSyncs() throws InterruptedException {
        this.testOneWayReplicationWithOffsetSyncs(0);
    }

    private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
        this.produceMessages(this.primaryProducer, "test-topic-1");
        String backupTopic1 = this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        final String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
        HashMap<String, Object> consumerProps = new HashMap<String, Object>(){
            {
                this.put("group.id", consumerGroupName);
                this.put("auto.offset.reset", "earliest");
            }
        };
        try (KafkaConsumer primaryConsumer = this.primary.kafka().createConsumerAndSubscribeTo((Map)consumerProps, new String[]{"test-topic-1"});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumingAllRecords(primaryConsumer, 100);
        }
        this.mm2Props.put("sync.group.offsets.enabled", "true");
        this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
        this.mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        String reverseTopic1 = this.remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
        if (!"test-topic-1".equals(reverseTopic1)) {
            MirrorConnectorsIntegrationBaseTest.topicShouldNotBeCreated(this.primary, reverseTopic1);
        }
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic1);
        try (KafkaConsumer backupConsumer = this.backup.kafka().createConsumerAndSubscribeTo((Map)consumerProps, new String[]{backupTopic1});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumerGroupFullSync(this.backup, Collections.singletonList(backupTopic1), consumerGroupName, 100, offsetLagMax);
            MirrorConnectorsIntegrationBaseTest.assertDownstreamRedeliveriesBoundedByMaxLag((Consumer<byte[], byte[]>)backupConsumer, offsetLagMax);
        }
        this.primary.kafka().createTopic("test-topic-2", 10);
        String remoteTopic2 = this.remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, remoteTopic2);
        this.produceMessages(this.primaryProducer, "test-topic-2");
        try (KafkaConsumer consumer1 = this.primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", consumerGroupName), new String[]{"test-topic-2"});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumingAllRecords(consumer1, 100);
        }
        var8_13 = null;
        try (KafkaConsumer backupConsumer = this.backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", consumerGroupName), new String[]{backupTopic1, remoteTopic2});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumerGroupFullSync(this.backup, Arrays.asList(backupTopic1, remoteTopic2), consumerGroupName, 100, offsetLagMax);
            MirrorConnectorsIntegrationBaseTest.assertDownstreamRedeliveriesBoundedByMaxLag((Consumer<byte[], byte[]>)backupConsumer, offsetLagMax);
        }
        catch (Throwable throwable) {
            var8_13 = throwable;
            throw throwable;
        }
        MirrorConnectorsIntegrationBaseTest.assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
    }

    @Test
    public void testOffsetSyncsTopicsOnTarget() throws Exception {
        this.mm2Props.put("primary->backup.offset-syncs.topic.location", "target");
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, "mm2-offset-syncs.primary.internal");
        String consumerGroupName = "consumer-group-syncs-on-target";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        this.produceMessages(this.primaryProducer, "test-topic-1");
        this.warmUpConsumer(consumerProps);
        String remoteTopic = this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        try (KafkaConsumer backupConsumer = this.backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("auto.offset.reset", "earliest"), new String[]{"primary.checkpoints.internal"});){
            org.apache.kafka.test.TestUtils.waitForCondition(() -> MirrorConnectorsIntegrationBaseTest.lambda$testOffsetSyncsTopicsOnTarget$2((Consumer)backupConsumer, remoteTopic), (long)30000L, (String)"Unable to find checkpoints for primary.test-topic-1");
        }
        MirrorConnectorsIntegrationBaseTest.assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
        var5_5 = null;
        try (Admin adminClient = this.primary.kafka().createAdminClient();){
            Set primaryTopics = (Set)adminClient.listTopics().names().get();
            Assertions.assertFalse((boolean)primaryTopics.contains("mm2-offset-syncs.primary.internal"));
            Assertions.assertFalse((boolean)primaryTopics.contains("mm2-offset-syncs.backup.internal"));
        }
        catch (Throwable throwable) {
            var5_5 = throwable;
            throw throwable;
        }
    }

    @Test
    public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedException {
        String consumerGroupName = "consumer-group-no-checkpoints";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        this.produceMessages(this.primaryProducer, "test-topic-1");
        this.warmUpConsumer(consumerProps);
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS));
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, this.remoteTopicName("test-topic-no-checkpoints", PRIMARY_CLUSTER_ALIAS));
        TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
        TopicPartition tp2 = new TopicPartition("test-topic-no-checkpoints", 0);
        try (KafkaConsumer consumer = this.primary.kafka().createConsumer(consumerProps);){
            List<TopicPartition> tps = Arrays.asList(tp1, tp2);
            Map endOffsets = consumer.endOffsets(tps);
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(((Long)e.getValue()).longValue())));
            consumer.commitSync(offsetsToCommit);
        }
        MirrorClient backupClient = new MirrorClient(this.mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            Map translatedOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
            return translatedOffsets.containsKey(this.remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && !translatedOffsets.containsKey(this.remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
        }, (long)30000L, (String)"Checkpoints were not emitted correctly to backup cluster");
        this.produceMessages(this.primaryProducer, "test-topic-no-checkpoints");
        try (KafkaConsumer consumer = this.primary.kafka().createConsumer(consumerProps);){
            List<TopicPartition> tps = Arrays.asList(tp1, tp2);
            Map endOffsets = consumer.endOffsets(tps);
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(((Long)e.getValue()).longValue())));
            consumer.commitSync(offsetsToCommit);
        }
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            Map translatedOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
            return translatedOffsets.containsKey(this.remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && translatedOffsets.containsKey(this.remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
        }, (long)30000L, (String)"Checkpoints were not emitted correctly to backup cluster");
        backupClient.close();
    }

    @Test
    public void testRestartReplication() throws InterruptedException {
        String consumerGroupName = "consumer-group-restart";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        String remoteTopic = this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        this.warmUpConsumer(consumerProps);
        this.mm2Props.put("sync.group.offsets.enabled", "true");
        this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
        this.mm2Props.put("offset.lag.max", Integer.toString(10));
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        this.produceMessages(this.primaryProducer, "test-topic-1");
        try (KafkaConsumer primaryConsumer = this.primary.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{"test-topic-1"});){
            MirrorConnectorsIntegrationBaseTest.waitForConsumingAllRecords(primaryConsumer, 100);
        }
        MirrorConnectorsIntegrationBaseTest.waitForConsumerGroupFullSync(this.backup, Collections.singletonList(remoteTopic), consumerGroupName, 100, 10);
        MirrorConnectorsIntegrationBaseTest.restartMirrorMakerConnectors(this.backup, CONNECTOR_LIST);
        MirrorConnectorsIntegrationBaseTest.assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
        Thread.sleep(5000L);
        this.produceMessages(this.primaryProducer, "test-topic-1");
        primaryConsumer = this.primary.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{"test-topic-1"});
        var5_5 = null;
        try {
            MirrorConnectorsIntegrationBaseTest.waitForConsumingAllRecords(primaryConsumer, 100);
        }
        catch (Throwable throwable) {
            var5_5 = throwable;
            throw throwable;
        }
        finally {
            if (primaryConsumer != null) {
                if (var5_5 != null) {
                    try {
                        primaryConsumer.close();
                    }
                    catch (Throwable throwable) {
                        var5_5.addSuppressed(throwable);
                    }
                } else {
                    primaryConsumer.close();
                }
            }
        }
        MirrorConnectorsIntegrationBaseTest.waitForConsumerGroupFullSync(this.backup, Collections.singletonList(remoteTopic), consumerGroupName, 200, 10);
        MirrorConnectorsIntegrationBaseTest.assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
    }

    @Test
    public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> finalCheckpoints;
        Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
        String consumerGroupName = "consumer-group-lagging-behind";
        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
        String remoteTopic = this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        this.warmUpConsumer(consumerProps);
        this.mm2Props.put("sync.group.offsets.enabled", "true");
        this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
        this.mm2Props.put("offset.lag.max", Integer.toString(10));
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        this.produceMessages(this.primaryProducer, "test-topic-1");
        this.warmUpConsumer(consumerProps);
        MirrorClient backupClient = new MirrorClient(this.mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
        Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = MirrorConnectorsIntegrationBaseTest.waitForCheckpointOnAllPartitions(backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic);
        int iterations = 100;
        for (int i = 1; i < iterations; ++i) {
            this.produceMessages(this.primaryProducer, "test-topic-1");
        }
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, remoteTopic);
        Assertions.assertEquals((int)(iterations * 100), (int)this.backup.kafka().consume(iterations * 100, 30000L, new String[]{remoteTopic}).count(), (String)"Records were not replicated to backup cluster.");
        ConsumerRecords allRecords = this.primary.kafka().consume(iterations * 100, 20000L, new String[]{"test-topic-1"});
        log.info("Initial checkpoints: {}", initialCheckpoints);
        KafkaConsumer primaryConsumer = this.primary.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{"test-topic-1"});
        Object object = null;
        try {
            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
            primaryConsumer.commitSync(this.partialOffsets((ConsumerRecords<byte[], byte[]>)allRecords, 0.9f));
            partialCheckpoints = MirrorConnectorsIntegrationBaseTest.waitForNewCheckpointOnAllPartitions(backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, initialCheckpoints);
            log.info("Partial checkpoints: {}", partialCheckpoints);
        }
        catch (Throwable throwable) {
            object = throwable;
            throw throwable;
        }
        finally {
            if (primaryConsumer != null) {
                if (object != null) {
                    try {
                        primaryConsumer.close();
                    }
                    catch (Throwable throwable) {
                        ((Throwable)object).addSuppressed(throwable);
                    }
                } else {
                    primaryConsumer.close();
                }
            }
        }
        for (TopicPartition tp : initialCheckpoints.keySet()) {
            Assertions.assertTrue((initialCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset() ? 1 : 0) != 0, (String)"Checkpoints should advance when the upstream consumer group advances");
        }
        MirrorConnectorsIntegrationBaseTest.assertMonotonicCheckpoints(this.backup, "primary.checkpoints.internal");
        try (KafkaConsumer primaryConsumer2 = this.primary.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{"test-topic-1"});){
            primaryConsumer2.poll(CONSUMER_POLL_TIMEOUT_MS);
            primaryConsumer2.commitSync(this.partialOffsets((ConsumerRecords<byte[], byte[]>)allRecords, 0.1f));
            finalCheckpoints = MirrorConnectorsIntegrationBaseTest.waitForNewCheckpointOnAllPartitions(backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, partialCheckpoints);
            log.info("Final checkpoints: {}", finalCheckpoints);
        }
        backupClient.close();
        for (TopicPartition tp : partialCheckpoints.keySet()) {
            Assertions.assertTrue((finalCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset() ? 1 : 0) != 0, (String)"Checkpoints should rewind when the upstream consumer group rewinds");
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> partialOffsets(ConsumerRecords<byte[], byte[]> allRecords, double fraction) {
        return allRecords.partitions().stream().collect(Collectors.toMap(Function.identity(), partition -> {
            List records = allRecords.records(partition);
            int index = (int)((double)records.size() * fraction);
            return new OffsetAndMetadata(((ConsumerRecord)records.get(index)).offset());
        }));
    }

    @Test
    public void testSyncTopicConfigs() throws InterruptedException {
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        HashMap<String, String> topicConfig = new HashMap<String, String>();
        topicConfig.put("delete.retention.ms", "1000");
        topicConfig.put("retention.bytes", "1000");
        String topic = "test-topic-with-config";
        String backupTopic = this.remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, topicConfig);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
        Map configOps = Collections.singletonMap(configResource, ops);
        this.backup.kafka().incrementalAlterConfigs(configOps);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            String primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "delete.retention.ms");
            String backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "delete.retention.ms");
            Assertions.assertNotEquals((Object)primaryConfig, (Object)backupConfig, (String)"`delete.retention.ms` should be different, because it's in exclude filter! ");
            Assertions.assertEquals((Object)"2000", (Object)backupConfig, (String)"`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
            primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
            backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "retention.bytes");
            Assertions.assertEquals((Object)primaryConfig, (Object)backupConfig, (String)"`retention.bytes` should be the same, because it isn't in exclude filter! ");
            return true;
        }, (long)30000L, (String)"Topic configurations were not synced");
    }

    @Test
    public void testReplicateSourceDefault() throws Exception {
        this.mm2Props.put("use.defaults.from", "source");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        String topic = "test-topic-with-config";
        String backupTopic = this.remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, new HashMap());
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
        Map configOps = Collections.singletonMap(configResource, ops);
        this.backup.kafka().incrementalAlterConfigs(configOps);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            String primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
            String backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "retention.bytes");
            Assertions.assertEquals((Object)primaryConfig, (Object)backupConfig, (String)"`retention.bytes` should be the same, because the source cluster default is being used! ");
            Assertions.assertEquals((Object)"-1", (Object)backupConfig, (String)"`retention.bytes` should be synced with default value!");
            primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "delete.retention.ms");
            backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "delete.retention.ms");
            Assertions.assertNotEquals((Object)primaryConfig, (Object)backupConfig, (String)"`delete.retention.ms` should be different, because it's in exclude filter! ");
            Assertions.assertEquals((Object)"2000", (Object)backupConfig, (String)"`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
            return true;
        }, (long)30000L, (String)"Topic configurations were not synced");
    }

    @Test
    public void testReplicateTargetDefault() throws Exception {
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        HashMap<String, String> topicConfig = new HashMap<String, String>();
        topicConfig.put("retention.bytes", "1000");
        String topic = "test-topic-with-config";
        String backupTopic = this.remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, topicConfig);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            String primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
            String backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "retention.bytes");
            Assertions.assertEquals((Object)primaryConfig, (Object)backupConfig, (String)"`retention.bytes` should be the same");
            Assertions.assertEquals((Object)"1000", (Object)backupConfig, (String)"`retention.bytes` should be synced with default value!");
            return true;
        }, (long)30000L, (String)"Topic configurations were not synced");
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic-with-config");
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "1000"), AlterConfigOp.OpType.DELETE));
        Map configOps = Collections.singletonMap(configResource, ops);
        this.primary.kafka().incrementalAlterConfigs(configOps);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            String backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "retention.bytes");
            Assertions.assertEquals((Object)"-1", (Object)backupConfig, (String)"`retention.bytes` should be synced with target's default value!");
            return true;
        }, (long)30000L, (String)"Topic configurations were not synced");
    }

    @Test
    public void testReplicateFromLatest() throws Exception {
        String topic = "test-topic-1";
        this.produceMessages(this.primaryProducer, topic, 10);
        this.mm2Props.put("primary.consumer.auto.offset.reset", "latest");
        this.mm2Props.put("backup->primary.enabled", "false");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
        this.produceMessages(this.primaryProducer, topic, 10);
        String backupTopic = this.remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
        this.backup.kafka().consume(100, 30000L, new String[]{backupTopic});
        ConsumerRecords replicatedRecords = this.backup.kafka().consumeAll(30000L, new String[]{backupTopic});
        replicatedRecords.partitions().forEach(topicPartition -> {
            int replicatedCount = replicatedRecords.records(topicPartition).size();
            Assertions.assertEquals((int)10, (int)replicatedCount);
        });
    }

    private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
        return new TopicPartition(this.remoteTopicName(tp.topic(), alias), tp.partition());
    }

    void createAndTestNewTopicWithConfigFilter() throws Exception {
        HashMap<String, String> topicConfig = new HashMap<String, String>();
        topicConfig.put("delete.retention.ms", "1000");
        topicConfig.put("retention.bytes", "1000");
        String topic = "test-topic-with-config";
        String backupTopic = this.remoteTopicName("test-topic-with-config", PRIMARY_CLUSTER_ALIAS);
        this.primary.kafka().createTopic("test-topic-with-config", 10, 1, topicConfig);
        MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(this.backup, backupTopic);
        String primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "delete.retention.ms");
        String backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "delete.retention.ms");
        Assertions.assertNotEquals((Object)primaryConfig, (Object)backupConfig, (String)"`delete.retention.ms` should be different, because it's in exclude filter! ");
        primaryConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.primary.kafka(), "test-topic-with-config", "retention.bytes");
        backupConfig = MirrorConnectorsIntegrationBaseTest.getTopicConfig(this.backup.kafka(), backupTopic, "retention.bytes");
        Assertions.assertEquals((Object)primaryConfig, (Object)backupConfig, (String)"`retention.bytes` should be the same, because it isn't in exclude filter! ");
        Assertions.assertEquals((Object)"1000", (Object)backupConfig, (String)"`retention.bytes` should be the same, because it's explicitly defined! ");
    }

    String remoteTopicName(String topic, String clusterAlias) {
        return clusterAlias + "." + topic;
    }

    protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses, MirrorMakerConfig mm2Config, String primary, String backup) throws InterruptedException {
        for (Class<? extends Connector> connector : connectorClasses) {
            connectCluster.configureConnector(connector.getSimpleName(), mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), connector));
        }
        for (Class<? extends Connector> connector : connectorClasses) {
            connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector.getSimpleName(), 1, "Connector " + connector.getSimpleName() + " tasks did not start in time on cluster: " + connectCluster.getName());
        }
    }

    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
        for (Class<? extends Connector> connector : connectorClasses) {
            connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
        }
    }

    @SafeVarargs
    protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector> ... connectorClasses) throws InterruptedException {
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            connectCluster.resumeConnector(connectorClass.getSimpleName());
        }
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            String connectorName = connectorClass.getSimpleName();
            connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(connectorName, 1, "Connector '" + connectorName + "' and/or task did not resume in time");
        }
    }

    @SafeVarargs
    protected static void stopMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector> ... connectorClasses) throws InterruptedException {
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            connectCluster.stopConnector(connectorClass.getSimpleName());
        }
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            String connectorName = connectorClass.getSimpleName();
            connectCluster.assertions().assertConnectorIsStopped(connectorName, "Connector did not stop in time");
        }
    }

    protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String ... topics) {
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
        String connectorName = MirrorSourceConnector.class.getSimpleName();
        ConnectorOffsets currentOffsets = connectCluster.connectorOffsets(connectorName);
        List alteredOffsetContents = currentOffsets.offsets().stream().map(connectorOffset -> {
            TopicPartition topicPartition = MirrorUtils.unwrapPartition((Map)connectorOffset.partition());
            if (!topicsSet.contains(topicPartition.topic())) {
                return null;
            }
            Object currentOffsetObject = connectorOffset.offset().get("offset");
            if (!(currentOffsetObject instanceof Integer) && !(currentOffsetObject instanceof Long)) {
                throw new AssertionError((Object)("Unexpected type for offset '" + currentOffsetObject + "'; should be integer or long"));
            }
            long currentOffset = ((Number)currentOffsetObject).longValue();
            long alteredOffset = alterOffset.applyAsLong(currentOffset);
            return new ConnectorOffset(connectorOffset.partition(), MirrorUtils.wrapOffset((long)alteredOffset));
        }).filter(Objects::nonNull).collect(Collectors.toList());
        connectCluster.alterConnectorOffsets(connectorName, new ConnectorOffsets(alteredOffsetContents));
    }

    protected static void resetSomeMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, String ... topics) {
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
        String connectorName = MirrorSourceConnector.class.getSimpleName();
        ConnectorOffsets currentOffsets = connectCluster.connectorOffsets(connectorName);
        List alteredOffsetContents = currentOffsets.offsets().stream().map(connectorOffset -> {
            TopicPartition topicPartition = MirrorUtils.unwrapPartition((Map)connectorOffset.partition());
            if (!topicsSet.contains(topicPartition.topic())) {
                return null;
            }
            return new ConnectorOffset(connectorOffset.partition(), null);
        }).filter(Objects::nonNull).collect(Collectors.toList());
        connectCluster.alterConnectorOffsets(connectorName, new ConnectorOffsets(alteredOffsetContents));
    }

    @SafeVarargs
    protected static void resetAllMirrorMakerConnectorOffsets(EmbeddedConnectCluster connectCluster, Class<? extends Connector> ... connectorClasses) {
        for (Class<? extends Connector> connectorClass : connectorClasses) {
            String connectorName = connectorClass.getSimpleName();
            connectCluster.resetConnectorOffsets(connectorName);
            Assertions.assertEquals(Collections.emptyList(), (Object)connectCluster.connectorOffsets(connectorName).offsets(), (String)"Offsets for connector should be completely empty after full reset");
        }
    }

    protected static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
        try (Admin adminClient = cluster.kafka().createAdminClient();){
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                Set topics = (Set)adminClient.listTopics().names().get(60000L, TimeUnit.MILLISECONDS);
                return topics.contains(topicName);
            }, (long)30000L, (String)("Topic: " + topicName + " didn't get created in the cluster"));
        }
    }

    protected static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception {
        try (Admin client = cluster.createAdminClient();){
            Set<ConfigResource> cr = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topic));
            DescribeConfigsResult configsResult = client.describeConfigs(cr);
            Config allConfigs = (Config)((Map)configsResult.all().get()).values().toArray()[0];
            String string = allConfigs.get(configName).value();
            return string;
        }
    }

    protected void produceMessages(Producer<byte[], byte[]> producer, String topicName) {
        this.produceMessages(producer, TestUtils.generateRecords(topicName, 100));
    }

    protected void produceMessages(Producer<byte[], byte[]> producer, String topicName, int numPartitions) {
        this.produceMessages(producer, TestUtils.generateRecords(topicName, 10, numPartitions));
    }

    protected Producer<byte[], byte[]> initializeProducer(EmbeddedConnectCluster cluster) {
        return cluster.kafka().createProducer(Collections.emptyMap());
    }

    protected void produceMessages(Producer<byte[], byte[]> producer, List<ProducerRecord<byte[], byte[]>> records) {
        ArrayList<Future> futures = new ArrayList<Future>();
        for (ProducerRecord<byte[], byte[]> producerRecord : records) {
            futures.add(producer.send(producerRecord));
        }
        Timer timer = Time.SYSTEM.timer(20000L);
        try {
            for (Future future : futures) {
                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
                timer.update();
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException exception) {
            throw new RuntimeException(exception);
        }
    }

    private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName) throws InterruptedException {
        return MirrorConnectorsIntegrationBaseTest.waitForNewCheckpointOnAllPartitions(client, consumerGroupName, remoteClusterAlias, topicName, Collections.emptyMap());
    }

    protected static Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions(MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName, Map<TopicPartition, OffsetAndMetadata> lastCheckpoint) throws InterruptedException {
        AtomicReference ret = new AtomicReference();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            Map offsets = client.remoteConsumerOffsets(consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000L));
            for (int i = 0; i < 10; ++i) {
                TopicPartition tp = new TopicPartition(topicName, i);
                if (!offsets.containsKey(tp)) {
                    log.info("Checkpoint is missing for {}: {}-{}", new Object[]{consumerGroupName, topicName, i});
                    return false;
                }
                if (!lastCheckpoint.containsKey(tp) || !((OffsetAndMetadata)lastCheckpoint.get(tp)).equals(offsets.get(tp))) continue;
                log.info("Checkpoint is the same as previous checkpoint");
                return false;
            }
            ret.set(offsets);
            return true;
        }, (long)20000L, (String)String.format("Offsets for consumer group %s not translated from %s for topic %s", consumerGroupName, remoteClusterAlias, topicName));
        return (Map)ret.get();
    }

    private static <T> void waitForConsumerGroupFullSync(EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, int offsetLagMax) throws InterruptedException {
        int expectedRecords = numRecords * topics.size();
        HashMap<String, String> consumerProps = new HashMap<String, String>();
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("auto.offset.reset", "earliest");
        HashMap lastOffset = new HashMap();
        try (KafkaConsumer consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]));){
            AtomicInteger totalConsumedRecords = new AtomicInteger(0);
            org.apache.kafka.test.TestUtils.waitForCondition(() -> MirrorConnectorsIntegrationBaseTest.lambda$waitForConsumerGroupFullSync$18((Consumer)consumer, lastOffset, expectedRecords, totalConsumedRecords), (long)20000L, (String)"Consumer cannot consume all records in time");
        }
        var9_9 = null;
        try (Admin adminClient = connect.kafka().createAdminClient();){
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>(10 * topics.size());
            for (int partitionIndex = 0; partitionIndex < 10; ++partitionIndex) {
                for (String topic : topics) {
                    tps.add(new TopicPartition(topic, partitionIndex));
                }
            }
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                Map consumerGroupOffsets = (Map)adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
                Map endOffsetRequest = tps.stream().collect(Collectors.toMap(Function.identity(), ignored -> OffsetSpec.latest()));
                Map endOffsets = (Map)adminClient.listOffsets(endOffsetRequest).all().get();
                for (TopicPartition tp : tps) {
                    Assertions.assertTrue((boolean)consumerGroupOffsets.containsKey(tp), (String)("TopicPartition " + tp + " does not have translated offsets"));
                    long offset = ((OffsetAndMetadata)consumerGroupOffsets.get(tp)).offset();
                    Assertions.assertTrue((offset > (Long)lastOffset.get(tp) - (long)offsetLagMax ? 1 : 0) != 0, (String)("TopicPartition " + tp + " does not have fully-translated offsets: " + offset + " is not close enough to " + lastOffset.get(tp) + " (strictly more than " + ((Long)lastOffset.get(tp) - (long)offsetLagMax) + ")"));
                    Assertions.assertTrue((offset <= ((ListOffsetsResult.ListOffsetsResultInfo)endOffsets.get(tp)).offset() ? 1 : 0) != 0, (String)("TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics"));
                }
                return true;
            }, (long)30000L, (String)"Consumer group offset sync is not complete in time");
        }
        catch (Throwable throwable) {
            var9_9 = throwable;
            throw throwable;
        }
    }

    private static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
        TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
        try (KafkaConsumer backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("auto.offset.reset", "earliest"), new String[]{checkpointTopic});){
            HashMap<String, Map> checkpointsByGroup = new HashMap<String, Map>();
            long deadline = System.currentTimeMillis() + 20000L;
            do {
                ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L));
                for (ConsumerRecord record : records) {
                    Checkpoint checkpoint = Checkpoint.deserializeRecord((ConsumerRecord)record);
                    Map lastCheckpoints = checkpointsByGroup.computeIfAbsent(checkpoint.consumerGroupId(), ignored -> new HashMap());
                    Checkpoint lastCheckpoint = lastCheckpoints.getOrDefault(checkpoint.topicPartition(), checkpoint);
                    Assertions.assertTrue((checkpoint.downstreamOffset() >= lastCheckpoint.downstreamOffset() ? 1 : 0) != 0, (String)("Checkpoint was non-monotonic for " + checkpoint.consumerGroupId() + ": " + checkpoint.topicPartition()));
                    lastCheckpoints.put(checkpoint.topicPartition(), checkpoint);
                }
            } while (backupConsumer.currentLag(checkpointTopicPartition).orElse(1L) > 0L && System.currentTimeMillis() < deadline);
            Assertions.assertEquals((long)0L, (long)backupConsumer.currentLag(checkpointTopicPartition).orElse(1L), (String)"Unable to read all checkpoints within 20000ms");
        }
    }

    private static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
        ConsumerRecords records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
        for (TopicPartition tp : records.partitions()) {
            int count = records.records(tp).size();
            Assertions.assertTrue((count < offsetLagMax ? 1 : 0) != 0, (String)("downstream consumer is re-reading more than " + offsetLagMax + " records from" + tp));
        }
    }

    private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords) throws InterruptedException {
        AtomicInteger totalConsumedRecords = new AtomicInteger(0);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            ConsumerRecords records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
            return numExpectedRecords == totalConsumedRecords.addAndGet(records.count());
        }, (long)20000L, (String)"Consumer cannot consume all records in time");
        consumer.commitSync();
    }

    private static Map<String, String> basicMM2Config() {
        HashMap<String, String> mm2Props = new HashMap<String, String>();
        mm2Props.put("clusters", "primary, backup");
        mm2Props.put("max.tasks", "10");
        mm2Props.put("groups", "consumer-group-.*");
        mm2Props.put("sync.topic.acls.enabled", "false");
        mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(1));
        mm2Props.put("emit.heartbeats.interval.seconds", "1");
        mm2Props.put("refresh.topics.interval.seconds", "1");
        mm2Props.put("refresh.groups.interval.seconds", "1");
        mm2Props.put("checkpoints.topic.replication.factor", "1");
        mm2Props.put("heartbeats.topic.replication.factor", "1");
        mm2Props.put("offset-syncs.topic.replication.factor", "1");
        mm2Props.put("config.storage.replication.factor", "1");
        mm2Props.put("offset.storage.replication.factor", "1");
        mm2Props.put("status.storage.replication.factor", "1");
        mm2Props.put("replication.factor", "1");
        mm2Props.put("primary.offset.flush.interval.ms", "5000");
        mm2Props.put("backup.offset.flush.interval.ms", "5000");
        return mm2Props;
    }

    private void createTopics() {
        Map<String, String> topicConfig = Collections.singletonMap("cleanup.policy", "compact");
        Map emptyMap = Collections.emptyMap();
        Map<String, Integer> adminClientConfig = Collections.singletonMap("request.timeout.ms", 60000);
        this.primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1, emptyMap, adminClientConfig);
        this.primary.kafka().createTopic("test-topic-1", 10, 1, topicConfig, adminClientConfig);
        this.backup.kafka().createTopic("test-topic-1", 10, 1, emptyMap, adminClientConfig);
        this.primary.kafka().createTopic("heartbeats", 1, 1, emptyMap, adminClientConfig);
        this.backup.kafka().createTopic("heartbeats", 1, 1, emptyMap, adminClientConfig);
        if (this.createReplicatedTopicsUpfront.booleanValue()) {
            this.primary.kafka().createTopic(this.remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
            this.backup.kafka().createTopic(this.remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
        }
    }

    protected void warmUpConsumer(Map<String, Object> consumerProps) {
        try (KafkaConsumer dummyConsumer = this.primary.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{"test-topic-1"});){
            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
            dummyConsumer.commitSync();
        }
        dummyConsumer = this.backup.kafka().createConsumerAndSubscribeTo(consumerProps, new String[]{"test-topic-1"});
        var3_3 = null;
        try {
            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
            dummyConsumer.commitSync();
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (dummyConsumer != null) {
                if (var3_3 != null) {
                    try {
                        dummyConsumer.close();
                    }
                    catch (Throwable throwable) {
                        var3_3.addSuppressed(throwable);
                    }
                } else {
                    dummyConsumer.close();
                }
            }
        }
    }

    private static void topicShouldNotBeCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
        try (Admin adminClient = cluster.kafka().createAdminClient();){
            org.apache.kafka.test.TestUtils.waitForCondition(() -> !((Set)adminClient.listTopics().names().get(60000L, TimeUnit.MILLISECONDS)).contains(topicName), (long)60000L, (String)("Topic: " + topicName + " get created on cluster: " + cluster.getName()));
        }
    }

    protected static void waitForTopicPartitionCreated(EmbeddedConnectCluster cluster, String topicName, int totalNumPartitions) throws InterruptedException {
        try (Admin adminClient = cluster.kafka().createAdminClient();){
            org.apache.kafka.test.TestUtils.waitForCondition(() -> ((TopicDescription)((Map)adminClient.describeTopics(Collections.singleton(topicName)).allTopicNames().get()).get(topicName)).partitions().size() == totalNumPartitions, (long)60000L, (String)("Topic: " + topicName + "'s partitions didn't get created on cluster: " + cluster.getName()));
        }
    }

    private static /* synthetic */ boolean lambda$waitForConsumerGroupFullSync$18(Consumer consumer, Map lastOffset, int expectedRecords, AtomicInteger totalConsumedRecords) throws Exception {
        ConsumerRecords records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
        records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
        return expectedRecords == totalConsumedRecords.addAndGet(records.count());
    }

    private static /* synthetic */ boolean lambda$testOffsetSyncsTopicsOnTarget$2(Consumer backupConsumer, String remoteTopic) throws Exception {
        ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L));
        for (ConsumerRecord record : records) {
            Checkpoint checkpoint = Checkpoint.deserializeRecord((ConsumerRecord)record);
            if (!remoteTopic.equals(checkpoint.topicPartition().topic())) continue;
            return true;
        }
        return false;
    }
}

