package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.util.MockTime;
import io.confluent.connect.replicator.util.Utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.runner.RunWith;

@RunWith(Theories.class)
/* loaded from: input_file:io/confluent/connect/replicator/offsets/ConsumerOffsetsTranslatorTest.class */
public class ConsumerOffsetsTranslatorTest extends EasyMockSupport {
    private MockTime time;
    private static final TimestampType DEFAULT_TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
    private static final int DEFAULT_BATCH_PERIOD_MS = 0;
    private static final int DEFAULT_BATCH_SIZE_MS = 0;

    @Mock
    private Consumer<byte[], byte[]> destConsumer;
    private final String taskId = "replicator-1";
    private final Set<String> whitelistTopics = Collections.singleton("foo");
    private final Pattern topicPattern = Pattern.compile("other");
    private final Set<String> blacklistTopics = Collections.singleton("bad");
    private final int topicCreateBackoffMs = 10000;
    private final int topicConfigSyncIntervalMs = 120000;
    private final String sourceGroup1 = "mygroup1";
    private final String sourceGroup2 = "mygroup2";
    private final String sourceTopic = "foo";
    private final String ignoreTopic = "ignore";
    private final String badTopic = "bad";
    private final String topicRenameFormat = "dc.${topic}";

    @DataPoint
    public static TimestampType[] typeParams() {
        return new TimestampType[]{TimestampType.NO_TIMESTAMP_TYPE, TimestampType.CREATE_TIME, TimestampType.LOG_APPEND_TIME};
    }

    @DataPoint
    public static TimestampType[] userDefinedTypeParams() {
        return new TimestampType[]{TimestampType.CREATE_TIME, TimestampType.LOG_APPEND_TIME, null};
    }

    @Before
    public void setup() {
        this.time = new MockTime();
        this.destConsumer = (Consumer) createMock(Consumer.class);
    }

