/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslatorConfig;
import io.confluent.connect.replicator.offsets.GroupTopicPartition;
import io.confluent.connect.replicator.offsets.GroupTopicPartitionSerializer;
import io.confluent.connect.replicator.util.MockTime;
import io.confluent.connect.replicator.util.Utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.runner.RunWith;

@RunWith(value=Theories.class)
public class ConsumerOffsetsTranslatorTest
extends EasyMockSupport {
    private final String taskId = "replicator-1";
    private final Set<String> whitelistTopics = Collections.singleton("foo");
    private final Pattern topicPattern = Pattern.compile("other");
    private final Set<String> blacklistTopics = Collections.singleton("bad");
    private MockTime time;
    private final int topicCreateBackoffMs = 10000;
    private final int topicConfigSyncIntervalMs = 120000;
    private static final TimestampType DEFAULT_TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
    @Mock
    private Consumer<byte[], byte[]> destConsumer;
    private final String sourceGroup1 = "mygroup1";
    private final String sourceGroup2 = "mygroup2";
    private final String sourceTopic = "foo";
    private final String ignoreTopic = "ignore";
    private final String badTopic = "bad";
    private final String topicRenameFormat = "dc.${topic}";

    @DataPoint
    public static TimestampType[] typeParams() {
        return new TimestampType[]{TimestampType.NO_TIMESTAMP_TYPE, TimestampType.CREATE_TIME, TimestampType.LOG_APPEND_TIME};
    }

    @DataPoint
    public static TimestampType[] userDefinedTypeParams() {
        return new TimestampType[]{TimestampType.CREATE_TIME, TimestampType.LOG_APPEND_TIME, null};
    }

    @Before
    public void setup() {
        this.time = new MockTime();
        this.destConsumer = (Consumer)this.createMock(Consumer.class);
    }

    @Test
    public void testConsumerRecord() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 0, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordsDifferentTimestamps() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroup1TopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        GroupTopicPartition sourceGroup2TopicPartition = new GroupTopicPartition("mygroup2", sourcePartition);
        byte[] key1Bytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroup1TopicPartition);
        byte[] key2Bytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroup2TopicPartition);
        byte[] value1Bytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        byte[] value2Bytes = new LongSerializer().serialize("foo", Long.valueOf(1L));
        HashMap recordMap = new HashMap();
        ArrayList<ConsumerRecord> recordList = new ArrayList<ConsumerRecord>();
        recordList.add(new ConsumerRecord("foo", 0, 0L, (Object)key1Bytes, (Object)value1Bytes));
        recordList.add(new ConsumerRecord("foo", 0, 0L, (Object)key2Bytes, (Object)value2Bytes));
        recordMap.put(sourcePartition, recordList);
        ConsumerRecords records = new ConsumerRecords(recordMap);
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 0, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordsUsingCache() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroup1TopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        GroupTopicPartition sourceGroup2TopicPartition = new GroupTopicPartition("mygroup2", sourcePartition);
        byte[] key1Bytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroup1TopicPartition);
        byte[] key2Bytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroup2TopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        HashMap recordMap = new HashMap();
        ArrayList<ConsumerRecord> recordList = new ArrayList<ConsumerRecord>();
        recordList.add(new ConsumerRecord("foo", 0, 0L, (Object)key1Bytes, (Object)valueBytes));
        recordList.add(new ConsumerRecord("foo", 0, 0L, (Object)key2Bytes, (Object)valueBytes));
        recordMap.put(sourcePartition, recordList);
        ConsumerRecords records = new ConsumerRecords(recordMap);
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 0, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissing() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 1, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissingTwice() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 2, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissingWithBackoff() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 3, 60000L, 150L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordOffsetMissingWithExpiry() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 3, 300L, 0L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordWithBadTopic() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition sourcePartition2 = new TopicPartition("bad", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        GroupTopicPartition sourceGroupTopicPartition2 = new GroupTopicPartition("bad", sourcePartition2);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] key2Bytes = new GroupTopicPartitionSerializer().serialize("bad", sourceGroupTopicPartition2);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        byte[] value2Bytes = new LongSerializer().serialize("bad", Long.valueOf(0L));
        HashMap recordMap = new HashMap();
        ArrayList<ConsumerRecord> recordList = new ArrayList<ConsumerRecord>();
        recordList.add(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes));
        recordList.add(new ConsumerRecord("bad", 0, 0L, (Object)key2Bytes, (Object)value2Bytes));
        recordMap.put(sourcePartition, recordList);
        ConsumerRecords records = new ConsumerRecords(recordMap);
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 0, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordWithIgnoredTopic() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition sourcePartition2 = new TopicPartition("ignore", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        GroupTopicPartition sourceGroupTopicPartition2 = new GroupTopicPartition("ignore", sourcePartition2);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] key2Bytes = new GroupTopicPartitionSerializer().serialize("ignore", sourceGroupTopicPartition2);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        byte[] value2Bytes = new LongSerializer().serialize("ignore", Long.valueOf(0L));
        HashMap recordMap = new HashMap();
        ArrayList<ConsumerRecord> recordList = new ArrayList<ConsumerRecord>();
        recordList.add(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes));
        recordList.add(new ConsumerRecord("ignore", 0, 0L, (Object)key2Bytes, (Object)value2Bytes));
        recordMap.put(sourcePartition, recordList);
        ConsumerRecords records = new ConsumerRecords(recordMap);
        this.expectSimpleReceive("foo", sourcePartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)records, 0, 60000L, 100L);
        this.verifyAll();
    }

    @Test
    public void testConsumerRecordWithFetchOffetsException() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        long fetchOffsetExpiryMs = 60000L;
        long fetchOffsetRetryBackoffMs = 100L;
        String destTopic = Utils.renameTopic((String)"dc.${topic}", (String)"foo");
        ConsumerOffsetsTranslatorConfig config = this.setupTaskConfigMock(fetchOffsetExpiryMs, fetchOffsetRetryBackoffMs, this.whitelistTopics, this.topicPattern, this.blacklistTopics, "dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        timestampsToSearch.put(new TopicPartition(destTopic, 0), 0L);
        Map<String, List<PartitionInfo>> topics = Collections.singletonMap(destTopic, Collections.singletonList(new PartitionInfo(destTopic, 0, null, null, null)));
        EasyMock.expect((Object)this.destConsumer.listTopics()).andReturn(topics);
        HashMap<TopicPartition, OffsetAndTimestamp> offsetTimesResult = new HashMap<TopicPartition, OffsetAndTimestamp>();
        offsetTimesResult.put(new TopicPartition(destTopic, 0), new OffsetAndTimestamp(0L, 0L));
        EasyMock.expect((Object)this.destConsumer.offsetsForTimes(timestampsToSearch)).andThrow((Throwable)new KafkaException());
        this.destConsumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        ConsumerOffsetsTranslator task = new ConsumerOffsetsTranslator(config, "replicator-1", this.time){

            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig config, String groupId) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        };
        task.translateOffsets(records.records(sourcePartition));
        this.verifyAll();
        Assert.assertEquals((long)1L, (long)task.timestampsForRetry().size());
    }

    @Test
    public void testConsumerRecordWithTopicPartitionMissing() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        long fetchOffsetExpiryMs = 60000L;
        long fetchOffsetRetryBackoffMs = 100L;
        String destTopic = Utils.renameTopic((String)"dc.${topic}", (String)"foo");
        ConsumerOffsetsTranslatorConfig config = this.setupTaskConfigMock(fetchOffsetExpiryMs, fetchOffsetRetryBackoffMs, this.whitelistTopics, this.topicPattern, this.blacklistTopics, "dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        timestampsToSearch.put(new TopicPartition(destTopic, 0), 0L);
        EasyMock.expect((Object)this.destConsumer.listTopics()).andReturn(Collections.emptyMap());
        this.destConsumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        ConsumerOffsetsTranslator task = new ConsumerOffsetsTranslator(config, "replicator-1", this.time){

            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig config, String groupId) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        };
        task.translateOffsets(records.records(sourcePartition));
        this.verifyAll();
        Assert.assertEquals((long)1L, (long)task.timestampsForRetry().size());
    }

    @Test
    public void testConsumerRecordWithCommitOffsetsException() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        GroupTopicPartition sourceGroupTopicPartition = new GroupTopicPartition("mygroup1", sourcePartition);
        byte[] keyBytes = new GroupTopicPartitionSerializer().serialize("foo", sourceGroupTopicPartition);
        byte[] valueBytes = new LongSerializer().serialize("foo", Long.valueOf(0L));
        ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(sourcePartition, Collections.singletonList(new ConsumerRecord("foo", 0, 0L, (Object)keyBytes, (Object)valueBytes))));
        long fetchOffsetExpiryMs = 60000L;
        long fetchOffsetRetryBackoffMs = 100L;
        String destTopic = Utils.renameTopic((String)"dc.${topic}", (String)"foo");
        ConsumerOffsetsTranslatorConfig config = this.setupTaskConfigMock(fetchOffsetExpiryMs, fetchOffsetRetryBackoffMs, this.whitelistTopics, this.topicPattern, this.blacklistTopics, "dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        timestampsToSearch.put(new TopicPartition(destTopic, 0), 0L);
        Map<String, List<PartitionInfo>> topics = Collections.singletonMap(destTopic, Collections.singletonList(new PartitionInfo(destTopic, 0, null, null, null)));
        EasyMock.expect((Object)this.destConsumer.listTopics()).andReturn(topics);
        HashMap<TopicPartition, OffsetAndTimestamp> offsetTimesResult = new HashMap<TopicPartition, OffsetAndTimestamp>();
        offsetTimesResult.put(new TopicPartition(destTopic, 0), new OffsetAndTimestamp(0L, 0L));
        EasyMock.expect((Object)this.destConsumer.offsetsForTimes(timestampsToSearch)).andReturn(offsetTimesResult);
        for (int i = 0; i < records.count(); ++i) {
            HashMap<TopicPartition, OffsetAndMetadata> offsetsForCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
            offsetsForCommit.put(new TopicPartition(destTopic, 0), new OffsetAndMetadata(1L));
            this.destConsumer.commitSync(offsetsForCommit);
            EasyMock.expectLastCall().andThrow((Throwable)new KafkaException());
        }
        this.destConsumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        ConsumerOffsetsTranslator task = new ConsumerOffsetsTranslator(config, "replicator-1", this.time){

            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig config, String groupId) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        };
        task.translateOffsets(records.records(sourcePartition));
        this.verifyAll();
        Assert.assertEquals((long)0L, (long)task.timestampsForRetry().size());
    }

    private void expectSimpleReceive(String sourceTopic, TopicPartition sourcePartition, String topicRenameFormat, ConsumerRecords<byte[], byte[]> records, int retryCallCount, long fetchOffsetExpiryMs, long fetchOffsetRetryBackoffMs) throws InterruptedException, ExecutionException {
        long period = 100L;
        String destTopic = Utils.renameTopic((String)topicRenameFormat, (String)sourceTopic);
        ConsumerOffsetsTranslatorConfig config = this.setupTaskConfigMock(fetchOffsetExpiryMs, fetchOffsetRetryBackoffMs, this.whitelistTopics, this.topicPattern, this.blacklistTopics, topicRenameFormat, true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        long elapsedTime = 0L;
        long lastAttempt = 0L;
        int offsetsForTimesCallCount = 0;
        do {
            if (elapsedTime >= fetchOffsetExpiryMs) {
                this.destConsumer.close();
                EasyMock.expectLastCall();
                break;
            }
            if (elapsedTime > 0L && lastAttempt + fetchOffsetRetryBackoffMs > elapsedTime) {
                elapsedTime += period;
                this.destConsumer.close();
                EasyMock.expectLastCall();
                continue;
            }
            HashMap<TopicPartition, Set> searchedTimestamps = new HashMap<TopicPartition, Set>();
            for (ConsumerRecord record : records) {
                if (!record.topic().equals(sourceTopic)) continue;
                TopicPartition destTp = new TopicPartition(destTopic, 0);
                HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
                Long timestamp = new LongDeserializer().deserialize(record.topic(), (byte[])record.value());
                long destOffset = 1000L + timestamp;
                Set ts = searchedTimestamps.computeIfAbsent(destTp, k -> new HashSet());
                if (!ts.contains(timestamp)) {
                    HashMap<TopicPartition, OffsetAndTimestamp> offsetTimesResult = new HashMap<TopicPartition, OffsetAndTimestamp>();
                    if (offsetsForTimesCallCount == retryCallCount) {
                        offsetTimesResult.put(destTp, new OffsetAndTimestamp(destOffset, timestamp.longValue()));
                    } else {
                        offsetTimesResult.put(destTp, null);
                    }
                    timestampsToSearch.put(destTp, timestamp);
                    Map<String, List<PartitionInfo>> topics = Collections.singletonMap(destTopic, Collections.singletonList(new PartitionInfo(destTp.topic(), destTp.partition(), null, null, null)));
                    EasyMock.expect((Object)this.destConsumer.listTopics()).andReturn(topics);
                    EasyMock.expect((Object)this.destConsumer.offsetsForTimes(timestampsToSearch)).andReturn(offsetTimesResult);
                    ts.add(timestamp);
                }
                if (offsetsForTimesCallCount == retryCallCount) {
                    HashMap<TopicPartition, OffsetAndMetadata> offsetsForCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
                    offsetsForCommit.put(destTp, new OffsetAndMetadata(destOffset + 1L));
                    this.destConsumer.commitSync(offsetsForCommit);
                    EasyMock.expectLastCall();
                }
                this.destConsumer.close();
                EasyMock.expectLastCall();
            }
            lastAttempt = elapsedTime;
            elapsedTime += period;
        } while (offsetsForTimesCallCount++ < retryCallCount);
        this.replayAll();
        ConsumerOffsetsTranslator task = new ConsumerOffsetsTranslator(config, "replicator-1", this.time){

            protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig config, String groupId) {
                return ConsumerOffsetsTranslatorTest.this.destConsumer;
            }
        };
        int translateOffsetCallCount = 0;
        do {
            if (translateOffsetCallCount == 0) {
                task.translateOffsets(records.records(sourcePartition));
            } else {
                task.translateOffsets(Collections.emptyList());
            }
            this.time.sleep(period);
        } while (translateOffsetCallCount++ < retryCallCount);
    }

    private ConsumerOffsetsTranslatorConfig setupTaskConfigMock(long fetchOffsetExpiryMs, long fetchOffsetRetryBackoffMs, Set<String> whitelistTopics, Pattern topicPattern, Set<String> blacklistTopics, String topicRenameFormat, boolean preservePartitions, boolean autoCreateTopics, int createBackoffMs, boolean autoSyncTopics, int topicConfigSyncIntervalMs, TimestampType defaultTimestampType) {
        ConsumerOffsetsTranslatorConfig config = (ConsumerOffsetsTranslatorConfig)this.createMock(ConsumerOffsetsTranslatorConfig.class);
        EasyMock.expect((Object)config.getFetchOffsetExpiryMs()).andStubReturn((Object)fetchOffsetExpiryMs);
        EasyMock.expect((Object)config.getFetchOffsetRetryBackoffMs()).andStubReturn((Object)fetchOffsetRetryBackoffMs);
        EasyMock.expect((Object)config.getTopics()).andStubReturn(whitelistTopics);
        EasyMock.expect((Object)config.getTopicPattern()).andStubReturn((Object)topicPattern);
        EasyMock.expect((Object)config.getBlacklistTopics()).andStubReturn(blacklistTopics);
        EasyMock.expect((Object)config.getTopicRenameFormat()).andStubReturn((Object)topicRenameFormat);
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)preservePartitions);
        EasyMock.expect((Object)config.getTopicCreateBackoffMs()).andStubReturn((Object)createBackoffMs);
        EasyMock.expect((Object)config.getTopicAutoCreate()).andStubReturn((Object)autoCreateTopics);
        EasyMock.expect((Object)config.getTopicConfigSync()).andStubReturn((Object)autoSyncTopics);
        EasyMock.expect((Object)config.getTopicConfigSyncIntervalMs()).andStubReturn((Object)topicConfigSyncIntervalMs);
        EasyMock.expect((Object)config.getTopicTimestampType()).andStubReturn((Object)defaultTimestampType.toString());
        return config;
    }
}

