/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorSourceMetrics;
import org.apache.kafka.connect.mirror.MirrorSourceTask;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.OffsetSyncWriter;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class MirrorSourceTaskTest {
    @Test
    public void testSerde() {
        byte[] key = new byte[]{97, 98, 99, 100, 101};
        byte[] value = new byte[]{102, 103, 104, 105, 106, 107};
        RecordHeaders headers = new RecordHeaders();
        headers.add("header1", new byte[]{108, 109, 110, 111});
        headers.add("header2", new byte[]{112, 113, 114, 115, 116});
        ConsumerRecord consumerRecord = new ConsumerRecord("topic1", 2, 3L, 4L, TimestampType.CREATE_TIME, 5, 6, (Object)key, (Object)value, (Headers)headers, Optional.empty());
        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7", (ReplicationPolicy)new DefaultReplicationPolicy(), null);
        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
        Assertions.assertEquals((Object)"cluster7.topic1", (Object)sourceRecord.topic(), (String)"Failure on cluster7.topic1 consumerRecord serde");
        Assertions.assertEquals((int)2, (int)sourceRecord.kafkaPartition(), (String)"sourceRecord kafka partition is incorrect");
        Assertions.assertEquals((Object)new TopicPartition("topic1", 2), (Object)MirrorUtils.unwrapPartition((Map)sourceRecord.sourcePartition()), (String)"topic1 unwrapped from sourcePartition is incorrect");
        Assertions.assertEquals((long)3L, (long)MirrorUtils.unwrapOffset((Map)sourceRecord.sourceOffset()), (String)"sourceRecord's sourceOffset is incorrect");
        Assertions.assertEquals((long)4L, (long)sourceRecord.timestamp(), (String)"sourceRecord's timestamp is incorrect");
        Assertions.assertEquals((Object)key, (Object)sourceRecord.key(), (String)"sourceRecord's key is incorrect");
        Assertions.assertEquals((Object)value, (Object)sourceRecord.value(), (String)"sourceRecord's value is incorrect");
        Assertions.assertEquals((Object)headers.lastHeader("header1").value(), (Object)sourceRecord.headers().lastWithName("header1").value(), (String)"sourceRecord's header1 is incorrect");
        Assertions.assertEquals((Object)headers.lastHeader("header2").value(), (Object)sourceRecord.headers().lastWithName("header2").value(), (String)"sourceRecord's header2 is incorrect");
    }

    @Test
    public void testOffsetSync() {
        OffsetSyncWriter.PartitionState partitionState = new OffsetSyncWriter.PartitionState(50L);
        Assertions.assertTrue((boolean)partitionState.update(0L, 100L), (String)"always emit offset sync on first update");
        Assertions.assertTrue((boolean)partitionState.shouldSyncOffsets, (String)"should sync offsets");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.shouldSyncOffsets, (String)"should sync offsets to false");
        Assertions.assertTrue((boolean)partitionState.update(2L, 102L), (String)"upstream offset skipped -> resync");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.update(3L, 152L), (String)"no sync");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(4L, 153L), (String)"one past target offset");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.update(5L, 154L), (String)"no sync");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.update(6L, 203L), (String)"no sync");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(7L, 204L), (String)"one past target offset");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(2L, 206L), (String)"upstream reset");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.update(3L, 207L), (String)"no sync");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(4L, 3L), (String)"downstream reset");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.update(5L, 4L), (String)"no sync");
        Assertions.assertTrue((boolean)partitionState.update(7L, 6L), (String)"sync");
        Assertions.assertTrue((boolean)partitionState.update(7L, 6L), (String)"sync");
        Assertions.assertTrue((boolean)partitionState.update(8L, 7L), (String)"sync");
        Assertions.assertTrue((boolean)partitionState.update(10L, 57L), (String)"sync");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.update(11L, 58L), (String)"sync");
        Assertions.assertFalse((boolean)partitionState.shouldSyncOffsets, (String)"should sync offsets to false");
    }

    @Test
    public void testZeroOffsetSync() {
        OffsetSyncWriter.PartitionState partitionState = new OffsetSyncWriter.PartitionState(0L);
        Assertions.assertTrue((boolean)partitionState.update(0L, 100L), (String)"zeroOffsetSync downStreamOffset 100 is incorrect");
        Assertions.assertTrue((boolean)partitionState.shouldSyncOffsets, (String)"should sync offsets");
        partitionState.reset();
        Assertions.assertFalse((boolean)partitionState.shouldSyncOffsets, (String)"should sync offsets to false");
        Assertions.assertTrue((boolean)partitionState.update(2L, 102L), (String)"zeroOffsetSync downStreamOffset 102 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(3L, 153L), (String)"zeroOffsetSync downStreamOffset 153 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(4L, 154L), (String)"zeroOffsetSync downStreamOffset 154 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(5L, 155L), (String)"zeroOffsetSync downStreamOffset 155 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(6L, 207L), (String)"zeroOffsetSync downStreamOffset 207 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(2L, 208L), (String)"zeroOffsetSync downStreamOffset 208 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(3L, 209L), (String)"zeroOffsetSync downStreamOffset 209 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(4L, 3L), (String)"zeroOffsetSync downStreamOffset 3 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(5L, 4L), (String)"zeroOffsetSync downStreamOffset 4 is incorrect");
        Assertions.assertTrue((boolean)partitionState.update(7L, 6L), (String)"zeroOffsetSync downStreamOffset 6 is incorrect");
        Assertions.assertTrue((boolean)partitionState.update(7L, 6L), (String)"zeroOffsetSync downStreamOffset 6 is incorrect");
        Assertions.assertTrue((boolean)partitionState.update(8L, 7L), (String)"zeroOffsetSync downStreamOffset 7 is incorrect");
        Assertions.assertTrue((boolean)partitionState.update(10L, 57L), (String)"zeroOffsetSync downStreamOffset 57 is incorrect");
        partitionState.reset();
        Assertions.assertTrue((boolean)partitionState.update(11L, 58L), (String)"zeroOffsetSync downStreamOffset 58 is incorrect");
    }

    @Test
    public void testPoll() {
        byte[] key1 = "abc".getBytes();
        byte[] value1 = "fgh".getBytes();
        byte[] key2 = "123".getBytes();
        byte[] value2 = "456".getBytes();
        ArrayList<ConsumerRecord> consumerRecordsList = new ArrayList<ConsumerRecord>();
        String topicName = "test";
        String headerKey = "key";
        RecordHeaders headers = new RecordHeaders(new org.apache.kafka.common.header.Header[]{new RecordHeader(headerKey, "value".getBytes())});
        consumerRecordsList.add(new ConsumerRecord(topicName, 0, 0L, System.currentTimeMillis(), TimestampType.CREATE_TIME, key1.length, value1.length, (Object)key1, (Object)value1, (Headers)headers, Optional.empty()));
        consumerRecordsList.add(new ConsumerRecord(topicName, 1, 1L, System.currentTimeMillis(), TimestampType.CREATE_TIME, key2.length, value2.length, (Object)key2, (Object)value2, (Headers)headers, Optional.empty()));
        TopicPartition tp = new TopicPartition(topicName, 0);
        ConsumerRecords consumerRecords = new ConsumerRecords(Map.of(tp, consumerRecordsList), Map.of(tp, new OffsetAndMetadata(2L, Optional.empty(), "")));
        KafkaConsumer consumer = (KafkaConsumer)Mockito.mock(KafkaConsumer.class);
        Mockito.when((Object)consumer.poll((Duration)ArgumentMatchers.any())).thenReturn((Object)consumerRecords);
        MirrorSourceMetrics metrics = (MirrorSourceMetrics)Mockito.mock(MirrorSourceMetrics.class);
        String sourceClusterName = "cluster1";
        DefaultReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, (ReplicationPolicy)replicationPolicy, null);
        List sourceRecords = mirrorSourceTask.poll();
        Assertions.assertEquals((int)2, (int)sourceRecords.size());
        for (int i = 0; i < sourceRecords.size(); ++i) {
            SourceRecord sourceRecord = (SourceRecord)sourceRecords.get(i);
            ConsumerRecord consumerRecord = (ConsumerRecord)consumerRecordsList.get(i);
            Assertions.assertEquals((Object)consumerRecord.key(), (Object)sourceRecord.key(), (String)"consumerRecord key does not equal sourceRecord key");
            Assertions.assertEquals((Object)consumerRecord.value(), (Object)sourceRecord.value(), (String)"consumerRecord value does not equal sourceRecord value");
            Assertions.assertEquals((Object)replicationPolicy.formatRemoteTopic(sourceClusterName, topicName), (Object)sourceRecord.topic(), (String)"topicName not the same as the current replicationPolicy");
            Assertions.assertEquals((int)consumerRecord.partition(), (int)sourceRecord.kafkaPartition(), (String)"partition assignment not the same as the current replicationPolicy");
            ArrayList<org.apache.kafka.common.header.Header> expectedHeaders = new ArrayList<org.apache.kafka.common.header.Header>();
            consumerRecord.headers().forEach(expectedHeaders::add);
            ArrayList<Header> taskHeaders = new ArrayList<Header>();
            sourceRecord.headers().forEach(taskHeaders::add);
            this.compareHeaders(expectedHeaders, taskHeaders);
        }
    }

    @Test
    public void testSeekBehaviorDuringStart() {
        KafkaConsumer mockConsumer = (KafkaConsumer)Mockito.mock(KafkaConsumer.class);
        SourceTaskContext mockSourceTaskContext = (SourceTaskContext)Mockito.mock(SourceTaskContext.class);
        OffsetStorageReader mockOffsetStorageReader = (OffsetStorageReader)Mockito.mock(OffsetStorageReader.class);
        Mockito.when((Object)mockSourceTaskContext.offsetStorageReader()).thenReturn((Object)mockOffsetStorageReader);
        HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>(Arrays.asList(new TopicPartition("previouslyReplicatedTopic", 8), new TopicPartition("previouslyReplicatedTopic1", 0), new TopicPartition("previouslyReplicatedTopic", 1), new TopicPartition("newTopicToReplicate1", 1), new TopicPartition("newTopicToReplicate1", 4), new TopicPartition("newTopicToReplicate2", 0)));
        long arbitraryCommittedOffset = 4L;
        long offsetToSeek = arbitraryCommittedOffset + 1L;
        Mockito.when((Object)mockOffsetStorageReader.offset(ArgumentMatchers.anyMap())).thenAnswer(testInvocation -> {
            Map topicPartitionOffsetMap = (Map)testInvocation.getArgument(0);
            String topicName = topicPartitionOffsetMap.get("topic").toString();
            if (topicName.startsWith("previouslyReplicatedTopic")) {
                topicPartitionOffsetMap.put("offset", arbitraryCommittedOffset);
            }
            return topicPartitionOffsetMap;
        });
        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(mockConsumer, null, null, (ReplicationPolicy)new DefaultReplicationPolicy(), null);
        mirrorSourceTask.initialize(mockSourceTaskContext);
        mirrorSourceTask.initializeConsumer(topicPartitions);
        ((KafkaConsumer)Mockito.verify((Object)mockConsumer, (VerificationMode)Mockito.times((int)1))).assign(topicPartitions);
        ((KafkaConsumer)Mockito.verify((Object)mockConsumer, (VerificationMode)Mockito.times((int)1))).seek(new TopicPartition("previouslyReplicatedTopic", 8), offsetToSeek);
        ((KafkaConsumer)Mockito.verify((Object)mockConsumer, (VerificationMode)Mockito.times((int)1))).seek(new TopicPartition("previouslyReplicatedTopic", 1), offsetToSeek);
        ((KafkaConsumer)Mockito.verify((Object)mockConsumer, (VerificationMode)Mockito.times((int)1))).seek(new TopicPartition("previouslyReplicatedTopic1", 0), offsetToSeek);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockConsumer});
    }

    @Test
    public void testCommitRecordWithNullMetadata() {
        byte[] key1 = "abc".getBytes();
        byte[] value1 = "fgh".getBytes();
        String topicName = "test";
        String headerKey = "key";
        RecordHeaders headers = new RecordHeaders(new org.apache.kafka.common.header.Header[]{new RecordHeader(headerKey, "value".getBytes())});
        KafkaConsumer consumer = (KafkaConsumer)Mockito.mock(KafkaConsumer.class);
        MirrorSourceMetrics metrics = (MirrorSourceMetrics)Mockito.mock(MirrorSourceMetrics.class);
        String sourceClusterName = "cluster1";
        DefaultReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, (ReplicationPolicy)replicationPolicy, null);
        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord(topicName, 0, 0L, System.currentTimeMillis(), TimestampType.CREATE_TIME, key1.length, value1.length, (Object)key1, (Object)value1, (Headers)headers, Optional.empty()));
        mirrorSourceTask.commitRecord(sourceRecord, null);
    }

    @Test
    public void testSendSyncEvent() {
        byte[] recordKey = "key".getBytes();
        byte[] recordValue = "value".getBytes();
        long maxOffsetLag = 50L;
        int recordPartition = 0;
        int recordOffset = 0;
        int metadataOffset = 100;
        String topicName = "topic";
        String sourceClusterName = "sourceCluster";
        RecordHeaders headers = new RecordHeaders();
        DefaultReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
        KafkaConsumer consumer = (KafkaConsumer)Mockito.mock(KafkaConsumer.class);
        MirrorSourceMetrics metrics = (MirrorSourceMetrics)Mockito.mock(MirrorSourceMetrics.class);
        OffsetSyncWriter.PartitionState partitionState = new OffsetSyncWriter.PartitionState(maxOffsetLag);
        HashMap<TopicPartition, OffsetSyncWriter.PartitionState> partitionStates = new HashMap<TopicPartition, OffsetSyncWriter.PartitionState>();
        OffsetSyncWriter offsetSyncWriter = (OffsetSyncWriter)Mockito.mock(OffsetSyncWriter.class);
        Mockito.when((Object)offsetSyncWriter.maxOffsetLag()).thenReturn((Object)maxOffsetLag);
        ((OffsetSyncWriter)Mockito.doNothing().when((Object)offsetSyncWriter)).firePendingOffsetSyncs();
        ((OffsetSyncWriter)Mockito.doNothing().when((Object)offsetSyncWriter)).promoteDelayedOffsetSyncs();
        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, (ReplicationPolicy)replicationPolicy, offsetSyncWriter);
        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord(topicName, recordPartition, (long)recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, (Object)recordKey, (Object)recordValue, (Headers)headers, Optional.empty()));
        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition((Map)sourceRecord.sourcePartition());
        partitionStates.put(sourceTopicPartition, partitionState);
        RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, (long)metadataOffset, 0, 0L, 0, recordPartition);
        ((OffsetSyncWriter)Mockito.doNothing().when((Object)offsetSyncWriter)).maybeQueueOffsetSyncs((TopicPartition)ArgumentMatchers.eq((Object)sourceTopicPartition), ArgumentMatchers.eq((long)recordOffset), ArgumentMatchers.eq((long)recordMetadata.offset()));
        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
        ((OffsetSyncWriter)Mockito.verify((Object)offsetSyncWriter, (VerificationMode)Mockito.times((int)1))).maybeQueueOffsetSyncs((TopicPartition)ArgumentMatchers.eq((Object)sourceTopicPartition), ArgumentMatchers.eq((long)recordOffset), ArgumentMatchers.eq((long)recordMetadata.offset()));
        ((OffsetSyncWriter)Mockito.verify((Object)offsetSyncWriter, (VerificationMode)Mockito.times((int)1))).firePendingOffsetSyncs();
        mirrorSourceTask.commit();
        ((OffsetSyncWriter)Mockito.verify((Object)offsetSyncWriter, (VerificationMode)Mockito.times((int)1))).promoteDelayedOffsetSyncs();
        ((OffsetSyncWriter)Mockito.verify((Object)offsetSyncWriter, (VerificationMode)Mockito.times((int)2))).firePendingOffsetSyncs();
    }

    private void compareHeaders(List<org.apache.kafka.common.header.Header> expectedHeaders, List<Header> taskHeaders) {
        Assertions.assertEquals((int)expectedHeaders.size(), (int)taskHeaders.size());
        for (int i = 0; i < expectedHeaders.size(); ++i) {
            org.apache.kafka.common.header.Header expectedHeader = expectedHeaders.get(i);
            Header taskHeader = taskHeaders.get(i);
            Assertions.assertEquals((Object)expectedHeader.key(), (Object)taskHeader.key(), (String)("taskHeader's key expected to equal " + taskHeader.key()));
            Assertions.assertEquals((Object)expectedHeader.value(), (Object)taskHeader.value(), (String)("taskHeader's value expected to equal " + taskHeader.value().toString()));
        }
    }
}