    @Test
    public void testConsumerRecord() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L))))), 0, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordsDifferentTimestamps() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        GroupTopicPartition groupTopicPartition = new GroupTopicPartition("mygroup1", topicPartition);
        GroupTopicPartition groupTopicPartition2 = new GroupTopicPartition("mygroup2", topicPartition);
        byte[] serialize = new GroupTopicPartitionSerializer().serialize("foo", groupTopicPartition);
        byte[] serialize2 = new GroupTopicPartitionSerializer().serialize("foo", groupTopicPartition2);
        byte[] serialize3 = new LongSerializer().serialize("foo", 0L);
        byte[] serialize4 = new LongSerializer().serialize("foo", 1L);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConsumerRecord("foo", 0, 0L, serialize, serialize3));
        arrayList.add(new ConsumerRecord("foo", 0, 0L, serialize2, serialize4));
        hashMap.put(topicPartition, arrayList);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(hashMap), 0, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordsUsingCache() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        GroupTopicPartition groupTopicPartition = new GroupTopicPartition("mygroup1", topicPartition);
        GroupTopicPartition groupTopicPartition2 = new GroupTopicPartition("mygroup2", topicPartition);
        byte[] serialize = new GroupTopicPartitionSerializer().serialize("foo", groupTopicPartition);
        byte[] serialize2 = new GroupTopicPartitionSerializer().serialize("foo", groupTopicPartition2);
        byte[] serialize3 = new LongSerializer().serialize("foo", 0L);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConsumerRecord("foo", 0, 0L, serialize, serialize3));
        arrayList.add(new ConsumerRecord("foo", 0, 0L, serialize2, serialize3));
        hashMap.put(topicPartition, arrayList);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(hashMap), 0, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissing() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L))))), 1, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissingTwice() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L))))), 2, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissingWithBackoff() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L))))), 3, 60000L, 150L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissingWithExpiry() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L))))), 3, 300L, 0L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordWithBadTopic() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TopicPartition topicPartition2 = new TopicPartition("bad", 0);
        GroupTopicPartition groupTopicPartition = new GroupTopicPartition("mygroup1", topicPartition);
        GroupTopicPartition groupTopicPartition2 = new GroupTopicPartition("bad", topicPartition2);
        byte[] serialize = new GroupTopicPartitionSerializer().serialize("foo", groupTopicPartition);
        byte[] serialize2 = new GroupTopicPartitionSerializer().serialize("bad", groupTopicPartition2);
        byte[] serialize3 = new LongSerializer().serialize("foo", 0L);
        byte[] serialize4 = new LongSerializer().serialize("bad", 0L);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConsumerRecord("foo", 0, 0L, serialize, serialize3));
        arrayList.add(new ConsumerRecord("bad", 0, 0L, serialize2, serialize4));
        hashMap.put(topicPartition, arrayList);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(hashMap), 0, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordWithIgnoredTopic() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TopicPartition topicPartition2 = new TopicPartition("ignore", 0);
        GroupTopicPartition groupTopicPartition = new GroupTopicPartition("mygroup1", topicPartition);
        GroupTopicPartition groupTopicPartition2 = new GroupTopicPartition("ignore", topicPartition2);
        byte[] serialize = new GroupTopicPartitionSerializer().serialize("foo", groupTopicPartition);
        byte[] serialize2 = new GroupTopicPartitionSerializer().serialize("ignore", groupTopicPartition2);
        byte[] serialize3 = new LongSerializer().serialize("foo", 0L);
        byte[] serialize4 = new LongSerializer().serialize("ignore", 0L);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConsumerRecord("foo", 0, 0L, serialize, serialize3));
        arrayList.add(new ConsumerRecord("ignore", 0, 0L, serialize2, serialize4));
        hashMap.put(topicPartition, arrayList);
        expectSimpleReceive("foo", topicPartition, "dc.${topic}", new ConsumerRecords<>(hashMap), 0, 60000L, 100L);
        verifyAll();
    }

    @Test
    public void testConsumerRecordWithFetchOffetsException() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        ConsumerRecords consumerRecords = new ConsumerRecords(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L)))));
        String renameTopic = Utils.renameTopic("dc.${topic}", "foo");
        ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig = setupTaskConfigMock(60000L, 100L, this.whitelistTopics, this.topicPattern, this.blacklistTopics, "dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(renameTopic, 0), 0L);
        EasyMock.expect(this.destConsumer.listTopics()).andReturn(Collections.singletonMap(renameTopic, Collections.singletonList(new PartitionInfo(renameTopic, 0, (Node) null, (Node[]) null, (Node[]) null))));
        new HashMap().put(new TopicPartition(renameTopic, 0), new OffsetAndTimestamp(0L, 0L));
        EasyMock.expect(this.destConsumer.offsetsForTimes(hashMap)).andThrow(new KafkaException());
        this.destConsumer.close();
        EasyMock.expectLastCall();
        replayAll();
        new ConsumerOffsetsTranslator(consumerOffsetsTranslatorConfig, "replicator-1", this.time, 0, 0) { // from class: io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslatorTest.1
            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig2, String str) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        }.translateOffsets(consumerRecords.records(topicPartition));
        verifyAll();
        Assert.assertEquals(1L, r0.timestampsForRetry().size());
    }

    @Test
    public void testConsumerRecordWithTopicPartitionMissing() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        ConsumerRecords consumerRecords = new ConsumerRecords(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L)))));
        String renameTopic = Utils.renameTopic("dc.${topic}", "foo");
        ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig = setupTaskConfigMock(60000L, 100L, this.whitelistTopics, this.topicPattern, this.blacklistTopics, "dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        new HashMap().put(new TopicPartition(renameTopic, 0), 0L);
        EasyMock.expect(this.destConsumer.listTopics()).andReturn(Collections.emptyMap());
        this.destConsumer.close();
        EasyMock.expectLastCall();
        replayAll();
        new ConsumerOffsetsTranslator(consumerOffsetsTranslatorConfig, "replicator-1", this.time, 0, 0) { // from class: io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslatorTest.2
            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig2, String str) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        }.translateOffsets(consumerRecords.records(topicPartition));
        verifyAll();
        Assert.assertEquals(1L, r0.timestampsForRetry().size());
    }

    @Test
    public void testConsumerRecordWithCommitOffsetsException() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        ConsumerRecords consumerRecords = new ConsumerRecords(Collections.singletonMap(topicPartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, new GroupTopicPartitionSerializer().serialize("foo", new GroupTopicPartition("mygroup1", topicPartition)), new LongSerializer().serialize("foo", 0L)))));
        String renameTopic = Utils.renameTopic("dc.${topic}", "foo");
        ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig = setupTaskConfigMock(60000L, 100L, this.whitelistTopics, this.topicPattern, this.blacklistTopics, "dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(renameTopic, 0), 0L);
        EasyMock.expect(this.destConsumer.listTopics()).andReturn(Collections.singletonMap(renameTopic, Collections.singletonList(new PartitionInfo(renameTopic, 0, (Node) null, (Node[]) null, (Node[]) null))));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new TopicPartition(renameTopic, 0), new OffsetAndTimestamp(0L, 0L));
        EasyMock.expect(this.destConsumer.offsetsForTimes(hashMap)).andReturn(hashMap2);
        for (int i = 0; i < consumerRecords.count(); i++) {
            HashMap hashMap3 = new HashMap();
            hashMap3.put(new TopicPartition(renameTopic, 0), new OffsetAndMetadata(1L));
            this.destConsumer.commitSync(hashMap3);
            EasyMock.expectLastCall().andThrow(new KafkaException());
        }
        this.destConsumer.close();
        EasyMock.expectLastCall();
        replayAll();
        new ConsumerOffsetsTranslator(consumerOffsetsTranslatorConfig, "replicator-1", this.time, 0, 0) { // from class: io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslatorTest.3
            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig2, String str) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        }.translateOffsets(consumerRecords.records(topicPartition));
        verifyAll();
        Assert.assertEquals(0L, r0.timestampsForRetry().size());
    }

    private void expectSimpleReceive(String str, TopicPartition topicPartition, String str2, ConsumerRecords<byte[], byte[]> consumerRecords, int i, long j, long j2) throws InterruptedException, ExecutionException {
        int i2;
        String renameTopic = Utils.renameTopic(str2, str);
        ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig = setupTaskConfigMock(j, j2, this.whitelistTopics, this.topicPattern, this.blacklistTopics, str2, true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        long j3 = 0;
        long j4 = 0;
        int i3 = 0;
        while (true) {
            if (j3 >= j) {
                this.destConsumer.close();
                EasyMock.expectLastCall();
                break;
            }
            if (j3 <= 0 || j4 + j2 <= j3) {
                HashMap hashMap = new HashMap();
                Iterator it = consumerRecords.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    if (consumerRecord.topic().equals(str)) {
                        TopicPartition topicPartition2 = new TopicPartition(renameTopic, 0);
                        HashMap hashMap2 = new HashMap();
                        Long deserialize = new LongDeserializer().deserialize(consumerRecord.topic(), (byte[]) consumerRecord.value());
                        long longValue = 1000 + deserialize.longValue();
                        Set set = (Set) hashMap.computeIfAbsent(topicPartition2, topicPartition3 -> {
                            return new HashSet();
                        });
                        if (!set.contains(deserialize)) {
                            HashMap hashMap3 = new HashMap();
                            if (i3 == i) {
                                hashMap3.put(topicPartition2, new OffsetAndTimestamp(longValue, deserialize.longValue()));
                            } else {
                                hashMap3.put(topicPartition2, null);
                            }
                            hashMap2.put(topicPartition2, deserialize);
                            EasyMock.expect(this.destConsumer.listTopics()).andReturn(Collections.singletonMap(renameTopic, Collections.singletonList(new PartitionInfo(topicPartition2.topic(), topicPartition2.partition(), (Node) null, (Node[]) null, (Node[]) null))));
                            EasyMock.expect(this.destConsumer.offsetsForTimes(hashMap2)).andReturn(hashMap3);
                            set.add(deserialize);
                        }
                        if (i3 == i) {
                            HashMap hashMap4 = new HashMap();
                            hashMap4.put(topicPartition2, new OffsetAndMetadata(longValue + 1));
                            this.destConsumer.commitSync(hashMap4);
                            EasyMock.expectLastCall();
                        }
                        this.destConsumer.close();
                        EasyMock.expectLastCall();
                    }
                }
                j4 = j3;
                j3 += 100;
            } else {
                j3 += 100;
                this.destConsumer.close();
                EasyMock.expectLastCall();
            }
            int i4 = i3;
            i3++;
            if (i4 >= i) {
                break;
            }
        }
        replayAll();
        ConsumerOffsetsTranslator consumerOffsetsTranslator = new ConsumerOffsetsTranslator(consumerOffsetsTranslatorConfig, "replicator-1", this.time, 0, 0) { // from class: io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslatorTest.4
            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig2, String str3) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        };
        int i5 = 0;
        do {
            if (i5 == 0) {
                consumerOffsetsTranslator.translateOffsets(consumerRecords.records(topicPartition));
            } else {
                consumerOffsetsTranslator.translateOffsets(Collections.emptyList());
            }
            this.time.sleep(100L);
            i2 = i5;
            i5++;
        } while (i2 < i);
    }

    private ConsumerOffsetsTranslatorConfig setupTaskConfigMock(long j, long j2, Set<String> set, Pattern pattern, Set<String> set2, String str, boolean z, boolean z2, int i, boolean z3, int i2, TimestampType timestampType) {
        ConsumerOffsetsTranslatorConfig consumerOffsetsTranslatorConfig = (ConsumerOffsetsTranslatorConfig) createMock(ConsumerOffsetsTranslatorConfig.class);
        EasyMock.expect(Long.valueOf(consumerOffsetsTranslatorConfig.getFetchOffsetExpiryMs())).andStubReturn(Long.valueOf(j));
        EasyMock.expect(Long.valueOf(consumerOffsetsTranslatorConfig.getFetchOffsetRetryBackoffMs())).andStubReturn(Long.valueOf(j2));
        EasyMock.expect(consumerOffsetsTranslatorConfig.getTopics()).andStubReturn(set);
        EasyMock.expect(consumerOffsetsTranslatorConfig.getTopicPattern()).andStubReturn(pattern);
        EasyMock.expect(consumerOffsetsTranslatorConfig.getBlacklistTopics()).andStubReturn(set2);
        EasyMock.expect(consumerOffsetsTranslatorConfig.getTopicRenameFormat()).andStubReturn(str);
        EasyMock.expect(Boolean.valueOf(consumerOffsetsTranslatorConfig.getTopicPreservePartitions())).andStubReturn(Boolean.valueOf(z));
        EasyMock.expect(Integer.valueOf(consumerOffsetsTranslatorConfig.getTopicCreateBackoffMs())).andStubReturn(Integer.valueOf(i));
        EasyMock.expect(Boolean.valueOf(consumerOffsetsTranslatorConfig.getTopicAutoCreate())).andStubReturn(Boolean.valueOf(z2));
        EasyMock.expect(Boolean.valueOf(consumerOffsetsTranslatorConfig.getTopicConfigSync())).andStubReturn(Boolean.valueOf(z3));
        EasyMock.expect(Integer.valueOf(consumerOffsetsTranslatorConfig.getTopicConfigSyncIntervalMs())).andStubReturn(Integer.valueOf(i2));
        EasyMock.expect(consumerOffsetsTranslatorConfig.getTopicTimestampType()).andStubReturn(timestampType.toString());
        return consumerOffsetsTranslatorConfig;
    }
}
