package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.class */
public class MirrorConnectorsIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);
    private static final int NUM_RECORDS_PER_PARTITION = 10;
    private static final int NUM_PARTITIONS = 10;
    private static final int NUM_RECORDS_PRODUCED = 100;
    private static final int RECORD_TRANSFER_DURATION_MS = 30000;
    private static final int CHECKPOINT_DURATION_MS = 20000;
    private static final int RECORD_CONSUME_DURATION_MS = 20000;
    private static final int OFFSET_SYNC_DURATION_MS = 30000;
    private volatile boolean shuttingDown;
    private Map<String, String> mm2Props;
    private MirrorMakerConfig mm2Config;
    private EmbeddedConnectCluster primary;
    private EmbeddedConnectCluster backup;
    private Exit.Procedure exitProcedure;
    private Exit.Procedure haltProcedure;

    @Before
    public void setup() throws InterruptedException {
        this.shuttingDown = false;
        this.exitProcedure = (i, str) -> {
            if (this.shuttingDown || i == 0) {
                return;
            }
            String str = "Abrupt service exit with code " + i + " and message " + str;
            log.warn(str);
            throw new UngracefulShutdownException(str);
        };
        this.haltProcedure = (i2, str2) -> {
            if (this.shuttingDown || i2 == 0) {
                return;
            }
            String str2 = "Abrupt service halt with code " + i2 + " and message " + str2;
            log.warn(str2);
            throw new UngracefulShutdownException(str2);
        };
        Exit.setExitProcedure(this.exitProcedure);
        Exit.setHaltProcedure(this.haltProcedure);
        Properties properties = new Properties();
        properties.put("auto.create.topics.enable", "false");
        this.mm2Props = new HashMap();
        this.mm2Props.put("clusters", "primary, backup");
        this.mm2Props.put("max.tasks", "10");
        this.mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
        this.mm2Props.put("groups", "consumer-group-.*");
        this.mm2Props.put("primary->backup.enabled", "true");
        this.mm2Props.put("backup->primary.enabled", "true");
        this.mm2Props.put("sync.topic.acls.enabled", "false");
        this.mm2Props.put("emit.checkpoints.interval.seconds", "1");
        this.mm2Props.put("emit.heartbeats.interval.seconds", "1");
        this.mm2Props.put("refresh.topics.interval.seconds", "1");
        this.mm2Props.put("refresh.groups.interval.seconds", "1");
        this.mm2Props.put("checkpoints.topic.replication.factor", "1");
        this.mm2Props.put("heartbeats.topic.replication.factor", "1");
        this.mm2Props.put("offset-syncs.topic.replication.factor", "1");
        this.mm2Props.put("config.storage.replication.factor", "1");
        this.mm2Props.put("offset.storage.replication.factor", "1");
        this.mm2Props.put("status.storage.replication.factor", "1");
        this.mm2Props.put("replication.factor", "1");
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        Map workerConfig = this.mm2Config.workerConfig(new SourceAndTarget("backup", "primary"));
        Map workerConfig2 = this.mm2Config.workerConfig(new SourceAndTarget("primary", "backup"));
        this.primary = new EmbeddedConnectCluster.Builder().name("primary-connect-cluster").numWorkers(3).numBrokers(1).brokerProps(properties).workerProps(workerConfig).maskExitProcedures(false).build();
        this.backup = new EmbeddedConnectCluster.Builder().name("backup-connect-cluster").numWorkers(3).numBrokers(1).brokerProps(properties).workerProps(workerConfig2).maskExitProcedures(false).build();
        this.primary.start();
        this.primary.assertions().assertAtLeastNumWorkersAreUp(3, "Workers of primary-connect-cluster did not start in time.");
        this.backup.start();
        this.backup.assertions().assertAtLeastNumWorkersAreUp(3, "Workers of backup-connect-cluster did not start in time.");
        this.primary.kafka().createTopic("test-topic-1", 10);
        this.primary.kafka().createTopic("backup.test-topic-1", 1);
        this.primary.kafka().createTopic("heartbeats", 1);
        this.backup.kafka().createTopic("test-topic-1", 10);
        this.backup.kafka().createTopic("primary.test-topic-1", 1);
        this.backup.kafka().createTopic("heartbeats", 1);
        produceMessages(this.primary, "test-topic-1", "message-1-");
        produceMessages(this.backup, "test-topic-1", "message-2-");
        Map singletonMap = Collections.singletonMap("group.id", "consumer-group-dummy");
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-1"});
        consumeAllMessages(createConsumerAndSubscribeTo);
        createConsumerAndSubscribeTo.close();
        KafkaConsumer createConsumerAndSubscribeTo2 = this.backup.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-1"});
        consumeAllMessages(createConsumerAndSubscribeTo2);
        createConsumerAndSubscribeTo2.close();
        log.info("primary REST service: {}", this.primary.endpointForResource("connectors"));
        log.info("backup REST service: {}", this.backup.endpointForResource("connectors"));
        log.info("primary brokers: {}", this.primary.kafka().bootstrapServers());
        log.info("backup brokers: {}", this.backup.kafka().bootstrapServers());
        this.mm2Props.put("primary.bootstrap.servers", this.primary.kafka().bootstrapServers());
        this.mm2Props.put("backup.bootstrap.servers", this.backup.kafka().bootstrapServers());
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
    }

    private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster embeddedConnectCluster, MirrorMakerConfig mirrorMakerConfig, String str, String str2) throws InterruptedException {
        embeddedConnectCluster.configureConnector("MirrorSourceConnector", mirrorMakerConfig.connectorBaseConfig(new SourceAndTarget(str, str2), MirrorSourceConnector.class));
        embeddedConnectCluster.configureConnector("MirrorCheckpointConnector", mirrorMakerConfig.connectorBaseConfig(new SourceAndTarget(str, str2), MirrorCheckpointConnector.class));
        embeddedConnectCluster.configureConnector("MirrorHeartbeatConnector", mirrorMakerConfig.connectorBaseConfig(new SourceAndTarget(str, str2), MirrorHeartbeatConnector.class));
        for (String str3 : new HashSet(Arrays.asList("MirrorSourceConnector", "MirrorCheckpointConnector", "MirrorHeartbeatConnector"))) {
            embeddedConnectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(str3, 1, "Connector " + str3 + " tasks did not start in time on cluster: " + embeddedConnectCluster);
        }
    }

    @After
    public void close() {
        try {
            Iterator it = this.primary.connectors().iterator();
            while (it.hasNext()) {
                this.primary.deleteConnector((String) it.next());
            }
            Iterator it2 = this.backup.connectors().iterator();
            while (it2.hasNext()) {
                this.backup.deleteConnector((String) it2.next());
            }
            deleteAllTopics(this.primary.kafka());
            deleteAllTopics(this.backup.kafka());
            this.shuttingDown = true;
            try {
                try {
                    this.primary.stop();
                    this.backup.stop();
                    Exit.resetExitProcedure();
                    Exit.resetHaltProcedure();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            this.shuttingDown = true;
            try {
                try {
                    this.primary.stop();
                    this.backup.stop();
                    Exit.resetExitProcedure();
                    Exit.resetHaltProcedure();
                    throw th;
                } finally {
                }
            } finally {
            }
        }
    }

    @Test
    public void testReplication() throws InterruptedException {
        final String str = "consumer-group-testReplication";
        HashMap<String, Object> hashMap = new HashMap<String, Object>() { // from class: org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.1
            {
                put("group.id", str);
                put("auto.offset.reset", "latest");
            }
        };
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-1"});
        consumeAllMessages(createConsumerAndSubscribeTo, 0);
        createConsumerAndSubscribeTo.close();
        KafkaConsumer createConsumerAndSubscribeTo2 = this.backup.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-1"});
        consumeAllMessages(createConsumerAndSubscribeTo2, 0);
        createConsumerAndSubscribeTo2.close();
        waitUntilMirrorMakerIsRunning(this.backup, this.mm2Config, "primary", "backup");
        waitUntilMirrorMakerIsRunning(this.primary, this.mm2Config, "backup", "primary");
        MirrorClient mirrorClient = new MirrorClient(this.mm2Config.clientConfig("primary"));
        MirrorClient mirrorClient2 = new MirrorClient(this.mm2Config.clientConfig("backup"));
        Assert.assertEquals("Records were not produced to primary cluster.", 100L, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{"test-topic-1"}).count());
        Assert.assertEquals("Records were not replicated to backup cluster.", 100L, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{"primary.test-topic-1"}).count());
        Assert.assertEquals("Records were not produced to backup cluster.", 100L, this.backup.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{"test-topic-1"}).count());
        Assert.assertEquals("Records were not replicated to primary cluster.", 100L, this.primary.kafka().consume(NUM_RECORDS_PRODUCED, 30000L, new String[]{"backup.test-topic-1"}).count());
        Assert.assertEquals("Primary cluster doesn't have all records from both clusters.", 200L, this.primary.kafka().consume(200, 30000L, new String[]{"backup.test-topic-1", "test-topic-1"}).count());
        Assert.assertEquals("Backup cluster doesn't have all records from both clusters.", 200L, this.backup.kafka().consume(200, 30000L, new String[]{"primary.test-topic-1", "test-topic-1"}).count());
        Assert.assertTrue("Heartbeats were not emitted to primary cluster.", this.primary.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0);
        Assert.assertTrue("Heartbeats were not emitted to backup cluster.", this.backup.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0);
        Assert.assertTrue("Heartbeats were not replicated downstream to backup cluster.", this.backup.kafka().consume(1, 30000L, new String[]{"primary.heartbeats"}).count() > 0);
        Assert.assertTrue("Heartbeats were not replicated downstream to primary cluster.", this.primary.kafka().consume(1, 30000L, new String[]{"backup.heartbeats"}).count() > 0);
        Assert.assertTrue("Did not find upstream primary cluster.", mirrorClient2.upstreamClusters().contains("primary"));
        Assert.assertEquals("Did not calculate replication hops correctly.", 1L, mirrorClient2.replicationHops("primary"));
        Assert.assertTrue("Did not find upstream backup cluster.", mirrorClient.upstreamClusters().contains("backup"));
        Assert.assertEquals("Did not calculate replication hops correctly.", 1L, mirrorClient.replicationHops("backup"));
        Assert.assertTrue("Checkpoints were not emitted downstream to backup cluster.", this.backup.kafka().consume(1, 20000L, new String[]{"primary.checkpoints.internal"}).count() > 0);
        Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets = mirrorClient2.remoteConsumerOffsets("consumer-group-testReplication", "primary", Duration.ofMillis(20000L));
        Assert.assertTrue("Offsets not translated downstream to backup cluster. Found: " + remoteConsumerOffsets, remoteConsumerOffsets.containsKey(new TopicPartition("primary.test-topic-1", 0)));
        KafkaConsumer createConsumer = this.backup.kafka().createConsumer(hashMap);
        createConsumer.assign(allPartitions("test-topic-1", "primary.test-topic-1"));
        seek(createConsumer, remoteConsumerOffsets);
        consumeAllMessages(createConsumer, 0);
        Assert.assertTrue("Consumer failedover to zero offset.", createConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
        Assert.assertTrue("Consumer failedover beyond expected offset.", createConsumer.position(new TopicPartition("primary.test-topic-1", 0)) <= 100);
        Assert.assertTrue("Checkpoints were not emitted upstream to primary cluster.", this.primary.kafka().consume(1, 20000L, new String[]{"backup.checkpoints.internal"}).count() > 0);
        createConsumer.close();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            try {
                return mirrorClient.remoteConsumerOffsets(str, "backup", Duration.ofMillis(20000L)).containsKey(new TopicPartition("backup.test-topic-1", 0));
            } catch (Throwable th) {
                return false;
            }
        }, 20000L, "Offsets not translated downstream to primary cluster.");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            try {
                return mirrorClient.remoteConsumerOffsets(str, "backup", Duration.ofMillis(20000L)).containsKey(new TopicPartition("test-topic-1", 0));
            } catch (Throwable th) {
                return false;
            }
        }, 20000L, "Offsets not translated upstream to primary cluster.");
        Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets2 = mirrorClient.remoteConsumerOffsets("consumer-group-testReplication", "backup", Duration.ofMillis(20000L));
        mirrorClient.close();
        mirrorClient2.close();
        KafkaConsumer createConsumer2 = this.primary.kafka().createConsumer(hashMap);
        createConsumer2.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
        seek(createConsumer2, remoteConsumerOffsets2);
        consumeAllMessages(createConsumer2, 0);
        Assert.assertTrue("Consumer failedback to zero upstream offset.", createConsumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
        Assert.assertTrue("Consumer failedback to zero downstream offset.", createConsumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
        Assert.assertTrue("Consumer failedback beyond expected upstream offset.", createConsumer2.position(new TopicPartition("test-topic-1", 0)) <= 10);
        Assert.assertTrue("Consumer failedback beyond expected downstream offset.", createConsumer2.position(new TopicPartition("backup.test-topic-1", 0)) <= 10);
        Assert.assertEquals("Data was consumed from partitions: " + consumeAllMessages(createConsumer2, 0).keySet() + ".", 0L, r0.size());
        createConsumer2.close();
        this.primary.kafka().createTopic("test-topic-2", 10);
        this.backup.kafka().createTopic("test-topic-3", 10);
        produceMessages(this.primary, "test-topic-2", "message-3-", 1);
        produceMessages(this.backup, "test-topic-3", "message-4-", 1);
        Assert.assertEquals("Records were not produced to primary cluster.", 10L, this.primary.kafka().consume(10, 30000L, new String[]{"test-topic-2"}).count());
        Assert.assertEquals("Records were not produced to backup cluster.", 10L, this.backup.kafka().consume(10, 30000L, new String[]{"test-topic-3"}).count());
        Assert.assertEquals("New topic was not replicated to primary cluster.", 10L, this.primary.kafka().consume(10, 60000L, new String[]{"backup.test-topic-3"}).count());
        Assert.assertEquals("New topic was not replicated to backup cluster.", 10L, this.backup.kafka().consume(10, 60000L, new String[]{"primary.test-topic-2"}).count());
    }

    @Test
    public void testReplicationWithEmptyPartition() throws Exception {
        Map singletonMap = Collections.singletonMap("group.id", "consumer-group-testReplicationWithEmptyPartition");
        this.primary.kafka().createTopic("test-topic-with-empty-partition", 10);
        produceMessages(this.primary, "test-topic-with-empty-partition", "message-1-", 9);
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-with-empty-partition"});
        Throwable th = null;
        try {
            consumeAllMessages(createConsumerAndSubscribeTo, 90);
            if (createConsumerAndSubscribeTo != null) {
                if (0 != 0) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            waitUntilMirrorMakerIsRunning(this.backup, this.mm2Config, "primary", "backup");
            OffsetAndMetadata offsetAndMetadata = waitForConsumerGroupOffsetReplication(Collections.singletonList("primary.test-topic-with-empty-partition"), "consumer-group-testReplicationWithEmptyPartition", false).get(new TopicPartition("primary.test-topic-with-empty-partition", 9));
            Assert.assertNotNull("Offset of last partition was not replicated", offsetAndMetadata);
            Assert.assertEquals("Offset of last partition is not zero", 0L, offsetAndMetadata.offset());
        } catch (Throwable th3) {
            if (createConsumerAndSubscribeTo != null) {
                if (0 != 0) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            throw th3;
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> waitForConsumerGroupOffsetReplication(List<String> list, String str, boolean z) throws InterruptedException {
        Admin createAdminClient = this.backup.kafka().createAdminClient();
        ArrayList arrayList = new ArrayList(10 * list.size());
        for (int i = 0; i < 10; i++) {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(new TopicPartition(it.next(), i));
            }
        }
        long size = 10 * list.size();
        HashMap hashMap = new HashMap();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            hashMap.clear();
            Map<TopicPartition, OffsetAndMetadata> translatedOffsets = getTranslatedOffsets(str);
            Map map = (Map) createAdminClient.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
            boolean z2 = true;
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                TopicPartition topicPartition = (TopicPartition) it2.next();
                OffsetAndMetadata offsetAndMetadata = translatedOffsets.get(topicPartition);
                OffsetAndMetadata offsetAndMetadata2 = (OffsetAndMetadata) map.get(topicPartition);
                if (offsetAndMetadata != null) {
                    hashMap.put(topicPartition, offsetAndMetadata);
                    if (z && offsetAndMetadata2.offset() != offsetAndMetadata.offset()) {
                        z2 = false;
                    }
                }
            }
            return ((long) hashMap.size()) == size && z2;
        }, 30000L, "Consumer group offset sync did not complete in time");
        return hashMap;
    }

    private Map<TopicPartition, OffsetAndMetadata> getTranslatedOffsets(String str) throws TimeoutException, InterruptedException {
        return RemoteClusterUtils.translateOffsets(this.mm2Config.clientConfig("backup").adminConfig(), "primary", str, Duration.ofMillis(20000L));
    }

    @Test
    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
        Throwable th;
        final String str = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
        HashMap<String, Object> hashMap = new HashMap<String, Object>() { // from class: org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.2
            {
                put("group.id", str);
                put("auto.offset.reset", "earliest");
            }
        };
        this.primary.kafka().createTopic("test-topic-auto-offset-sync-1", 10);
        this.backup.kafka().createTopic("primary.test-topic-auto-offset-sync-1", 1);
        produceMessages(this.primary, "test-topic-auto-offset-sync-1", "message-1-");
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-auto-offset-sync-1"});
        Throwable th2 = null;
        try {
            try {
                consumeAllMessages(createConsumerAndSubscribeTo);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                this.mm2Props.put("sync.group.offsets.enabled", "true");
                this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
                this.mm2Props.put("backup->primary.enabled", "false");
                this.mm2Config = new MirrorMakerConfig(this.mm2Props);
                waitUntilMirrorMakerIsRunning(this.backup, this.mm2Config, "primary", "backup");
                waitForConsumerGroupOffsetReplication(Collections.singletonList("primary.test-topic-auto-offset-sync-1"), "consumer-group-testOneWayReplicationWithAutoOffsetSync", true);
                createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"primary.test-topic-auto-offset-sync-1"});
                th = null;
            } finally {
            }
            try {
                try {
                    ConsumerRecords poll = createConsumerAndSubscribeTo.poll(Duration.ofMillis(500L));
                    if (createConsumerAndSubscribeTo != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            createConsumerAndSubscribeTo.close();
                        }
                    }
                    Assert.assertEquals("consumer record size is not zero", 0L, poll.count());
                    this.primary.kafka().createTopic("test-topic-auto-offset-sync-2", 10);
                    this.backup.kafka().createTopic("primary.test-topic-auto-offset-sync-2", 1);
                    produceMessages(this.primary, "test-topic-auto-offset-sync-2", "message-1-");
                    KafkaConsumer createConsumerAndSubscribeTo2 = this.primary.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-auto-offset-sync-2"});
                    Throwable th5 = null;
                    try {
                        consumeAllMessages(createConsumerAndSubscribeTo2);
                        if (createConsumerAndSubscribeTo2 != null) {
                            if (0 != 0) {
                                try {
                                    createConsumerAndSubscribeTo2.close();
                                } catch (Throwable th6) {
                                    th5.addSuppressed(th6);
                                }
                            } else {
                                createConsumerAndSubscribeTo2.close();
                            }
                        }
                        waitForConsumerGroupOffsetReplication(Arrays.asList("primary.test-topic-auto-offset-sync-1", "primary.test-topic-auto-offset-sync-2"), "consumer-group-testOneWayReplicationWithAutoOffsetSync", true);
                        createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"primary.test-topic-auto-offset-sync-1", "primary.test-topic-auto-offset-sync-2"});
                        Throwable th7 = null;
                        try {
                            try {
                                ConsumerRecords poll2 = createConsumerAndSubscribeTo.poll(Duration.ofMillis(500L));
                                if (createConsumerAndSubscribeTo != null) {
                                    if (0 != 0) {
                                        try {
                                            createConsumerAndSubscribeTo.close();
                                        } catch (Throwable th8) {
                                            th7.addSuppressed(th8);
                                        }
                                    } else {
                                        createConsumerAndSubscribeTo.close();
                                    }
                                }
                                Assert.assertEquals("consumer record size is not zero", 0L, poll2.count());
                            } finally {
                            }
                        } finally {
                            if (createConsumerAndSubscribeTo != null) {
                                if (th7 != null) {
                                    try {
                                        createConsumerAndSubscribeTo.close();
                                    } catch (Throwable th9) {
                                        th7.addSuppressed(th9);
                                    }
                                } else {
                                    createConsumerAndSubscribeTo.close();
                                }
                            }
                        }
                    } catch (Throwable th10) {
                        if (createConsumerAndSubscribeTo2 != null) {
                            if (0 != 0) {
                                try {
                                    createConsumerAndSubscribeTo2.close();
                                } catch (Throwable th11) {
                                    th5.addSuppressed(th11);
                                }
                            } else {
                                createConsumerAndSubscribeTo2.close();
                            }
                        }
                        throw th10;
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    private void deleteAllTopics(EmbeddedKafkaCluster embeddedKafkaCluster) {
        Admin createAdminClient = embeddedKafkaCluster.createAdminClient();
        try {
            createAdminClient.deleteTopics((Collection) createAdminClient.listTopics().names().get());
        } catch (Throwable th) {
        }
    }

    private void produceMessages(EmbeddedConnectCluster embeddedConnectCluster, String str, String str2) {
        produceMessages(embeddedConnectCluster, str, str2, 10);
    }

    private void produceMessages(EmbeddedConnectCluster embeddedConnectCluster, String str, String str2, int i) {
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            for (int i4 = 0; i4 < i; i4++) {
                int i5 = i2;
                i2++;
                embeddedConnectCluster.kafka().produce(str, Integer.valueOf(i4), "key", str2 + i5);
            }
        }
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) throws InterruptedException {
        return consumeAllMessages(consumer, null, null);
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer num) throws InterruptedException {
        return consumeAllMessages(consumer, num, null);
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer num, Duration duration) throws InterruptedException {
        int intValue = num != null ? num.intValue() : NUM_RECORDS_PRODUCED;
        int millis = (int) (duration != null ? duration.toMillis() : 20000L);
        ArrayList arrayList = new ArrayList();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            Iterator it = consumer.poll(Duration.ofMillis(1000L)).iterator();
            while (it.hasNext()) {
                arrayList.add((ConsumerRecord) it.next());
            }
            Assert.assertTrue("Consumer consumed more records than expected: " + arrayList.size() + " (expected " + intValue + ").", arrayList.size() <= intValue);
            return arrayList.size() == intValue;
        }, millis, "Consumer could not consume all records in time.");
        consumer.commitSync();
        return (Map) arrayList.stream().collect(Collectors.groupingBy(consumerRecord -> {
            return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        }));
    }

    private void seek(Consumer<byte[], byte[]> consumer, Map<TopicPartition, OffsetAndMetadata> map) throws InterruptedException {
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            boolean z = true;
            Map endOffsets = consumer.endOffsets(map.keySet());
            Iterator it = map.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                TopicPartition topicPartition = (TopicPartition) it.next();
                if (((OffsetAndMetadata) map.get(topicPartition)).offset() > ((Long) endOffsets.get(topicPartition)).longValue()) {
                    z = false;
                    break;
                }
            }
            if (!z) {
                Thread.sleep(1000L);
            }
            return z;
        }, 30000L, "Records were not replicated in time.");
        consumer.getClass();
        map.forEach(consumer::seek);
    }

    private List<TopicPartition> allPartitions(String... strArr) {
        return (List) IntStream.range(0, 10).boxed().flatMap(num -> {
            return Arrays.stream(strArr).map(str -> {
                return new TopicPartition(str, num.intValue());
            });
        }).collect(Collectors.toList());
    }
}
