/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.CheckpointStore;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorCheckpointTask;
import org.apache.kafka.connect.mirror.OffsetSyncStore;
import org.apache.kafka.connect.mirror.OffsetSyncStoreTest;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class MirrorCheckpointTaskTest {
    @Test
    public void testDownstreamTopicRenaming() {
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), null, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(Collections.emptyMap()));
        Assertions.assertEquals((Object)new TopicPartition("source1.topic3", 4), (Object)mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)), (String)"Renaming source1.topic3 failed");
        Assertions.assertEquals((Object)new TopicPartition("topic3", 5), (Object)mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)), (String)"Renaming target2.topic3 failed");
        Assertions.assertEquals((Object)new TopicPartition("source1.source6.topic7", 8), (Object)mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)), (String)"Renaming source1.source6.topic7 failed");
    }

    @Test
    public void testCheckpoint() {
        long t1UpstreamOffset = 3L;
        long t1DownstreamOffset = 4L;
        long t2UpstreamOffset = 7L;
        long t2DownstreamOffset = 8L;
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
        offsetSyncStore.start(true);
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), (OffsetSyncStore)offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(Collections.emptyMap()));
        offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
        Optional optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2), new OffsetAndMetadata(10L, null));
        Assertions.assertTrue((boolean)optionalCheckpoint1.isPresent());
        Checkpoint checkpoint1 = (Checkpoint)optionalCheckpoint1.get();
        SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
        Assertions.assertEquals((Object)new TopicPartition("source1.topic1", 2), (Object)checkpoint1.topicPartition(), (String)"checkpoint group9 source1.topic1 failed");
        Assertions.assertEquals((Object)"group9", (Object)checkpoint1.consumerGroupId(), (String)"checkpoint group9 consumerGroupId failed");
        Assertions.assertEquals((Object)"group9", (Object)Checkpoint.unwrapGroup((Map)sourceRecord1.sourcePartition()), (String)"checkpoint group9 sourcePartition failed");
        Assertions.assertEquals((long)10L, (long)checkpoint1.upstreamOffset(), (String)"checkpoint group9 upstreamOffset failed");
        Assertions.assertEquals((long)(t1DownstreamOffset + 1L), (long)checkpoint1.downstreamOffset(), (String)"checkpoint group9 downstreamOffset failed");
        Assertions.assertEquals((long)123L, (long)sourceRecord1.timestamp(), (String)"checkpoint group9 timestamp failed");
        Optional optionalCheckpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6), new OffsetAndMetadata(12L, null));
        Assertions.assertTrue((boolean)optionalCheckpoint2.isPresent());
        Checkpoint checkpoint2 = (Checkpoint)optionalCheckpoint2.get();
        SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
        Assertions.assertEquals((Object)new TopicPartition("topic5", 6), (Object)checkpoint2.topicPartition(), (String)"checkpoint group11 topic5 failed");
        Assertions.assertEquals((Object)"group11", (Object)checkpoint2.consumerGroupId(), (String)"checkpoint group11 consumerGroupId failed");
        Assertions.assertEquals((Object)"group11", (Object)Checkpoint.unwrapGroup((Map)sourceRecord2.sourcePartition()), (String)"checkpoint group11 sourcePartition failed");
        Assertions.assertEquals((long)12L, (long)checkpoint2.upstreamOffset(), (String)"checkpoint group11 upstreamOffset failed");
        Assertions.assertEquals((long)(t2DownstreamOffset + 1L), (long)checkpoint2.downstreamOffset(), (String)"checkpoint group11 downstreamOffset failed");
        Assertions.assertEquals((long)234L, (long)sourceRecord2.timestamp(), (String)"checkpoint group11 timestamp failed");
        Optional optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6), new OffsetAndMetadata(7L, null));
        Assertions.assertTrue((boolean)optionalCheckpoint3.isPresent());
        Checkpoint checkpoint3 = (Checkpoint)optionalCheckpoint3.get();
        SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
        Assertions.assertEquals((Object)new TopicPartition("topic5", 6), (Object)checkpoint3.topicPartition(), (String)"checkpoint group13 topic5 failed");
        Assertions.assertEquals((Object)"group13", (Object)checkpoint3.consumerGroupId(), (String)"checkpoint group13 consumerGroupId failed");
        Assertions.assertEquals((Object)"group13", (Object)Checkpoint.unwrapGroup((Map)sourceRecord3.sourcePartition()), (String)"checkpoint group13 sourcePartition failed");
        Assertions.assertEquals((long)t2UpstreamOffset, (long)checkpoint3.upstreamOffset(), (String)"checkpoint group13 upstreamOffset failed");
        Assertions.assertEquals((long)t2DownstreamOffset, (long)checkpoint3.downstreamOffset(), (String)"checkpoint group13 downstreamOffset failed");
        Assertions.assertEquals((long)234L, (long)sourceRecord3.timestamp(), (String)"checkpoint group13 timestamp failed");
    }

    @Test
    public void testSyncOffset() {
        HashMap idleConsumerGroupsOffset = new HashMap();
        HashMap checkpointsPerConsumerGroup = new HashMap();
        String consumer1 = "consumer1";
        String consumer2 = "consumer2";
        String topic1 = "topic1";
        String topic2 = "topic2";
        HashMap<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<TopicPartition, OffsetAndMetadata>();
        TopicPartition t1p0 = new TopicPartition(topic1, 0);
        c1t1.put(t1p0, new OffsetAndMetadata(100L));
        HashMap<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<TopicPartition, OffsetAndMetadata>();
        TopicPartition t2p0 = new TopicPartition(topic2, 0);
        c2t2.put(t2p0, new OffsetAndMetadata(50L));
        idleConsumerGroupsOffset.put(consumer1, c1t1);
        idleConsumerGroupsOffset.put(consumer2, c2t2);
        Checkpoint cpC1T1P0 = new Checkpoint(consumer1, new TopicPartition(topic1, 0), 200L, 101L, "metadata");
        Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100L, 51L, "metadata");
        HashMap<TopicPartition, Checkpoint> checkpointMapC1 = new HashMap<TopicPartition, Checkpoint>();
        checkpointMapC1.put(cpC1T1P0.topicPartition(), cpC1T1P0);
        HashMap<TopicPartition, Checkpoint> checkpointMapC2 = new HashMap<TopicPartition, Checkpoint>();
        checkpointMapC2.put(cpC2T2P0.topicPartition(), cpC2T2P0);
        checkpointsPerConsumerGroup.put(consumer1, checkpointMapC1);
        checkpointsPerConsumerGroup.put(consumer2, checkpointMapC2);
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), null, Collections.emptySet(), idleConsumerGroupsOffset, new CheckpointStore(checkpointsPerConsumerGroup));
        Map output = mirrorCheckpointTask.syncGroupOffset();
        Assertions.assertEquals((long)101L, (long)((OffsetAndMetadata)((Map)output.get(consumer1)).get(t1p0)).offset(), (String)("Consumer 1 " + topic1 + " failed"));
        Assertions.assertEquals((long)51L, (long)((OffsetAndMetadata)((Map)output.get(consumer2)).get(t2p0)).offset(), (String)("Consumer 2 " + topic2 + " failed"));
    }

    @Test
    public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
        HashMap idleConsumerGroupsOffset = new HashMap();
        HashMap checkpointsPerConsumerGroup = new HashMap();
        String consumer = "consumer";
        String topic = "topic";
        HashMap<TopicPartition, Object> ct = new HashMap<TopicPartition, Object>();
        TopicPartition tp = new TopicPartition(topic, 0);
        ct.put(tp, null);
        idleConsumerGroupsOffset.put(consumer, ct);
        Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200L, 101L, "metadata");
        HashMap<TopicPartition, Checkpoint> checkpointMap = new HashMap<TopicPartition, Checkpoint>();
        checkpointMap.put(cp.topicPartition(), cp);
        checkpointsPerConsumerGroup.put(consumer, checkpointMap);
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source", "target", (ReplicationPolicy)new DefaultReplicationPolicy(), null, Collections.emptySet(), idleConsumerGroupsOffset, new CheckpointStore(checkpointsPerConsumerGroup));
        Map output = mirrorCheckpointTask.syncGroupOffset();
        Assertions.assertEquals((long)101L, (long)((OffsetAndMetadata)((Map)output.get(consumer)).get(tp)).offset(), (String)("Consumer " + topic + " failed"));
    }

    @Test
    public void testNoCheckpointForTopicWithoutOffsetSyncs() {
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
        offsetSyncStore.start(true);
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), (OffsetSyncStore)offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(Collections.emptyMap()));
        offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
        Optional checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 1), new OffsetAndMetadata(10L, null));
        Optional checkpoint2 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 0), new OffsetAndMetadata(10L, null));
        Assertions.assertFalse((boolean)checkpoint1.isPresent());
        Assertions.assertTrue((boolean)checkpoint2.isPresent());
    }

    @Test
    public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
        offsetSyncStore.start(true);
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), (OffsetSyncStore)offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(Collections.emptyMap()));
        offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
        Optional checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
        Assertions.assertFalse((boolean)checkpoint.isPresent());
    }

    @Test
    public void testCheckpointRecordsMonotonicIfStoreRewinds() {
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
        offsetSyncStore.start(true);
        HashMap<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<String, Map<TopicPartition, Checkpoint>>();
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), (OffsetSyncStore)offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(checkpointsPerConsumerGroup));
        TopicPartition tp = new TopicPartition("topic1", 0);
        TopicPartition targetTP = new TopicPartition("source1.topic1", 0);
        long upstream = 11L;
        long downstream = 4L;
        offsetSyncStore.sync(tp, upstream++, downstream++);
        offsetSyncStore.sync(tp, upstream++, downstream++);
        long consumerGroupOffset = upstream;
        long expectedDownstreamOffset = downstream;
        Assertions.assertEquals((Object)OptionalLong.of(expectedDownstreamOffset), (Object)offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
        Map<TopicPartition, Checkpoint> checkpoints = this.assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, true);
        checkpointsPerConsumerGroup.put("g1", checkpoints);
        offsetSyncStore.sync(tp, upstream++, downstream++);
        offsetSyncStore.sync(tp, upstream++, downstream++);
        offsetSyncStore.sync(tp, upstream++, downstream++);
        offsetSyncStore.sync(tp, upstream++, downstream++);
        offsetSyncStore.sync(tp, upstream, downstream);
        Assertions.assertNotEquals((Object)OptionalLong.of(expectedDownstreamOffset), (Object)offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
        this.assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, false);
        this.assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset - 1L, true);
    }

    private Map<TopicPartition, Checkpoint> assertCheckpointForTopic(MirrorCheckpointTask task, TopicPartition tp, TopicPartition remoteTp, long consumerGroupOffset, boolean truth) {
        Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = Collections.singletonMap(tp, new OffsetAndMetadata(consumerGroupOffset));
        Map checkpoints = task.checkpointsForGroup(consumerGroupOffsets, "g1");
        Assertions.assertEquals((Object)truth, (Object)checkpoints.containsKey(remoteTp), (String)("should" + (truth ? "" : " not") + " emit offset sync"));
        return checkpoints;
    }

    @Test
    public void testCheckpointsTaskRestartUsesExistingCheckpoints() {
        final TopicPartition t1p0 = new TopicPartition("t1", 0);
        TopicPartition sourceT1p0 = new TopicPartition("source1.t1", 0);
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(){

            @Override
            void backingStoreStart() {
                for (int i = 100; i <= 300; i += 100) {
                    this.sync(t1p0, i, i);
                }
            }
        };
        offsetSyncStore.start(false);
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), (OffsetSyncStore)offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(Collections.emptyMap()));
        HashMap<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(250L));
        Map checkpoints = mirrorCheckpointTask.checkpointsForGroup(upstreamGroupOffsets, "group1");
        Assertions.assertEquals((int)1, (int)checkpoints.size());
        Assertions.assertEquals((Object)new Checkpoint("group1", sourceT1p0, 250L, 201L, ""), checkpoints.get(sourceT1p0));
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore2 = new OffsetSyncStoreTest.FakeOffsetSyncStore(){

            @Override
            void backingStoreStart() {
                for (int i = 175; i <= 475; i += 100) {
                    this.sync(t1p0, i, i);
                }
            }
        };
        offsetSyncStore2.start(false);
        HashMap<String, Map> checkpointsPerConsumerGroup = new HashMap<String, Map>();
        checkpointsPerConsumerGroup.put("group1", checkpoints);
        MirrorCheckpointTask mirrorCheckpointTask2 = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), (OffsetSyncStore)offsetSyncStore2, Collections.emptySet(), Collections.emptyMap(), new CheckpointStore(checkpointsPerConsumerGroup));
        Assertions.assertEquals((Object)OptionalLong.of(176L), (Object)offsetSyncStore2.translateDownstream(null, t1p0, 250L));
        Assertions.assertEquals((Object)OptionalLong.of(176L), (Object)offsetSyncStore2.translateDownstream(null, t1p0, 370L));
        upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(250L));
        Assertions.assertTrue((boolean)mirrorCheckpointTask2.checkpointsForGroup(upstreamGroupOffsets, "group1").isEmpty());
        upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(370L));
        Assertions.assertTrue((boolean)mirrorCheckpointTask2.checkpointsForGroup(upstreamGroupOffsets, "group1").isEmpty());
        upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(400L));
        Map checkpoints2 = mirrorCheckpointTask2.checkpointsForGroup(upstreamGroupOffsets, "group1");
        Assertions.assertEquals((int)1, (int)checkpoints2.size());
        Assertions.assertEquals((Object)new Checkpoint("group1", sourceT1p0, 400L, 376L, ""), checkpoints2.get(sourceT1p0));
    }

    @Test
    public void testCheckpointStoreInitialized() throws InterruptedException {
        CheckpointStore checkpointStore = (CheckpointStore)Mockito.mock(CheckpointStore.class);
        MirrorCheckpointTask task = new MirrorCheckpointTask("source1", "target2", (ReplicationPolicy)new DefaultReplicationPolicy(), new OffsetSyncStoreTest.FakeOffsetSyncStore(), Collections.singleton("group"), Collections.emptyMap(), checkpointStore){

            List<SourceRecord> sourceRecordsForGroup(String group) {
                SourceRecord sr = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "", Integer.valueOf(0), null, null);
                return Collections.singletonList(sr);
            }
        };
        Assertions.assertNull((Object)task.poll());
        Mockito.when((Object)checkpointStore.isInitialized()).thenReturn((Object)true);
        List polled = task.poll();
        Assertions.assertEquals((int)1, (int)polled.size());
    }
}

