package org.apache.kafka.connect.mirror;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.mirror.MirrorSourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceTaskTest.class */
public class MirrorSourceTaskTest {
    @Test
    public void testSerde() {
        byte[] bArr = {97, 98, 99, 100, 101};
        byte[] bArr2 = {102, 103, 104, 105, 106, 107};
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add("header1", new byte[]{108, 109, 110, 111});
        recordHeaders.add("header2", new byte[]{112, 113, 114, 115, 116});
        SourceRecord convertRecord = new MirrorSourceTask((KafkaConsumer) null, (MirrorMetrics) null, "cluster7", new DefaultReplicationPolicy(), 50L).convertRecord(new ConsumerRecord("topic1", 2, 3L, 4L, TimestampType.CREATE_TIME, 5, 6, bArr, bArr2, recordHeaders, Optional.empty()));
        Assertions.assertEquals("cluster7.topic1", convertRecord.topic(), "Failure on cluster7.topic1 consumerRecord serde");
        Assertions.assertEquals(2, convertRecord.kafkaPartition().intValue(), "sourceRecord kafka partition is incorrect");
        Assertions.assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(convertRecord.sourcePartition()), "topic1 unwrapped from sourcePartition is incorrect");
        Assertions.assertEquals(3L, MirrorUtils.unwrapOffset(convertRecord.sourceOffset()).longValue(), "sourceRecord's sourceOffset is incorrect");
        Assertions.assertEquals(4L, convertRecord.timestamp().longValue(), "sourceRecord's timestamp is incorrect");
        Assertions.assertEquals(bArr, convertRecord.key(), "sourceRecord's key is incorrect");
        Assertions.assertEquals(bArr2, convertRecord.value(), "sourceRecord's value is incorrect");
        Assertions.assertEquals(recordHeaders.lastHeader("header1").value(), convertRecord.headers().lastWithName("header1").value(), "sourceRecord's header1 is incorrect");
        Assertions.assertEquals(recordHeaders.lastHeader("header2").value(), convertRecord.headers().lastWithName("header2").value(), "sourceRecord's header2 is incorrect");
    }

    @Test
    public void testOffsetSync() {
        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50L);
        Assertions.assertTrue(partitionState.update(0L, 100L), "always emit offset sync on first update");
        Assertions.assertTrue(partitionState.update(2L, 102L), "upstream offset skipped -> resync");
        Assertions.assertFalse(partitionState.update(3L, 152L), "no sync");
        Assertions.assertFalse(partitionState.update(4L, 153L), "no sync");
        Assertions.assertFalse(partitionState.update(5L, 154L), "no sync");
        Assertions.assertTrue(partitionState.update(6L, 205L), "one past target offset");
        Assertions.assertTrue(partitionState.update(2L, 206L), "upstream reset");
        Assertions.assertFalse(partitionState.update(3L, 207L), "no sync");
        Assertions.assertTrue(partitionState.update(4L, 3L), "downstream reset");
        Assertions.assertFalse(partitionState.update(5L, 4L), "no sync");
    }

    @Test
    public void testZeroOffsetSync() {
        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0L);
        Assertions.assertTrue(partitionState.update(0L, 100L), "zeroOffsetSync downStreamOffset 100 is incorrect");
        Assertions.assertTrue(partitionState.update(2L, 102L), "zeroOffsetSync downStreamOffset 102 is incorrect");
        Assertions.assertTrue(partitionState.update(3L, 153L), "zeroOffsetSync downStreamOffset 153 is incorrect");
        Assertions.assertTrue(partitionState.update(4L, 154L), "zeroOffsetSync downStreamOffset 154 is incorrect");
        Assertions.assertTrue(partitionState.update(5L, 155L), "zeroOffsetSync downStreamOffset 155 is incorrect");
        Assertions.assertTrue(partitionState.update(6L, 207L), "zeroOffsetSync downStreamOffset 207 is incorrect");
        Assertions.assertTrue(partitionState.update(2L, 208L), "zeroOffsetSync downStreamOffset 208 is incorrect");
        Assertions.assertTrue(partitionState.update(3L, 209L), "zeroOffsetSync downStreamOffset 209 is incorrect");
        Assertions.assertTrue(partitionState.update(4L, 3L), "zeroOffsetSync downStreamOffset 3 is incorrect");
        Assertions.assertTrue(partitionState.update(5L, 4L), "zeroOffsetSync downStreamOffset 4 is incorrect");
    }

    @Test
    public void testPoll() {
        byte[] bytes = "abc".getBytes();
        byte[] bytes2 = "fgh".getBytes();
        byte[] bytes3 = "123".getBytes();
        byte[] bytes4 = "456".getBytes();
        ArrayList arrayList = new ArrayList();
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        arrayList.add(new ConsumerRecord("test", 0, 0L, System.currentTimeMillis(), TimestampType.CREATE_TIME, bytes.length, bytes2.length, bytes, bytes2, recordHeaders, Optional.empty()));
        arrayList.add(new ConsumerRecord("test", 1, 1L, System.currentTimeMillis(), TimestampType.CREATE_TIME, bytes3.length, bytes4.length, bytes3, bytes4, recordHeaders, Optional.empty()));
        ConsumerRecords consumerRecords = new ConsumerRecords(Collections.singletonMap(new TopicPartition("test", 0), arrayList));
        KafkaConsumer kafkaConsumer = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        Mockito.when(kafkaConsumer.poll((Duration) ArgumentMatchers.any())).thenReturn(consumerRecords);
        MirrorMetrics mirrorMetrics = (MirrorMetrics) Mockito.mock(MirrorMetrics.class);
        DefaultReplicationPolicy defaultReplicationPolicy = new DefaultReplicationPolicy();
        List poll = new MirrorSourceTask(kafkaConsumer, mirrorMetrics, "cluster1", defaultReplicationPolicy, 50L).poll();
        Assertions.assertEquals(2, poll.size());
        for (int i = 0; i < poll.size(); i++) {
            SourceRecord sourceRecord = (SourceRecord) poll.get(i);
            ConsumerRecord consumerRecord = (ConsumerRecord) arrayList.get(i);
            Assertions.assertEquals(consumerRecord.key(), sourceRecord.key(), "consumerRecord key does not equal sourceRecord key");
            Assertions.assertEquals(consumerRecord.value(), sourceRecord.value(), "consumerRecord value does not equal sourceRecord value");
            Assertions.assertEquals(defaultReplicationPolicy.formatRemoteTopic("cluster1", "test"), sourceRecord.topic(), "topicName not the same as the current replicationPolicy");
            Assertions.assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue(), "partition assignment not the same as the current replicationPolicy");
            ArrayList arrayList2 = new ArrayList();
            Headers headers = consumerRecord.headers();
            arrayList2.getClass();
            headers.forEach((v1) -> {
                r1.add(v1);
            });
            ArrayList arrayList3 = new ArrayList();
            org.apache.kafka.connect.header.Headers headers2 = sourceRecord.headers();
            arrayList3.getClass();
            headers2.forEach((v1) -> {
                r1.add(v1);
            });
            compareHeaders(arrayList2, arrayList3);
        }
    }

    private void compareHeaders(List<Header> list, List<org.apache.kafka.connect.header.Header> list2) {
        Assertions.assertEquals(list.size(), list2.size());
        for (int i = 0; i < list.size(); i++) {
            Header header = list.get(i);
            org.apache.kafka.connect.header.Header header2 = list2.get(i);
            Assertions.assertEquals(header.key(), header2.key(), "taskHeader's key expected to equal " + header2.key());
            Assertions.assertEquals(header.value(), header2.value(), "taskHeader's value expected to equal " + header2.value().toString());
        }
    }
}
