package org.apache.kafka.connect.mirror.integration;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Disabled("https://confluentinc.atlassian.net/browse/CC-21505")
@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.class */
public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectorsIntegrationBaseTest {
    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @BeforeEach
    public void startClusters() throws Exception {
        this.mm2Props.put("primary.exactly.once.source.support", DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString());
        this.mm2Props.put("backup.exactly.once.source.support", DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString());
        for (Properties properties : Arrays.asList(this.primaryBrokerProps, this.backupBrokerProps)) {
            properties.put("transaction.state.log.replication.factor", "1");
            properties.put("transaction.state.log.min.isr", "1");
        }
        super.startClusters();
    }

    @Override // org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
    @Test
    public void testReplication() throws Exception {
        super.testReplication();
        String remoteTopicName = remoteTopicName("test-topic-1", "primary");
        String remoteTopicName2 = remoteTopicName("test-topic-2", "primary");
        stopMirrorMakerConnectors(this.backup, MirrorSourceConnector.class);
        alterMirrorMakerSourceConnectorOffsets(this.backup, j -> {
            return 0L;
        }, "test-topic-1");
        resetSomeMirrorMakerSourceConnectorOffsets(this.backup, "test-topic-2");
        resumeMirrorMakerConnectors(this.backup, MirrorSourceConnector.class);
        Assertions.assertEquals(190, this.backup.kafka().consume(190, 30000L, new String[]{remoteTopicName}).count(), "Records were not re-replicated to backup cluster after altering offsets.");
        Assertions.assertEquals(20, this.backup.kafka().consume(20, 30000L, new String[]{remoteTopicName2}).count(), "New topic was not re-replicated to backup cluster after altering offsets.");
        Class[] clsArr = (Class[]) CONNECTOR_LIST.toArray(new Class[0]);
        stopMirrorMakerConnectors(this.backup, clsArr);
        resetAllMirrorMakerConnectorOffsets(this.backup, clsArr);
        resumeMirrorMakerConnectors(this.backup, clsArr);
        int i = 190 + 100;
        Assertions.assertEquals(i, this.backup.kafka().consume(i, 30000L, new String[]{remoteTopicName}).count(), "Records were not re-replicated to backup cluster after resetting offsets.");
        int i2 = 20 + 10;
        Assertions.assertEquals(i2, this.backup.kafka().consume(i2, 30000L, new String[]{remoteTopicName2}).count(), "New topic was not re-replicated to backup cluster after resetting offsets.");
    }
}
