package org.apache.kafka.connect.mirror.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.class */
public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @BeforeEach
    public void startClusters() throws Exception {
        super.startClusters(new HashMap<String, String>() { // from class: org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.1
            {
                put("replication.policy.class", IdentityReplicationPolicy.class.getName());
                put("topics", "test-topic-.*");
                put("backup->primary.enabled", "false");
                put("primary->backup.enabled", "true");
            }
        });
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @Test
    public void testReplication() throws Exception {
        produceMessages(this.primary, "test-topic-1");
        final String str = "consumer-group-testReplication";
        warmUpConsumer(new HashMap<String, Object>() { // from class: org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.2
            {
                put("group.id", str);
                put("auto.offset.reset", "latest");
            }
        });
        this.mm2Config = new MirrorMakerConfig(this.mm2Props);
        waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
        waitUntilMirrorMakerIsRunning(this.primary, Collections.singletonList(MirrorHeartbeatConnector.class), this.mm2Config, "backup", "primary");
        MirrorClient mirrorClient = new MirrorClient(this.mm2Config.clientConfig("primary"));
        MirrorClient mirrorClient2 = new MirrorClient(this.mm2Config.clientConfig("backup"));
        waitForTopicCreated(this.primary, "test-topic-1");
        waitForTopicCreated(this.backup, "test-topic-1");
        Assertions.assertEquals("compact", getTopicConfig(this.backup.kafka(), "test-topic-1", "cleanup.policy"), "topic config was not synced");
        createAndTestNewTopicWithConfigFilter();
        Assertions.assertEquals(100, this.primary.kafka().consume(100, 30000L, new String[]{"test-topic-1"}).count(), "Records were not produced to primary cluster.");
        Assertions.assertEquals(100, this.backup.kafka().consume(100, 30000L, new String[]{"test-topic-1"}).count(), "Records were not replicated to backup cluster.");
        Assertions.assertTrue(this.primary.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0, "Heartbeats were not emitted to primary cluster.");
        Assertions.assertTrue(this.backup.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0, "Heartbeats were not emitted to backup cluster.");
        Assertions.assertTrue(this.backup.kafka().consume(1, 30000L, new String[]{"primary.heartbeats"}).count() > 0, "Heartbeats were not replicated downstream to backup cluster.");
        Assertions.assertTrue(this.primary.kafka().consume(1, 30000L, new String[]{"heartbeats"}).count() > 0, "Heartbeats were not replicated downstream to primary cluster.");
        Assertions.assertTrue(mirrorClient2.upstreamClusters().contains("primary"), "Did not find upstream primary cluster.");
        Assertions.assertEquals(1, mirrorClient2.replicationHops("primary"), "Did not calculate replication hops correctly.");
        Assertions.assertTrue(this.backup.kafka().consume(1, 20000L, new String[]{"primary.checkpoints.internal"}).count() > 0, "Checkpoints were not emitted downstream to backup cluster.");
        Map remoteConsumerOffsets = mirrorClient2.remoteConsumerOffsets("consumer-group-testReplication", "primary", Duration.ofMillis(20000L));
        Assertions.assertTrue(remoteConsumerOffsets.containsKey(new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + remoteConsumerOffsets);
        KafkaConsumer createConsumer = this.backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-testReplication"));
        Throwable th = null;
        try {
            try {
                createConsumer.assign(remoteConsumerOffsets.keySet());
                createConsumer.getClass();
                remoteConsumerOffsets.forEach(createConsumer::seek);
                createConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
                createConsumer.commitAsync();
                Assertions.assertTrue(createConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
                Assertions.assertTrue(createConsumer.position(new TopicPartition("test-topic-1", 0)) <= 100, "Consumer failedover beyond expected offset.");
                if (createConsumer != null) {
                    if (0 != 0) {
                        try {
                            createConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumer.close();
                    }
                }
                mirrorClient.close();
                mirrorClient2.close();
                this.primary.kafka().createTopic("test-topic-2", 10);
                waitForTopicCreated(this.backup, "test-topic-2");
                produceMessages(this.primary, "test-topic-2", 1);
                Assertions.assertEquals(10, this.primary.kafka().consume(10, 30000L, new String[]{"test-topic-2"}).count(), "Records were not produced to primary cluster.");
                Assertions.assertEquals(10, this.backup.kafka().consume(10, 60000L, new String[]{"test-topic-2"}).count(), "New topic was not replicated to backup cluster.");
            } finally {
            }
        } catch (Throwable th3) {
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @Test
    public void testReplicationWithEmptyPartition() throws Exception {
        Map singletonMap = Collections.singletonMap("group.id", "consumer-group-testReplicationWithEmptyPartition");
        this.primary.kafka().createTopic("test-topic-with-empty-partition", 10);
        produceMessages(this.primary, "test-topic-with-empty-partition", 9);
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-with-empty-partition"});
        Throwable th = null;
        try {
            try {
                waitForConsumingAllRecords(createConsumerAndSubscribeTo, 90);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                this.mm2Props.put("backup->primary.enabled", "false");
                this.mm2Config = new MirrorMakerConfig(this.mm2Props);
                waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
                Thread.sleep(TimeUnit.SECONDS.toMillis(3L));
                createConsumerAndSubscribeTo = this.backup.kafka().createConsumerAndSubscribeTo(singletonMap, new String[]{"test-topic-with-empty-partition"});
                Throwable th3 = null;
                try {
                    try {
                        waitForConsumingAllRecords(createConsumerAndSubscribeTo, 90);
                        if (createConsumerAndSubscribeTo != null) {
                            if (0 != 0) {
                                try {
                                    createConsumerAndSubscribeTo.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                createConsumerAndSubscribeTo.close();
                            }
                        }
                        Admin createAdminClient = this.backup.kafka().createAdminClient();
                        Throwable th5 = null;
                        try {
                            OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) ((Map) createAdminClient.listConsumerGroupOffsets("consumer-group-testReplicationWithEmptyPartition").partitionsToOffsetAndMetadata().get()).get(new TopicPartition("test-topic-with-empty-partition", 9));
                            Assertions.assertNotNull(offsetAndMetadata, "Offset of last partition was not replicated");
                            Assertions.assertEquals(0L, offsetAndMetadata.offset(), "Offset of last partition is not zero");
                            if (createAdminClient != null) {
                                if (0 == 0) {
                                    createAdminClient.close();
                                    return;
                                }
                                try {
                                    createAdminClient.close();
                                } catch (Throwable th6) {
                                    th5.addSuppressed(th6);
                                }
                            }
                        } catch (Throwable th7) {
                            if (createAdminClient != null) {
                                if (0 != 0) {
                                    try {
                                        createAdminClient.close();
                                    } catch (Throwable th8) {
                                        th5.addSuppressed(th8);
                                    }
                                } else {
                                    createAdminClient.close();
                                }
                            }
                            throw th7;
                        }
                    } catch (Throwable th9) {
                        th3 = th9;
                        throw th9;
                    }
                } finally {
                }
            } catch (Throwable th10) {
                th = th10;
                throw th10;
            }
        } finally {
        }
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @Test
    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
        produceMessages(this.primary, "test-topic-1");
        final String str = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
        HashMap<String, Object> hashMap = new HashMap<String, Object>() { // from class: org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.3
            {
                put("group.id", str);
                put("auto.offset.reset", "earliest");
            }
        };
        KafkaConsumer createConsumerAndSubscribeTo = this.primary.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-1"});
        Throwable th = null;
        try {
            try {
                waitForConsumingAllRecords(createConsumerAndSubscribeTo, 100);
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                this.mm2Props.put("sync.group.offsets.enabled", "true");
                this.mm2Props.put("sync.group.offsets.interval.seconds", "1");
                this.mm2Props.put("backup->primary.enabled", "false");
                this.mm2Config = new MirrorMakerConfig(this.mm2Props);
                waitUntilMirrorMakerIsRunning(this.backup, CONNECTOR_LIST, this.mm2Config, "primary", "backup");
                waitForTopicCreated(this.primary, "backup.test-topic-1");
                waitForTopicCreated(this.backup, "test-topic-1");
                KafkaConsumer createConsumerAndSubscribeTo2 = this.backup.kafka().createConsumerAndSubscribeTo(hashMap, new String[]{"test-topic-1"});
                waitForConsumerGroupOffsetSync(this.backup, createConsumerAndSubscribeTo2, Collections.singletonList("test-topic-1"), "consumer-group-testOneWayReplicationWithAutoOffsetSync", 100);
                Assertions.assertEquals(0, createConsumerAndSubscribeTo2.poll(CONSUMER_POLL_TIMEOUT_MS).count(), "consumer record size is not zero");
                this.primary.kafka().createTopic("test-topic-2", 10);
                waitForTopicCreated(this.backup, "test-topic-2");
                produceMessages(this.primary, "test-topic-2");
                KafkaConsumer createConsumerAndSubscribeTo3 = this.primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", "consumer-group-1"), new String[]{"test-topic-2"});
                Throwable th3 = null;
                try {
                    waitForConsumingAllRecords(createConsumerAndSubscribeTo3, 100);
                    if (createConsumerAndSubscribeTo3 != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo3.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            createConsumerAndSubscribeTo3.close();
                        }
                    }
                    KafkaConsumer createConsumerAndSubscribeTo4 = this.backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap("group.id", "consumer-group-testOneWayReplicationWithAutoOffsetSync"), new String[]{"test-topic-1", "test-topic-2"});
                    waitForConsumerGroupOffsetSync(this.backup, createConsumerAndSubscribeTo4, Arrays.asList("test-topic-1", "test-topic-2"), "consumer-group-testOneWayReplicationWithAutoOffsetSync", 100);
                    Assertions.assertEquals(0, createConsumerAndSubscribeTo4.poll(CONSUMER_POLL_TIMEOUT_MS).count(), "consumer record size is not zero");
                    createConsumerAndSubscribeTo4.close();
                } catch (Throwable th5) {
                    if (createConsumerAndSubscribeTo3 != null) {
                        if (0 != 0) {
                            try {
                                createConsumerAndSubscribeTo3.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            createConsumerAndSubscribeTo3.close();
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (Throwable th7) {
            if (createConsumerAndSubscribeTo != null) {
                if (th != null) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            throw th7;
        }
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    String backupClusterTopicName(String str) {
        return str;
    }
}
