package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.mirror.MirrorSourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceTaskTest.class */
/**
 * Unit tests for {@link MirrorSourceTask}: conversion of consumed records into
 * {@link SourceRecord}s, and the offset-sync decisions returned by
 * {@link MirrorSourceTask.PartitionState#update}.
 */
public class MirrorSourceTaskTest {

    /**
     * Converts a consumer record for {@code topic1}, partition 2, offset 3, timestamp 4 through a
     * task configured with source cluster alias {@code cluster7} and checks that topic name,
     * partition, source partition/offset, timestamp, key, value and headers all survive the
     * conversion.
     */
    @Test
    public void testSerde() {
        byte[] key = {'a', 'b', 'c', 'd', 'e'};
        byte[] value = {'f', 'g', 'h', 'i', 'j', 'k'};
        RecordHeaders headers = new RecordHeaders();
        headers.add("header1", new byte[]{'l', 'm', 'n', 'o'});
        headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
        ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
                TimestampType.CREATE_TIME, 0L, 5, 6, key, value, headers);
        MirrorSourceTask mirrorSourceTask =
                new MirrorSourceTask("cluster7", new DefaultReplicationPolicy(), 50L);

        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);

        Assert.assertEquals("cluster7.topic1", sourceRecord.topic());
        Assert.assertEquals(2, sourceRecord.kafkaPartition().intValue());
        Assert.assertEquals(new TopicPartition("topic1", 2),
                MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()));
        Assert.assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue());
        Assert.assertEquals(4L, sourceRecord.timestamp().longValue());
        // Compare array contents: assertEquals on arrays only checks reference identity, which
        // would break if the converter ever copied the bytes and never validates the payload.
        Assert.assertArrayEquals(key, (byte[]) sourceRecord.key());
        Assert.assertArrayEquals(value, (byte[]) sourceRecord.value());
        Assert.assertArrayEquals(headers.lastHeader("header1").value(),
                (byte[]) sourceRecord.headers().lastWithName("header1").value());
        Assert.assertArrayEquals(headers.lastHeader("header2").value(),
                (byte[]) sourceRecord.headers().lastWithName("header2").value());
    }

    /**
     * Walks a {@link MirrorSourceTask.PartitionState} built with the argument 50 through a
     * sequence of (upstream offset, downstream offset) updates and checks, per the assertion
     * messages, which updates report that an offset sync should be emitted.
     */
    @Test
    public void testOffsetSync() {
        // NOTE(review): the constructor argument is presumably the max offset lag - confirm
        // against MirrorSourceTask.PartitionState.
        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50L);

        Assert.assertTrue("always emit offset sync on first update", partitionState.update(0L, 100L));
        Assert.assertTrue("upstream offset skipped -> resync", partitionState.update(2L, 102L));
        Assert.assertFalse("no sync", partitionState.update(3L, 152L));
        Assert.assertFalse("no sync", partitionState.update(4L, 153L));
        Assert.assertFalse("no sync", partitionState.update(5L, 154L));
        Assert.assertTrue("one past target offset", partitionState.update(6L, 205L));
        Assert.assertTrue("upstream reset", partitionState.update(2L, 206L));
        Assert.assertFalse("no sync", partitionState.update(3L, 207L));
        Assert.assertTrue("downstream reset", partitionState.update(4L, 3L));
        Assert.assertFalse("no sync", partitionState.update(5L, 4L));
    }

    /**
     * Same style of update sequence as {@link #testOffsetSync()}, but with a
     * {@link MirrorSourceTask.PartitionState} built with the argument 0: every update is
     * expected to return {@code true}.
     */
    @Test
    public void testZeroOffsetSync() {
        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0L);

        Assert.assertTrue(partitionState.update(0L, 100L));
        Assert.assertTrue(partitionState.update(2L, 102L));
        Assert.assertTrue(partitionState.update(3L, 153L));
        Assert.assertTrue(partitionState.update(4L, 154L));
        Assert.assertTrue(partitionState.update(5L, 155L));
        Assert.assertTrue(partitionState.update(6L, 207L));
        Assert.assertTrue(partitionState.update(2L, 208L));
        Assert.assertTrue(partitionState.update(3L, 209L));
        Assert.assertTrue(partitionState.update(4L, 3L));
        Assert.assertTrue(partitionState.update(5L, 4L));
    }
}
