/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator;

import io.confluent.connect.replicator.ReplicatorSourceTask;
import io.confluent.connect.replicator.ReplicatorSourceTaskConfig;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTopicCommitter;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsCommitter;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriter;
import io.confluent.connect.replicator.util.ByteArrayConverter;
import io.confluent.connect.replicator.util.MockTime;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.TopicMetadata;
import io.confluent.connect.replicator.util.Utils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import kafka.log.LogConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;

@RunWith(value=Theories.class)
public class ReplicatorSourceTaskTest
extends EasyMockSupport {
    // Same value the tests in this class pass as the task-id constructor argument.
    private final String taskId = "replicator-1";
    // Shared converter; tests hand it to the task for the last two constructor arguments.
    private final Converter byteArrayConverter = new ByteArrayConverter();
    // Mock clock, re-created in setup() before each test.
    private MockTime time;
    // Same values the tests pass to setupTaskConfigMock(...) for backoff and sync interval.
    private final int topicCreateBackoffMs = 10000;
    private final int topicConfigSyncIntervalMs = 120000;
    // Timestamp type passed as the last setupTaskConfigMock(...) argument by default.
    private static final TimestampType DEFAULT_TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
    // Collaborator mocks. setup() assigns context, consumer, offsetReader and
    // offsetsReplicator explicitly via createMock(...).
    @Mock
    private SourceTaskContext context;
    @Mock
    private Consumer consumer;
    @Mock
    private OffsetStorageReader offsetReader;
    @Mock
    private ConsumerOffsetsTranslator offsetsReplicator;
    // NOTE(review): setup() does not assign this field; confirm something else
    // (outside this view) initialises it before it is used.
    @Mock
    private ConsumerTimestampsWriter writer;
    // Source topic name, rename format, and the destination topic name the tests expect
    // for that source topic (see testTopicRename).
    private final String sourceTopic = "foo";
    private final String topicRenameFormat = "dc.${topic}";
    private final String destTopic = "dc.foo";

    /**
     * Candidate record timestamp types, exposed to the Theories runner via {@code @DataPoint}.
     *
     * NOTE(review): this method returns an array but carries {@code @DataPoint}; JUnit
     * documents {@code @DataPoints} for array-valued sources -- confirm which is intended.
     *
     * @return the three timestamp types, in declaration order
     */
    @DataPoint
    public static TimestampType[] typeParams() {
        TimestampType[] recordTimestampTypes = {
            TimestampType.NO_TIMESTAMP_TYPE,
            TimestampType.CREATE_TIME,
            TimestampType.LOG_APPEND_TIME
        };
        return recordTimestampTypes;
    }

    /**
     * Candidate user-configured timestamp types, including {@code null}, exposed to the
     * Theories runner via {@code @DataPoint}.
     *
     * NOTE(review): this method returns an array but carries {@code @DataPoint}; JUnit
     * documents {@code @DataPoints} for array-valued sources -- confirm which is intended.
     *
     * @return {@code CREATE_TIME}, {@code LOG_APPEND_TIME} and {@code null}, in that order
     */
    @DataPoint
    public static TimestampType[] userDefinedTypeParams() {
        TimestampType[] configuredTimestampTypes = {
            TimestampType.CREATE_TIME,
            TimestampType.LOG_APPEND_TIME,
            null
        };
        return configuredTimestampTypes;
    }

    /**
     * Creates a fresh mock clock and fresh EasyMock collaborators before every test.
     * The {@code writer} field is not assigned here.
     */
    @Before
    public void setup() {
        time = new MockTime();
        context = createMock(SourceTaskContext.class);
        consumer = createMock(Consumer.class);
        offsetReader = createMock(OffsetStorageReader.class);
        offsetsReplicator = createMock(ConsumerOffsetsTranslator.class);
    }

    /**
     * Checks that a constructed task reports a non-null, non-empty version string.
     */
    @Test
    public void testVersion() {
        ReplicatorSourceTaskConfig taskConfig = createMock(ReplicatorSourceTaskConfig.class);
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);

        // Stubbed (not counted) config answers: partitions preserved, no topic auto-creation.
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(taskConfig.getTopicAutoCreate()).andStubReturn(false);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        String reportedVersion = task.version();

        Assert.assertNotNull(reportedVersion);
        Assert.assertFalse(reportedVersion.isEmpty());
        verifyAll();
    }

    /**
     * Starts a task whose source and destination admin clients report different cluster
     * ids and whose partition assignment is empty, then checks the first poll yields
     * no records.
     */
    @Test
    public void testTaskStart() throws Exception {
        Map<String, String> startProps = new HashMap<String, String>();
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = createMock(ReplicatorSourceTaskConfig.class);
        PartitionAssignor.Assignment emptyAssignment = createMock(PartitionAssignor.Assignment.class);

        // The two admin clients report distinct cluster ids.
        EasyMock.expect(srcAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(dstAdmin.clusterId()).andReturn("destClusterId");

        // Configuration lookups the task is expected to make.
        EasyMock.expect(taskConfig.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(taskConfig.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.originalsStrings()).andReturn(null);
        EasyMock.expect(taskConfig.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(taskConfig.isOffsetTimestampsCommitEnabled()).andReturn(false);
        EasyMock.expect(taskConfig.isOffsetTopicCommitEnabled()).andReturn(false);
        EasyMock.expect(taskConfig.getPartitionAssignment()).andReturn(emptyAssignment);
        EasyMock.expect(emptyAssignment.partitions()).andReturn(Collections.<TopicPartition>emptyList());

        // Consumer assignment and destination topic registration, with any arguments.
        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        // A single poll that returns nothing.
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(taskConfig.isProvenanceHeaderEnabled()).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.start(startProps);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Same scenario as a plain task start, except that both admin clients report the
     * same cluster id ("srcClusterId"); the first poll is still expected to be empty.
     */
    @Test
    public void testTaskStartSameClusterId() throws Exception {
        Map<String, String> startProps = new HashMap<String, String>();
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = createMock(ReplicatorSourceTaskConfig.class);
        PartitionAssignor.Assignment emptyAssignment = createMock(PartitionAssignor.Assignment.class);

        // Source and destination deliberately share one cluster id.
        EasyMock.expect(srcAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(dstAdmin.clusterId()).andReturn("srcClusterId");

        // Configuration lookups the task is expected to make.
        EasyMock.expect(taskConfig.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(taskConfig.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.originalsStrings()).andReturn(null);
        EasyMock.expect(taskConfig.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(taskConfig.isOffsetTimestampsCommitEnabled()).andReturn(false);
        EasyMock.expect(taskConfig.isOffsetTopicCommitEnabled()).andReturn(false);
        EasyMock.expect(taskConfig.getPartitionAssignment()).andReturn(emptyAssignment);
        EasyMock.expect(emptyAssignment.partitions()).andReturn(Collections.<TopicPartition>emptyList());

        // Consumer assignment and destination topic registration, with any arguments.
        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        // A single poll that returns nothing.
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andStubReturn(true);
        EasyMock.expect(taskConfig.isProvenanceHeaderEnabled()).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.start(startProps);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Feeds one consumer record whose timestamp argument is -1 through the simple
     * receive path and expects the resulting source record to have a null timestamp.
     */
    @Test
    public void testConsumerRecordWithNoTimestamp() throws Exception {
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        ConsumerRecord untimestamped = new ConsumerRecord("foo", 0, 0L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)new byte[0], (Object)new byte[0]);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(fooPartition, Collections.singletonList(untimestamped)));

        List<SourceRecord> replicated = expectSimpleReceive(
                "foo", fooPartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)polled, null);

        Assert.assertEquals(1L, replicated.size());
        SourceRecord only = replicated.get(0);
        Assert.assertNull(only.timestamp());
        verifyAll();
    }

    /**
     * Runs the simple receive path with the trailing boolean argument set to
     * {@code false} and expects the resulting source record to carry no Kafka partition.
     */
    @Test
    public void testPartitionPreservationDisabled() throws Exception {
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        ConsumerRecord incoming = new ConsumerRecord("foo", 0, 0L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)new byte[0], (Object)new byte[0]);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(fooPartition, Collections.singletonList(incoming)));

        List<SourceRecord> replicated = expectSimpleReceive(
                "foo", fooPartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)polled, null, false);

        Assert.assertEquals(1L, replicated.size());
        SourceRecord only = replicated.get(0);
        Assert.assertNull(only.kafkaPartition());
        verifyAll();
    }

    /**
     * Polls one record that carries a single header and expects the resulting source
     * record to have two headers, the first of which is the original key/value pair.
     * The provenance-header flag is mocked as enabled; the second header is presumably
     * the provenance header -- not asserted here.
     */
    @Test
    public void testHeaderReplication() throws Exception {
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        Map<String, String> startProps = new HashMap<String, String>();
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = createMock(ReplicatorSourceTaskConfig.class);
        PartitionAssignor.Assignment emptyAssignment = createMock(PartitionAssignor.Assignment.class);

        EasyMock.expect(srcAdmin.clusterId()).andReturn("srcClusterId");
        EasyMock.expect(dstAdmin.clusterId()).andReturn("destClusterId");

        // Configuration lookups the task is expected to make.
        EasyMock.expect(taskConfig.getTaskId()).andReturn("replicator-0");
        EasyMock.expect(taskConfig.getSourceKeyConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.getTopicPreservePartitions()).andReturn(true);
        EasyMock.expect(taskConfig.getTopicRenameFormat()).andReturn("dc.${topic}");
        EasyMock.expect(taskConfig.getSourceValueConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.getSourceHeaderConverter()).andReturn(new ByteArrayConverter());
        EasyMock.expect(taskConfig.originalsStrings()).andReturn(null);
        EasyMock.expect(taskConfig.getString(EasyMock.anyString())).andReturn(null).times(3);
        EasyMock.expect(taskConfig.isOffsetTimestampsCommitEnabled()).andReturn(false);
        EasyMock.expect(taskConfig.isOffsetTopicCommitEnabled()).andReturn(false);
        EasyMock.expect(taskConfig.getPartitionAssignment()).andReturn(emptyAssignment);
        EasyMock.expect(emptyAssignment.partitions()).andReturn(Collections.<TopicPartition>emptyList());

        consumer.assign(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics(
                EasyMock.anyObject(Set.class),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        // One record on foo-0 carrying a single user header.
        String headerKey = "my_header";
        String headerVal = "some_value";
        RecordHeaders incomingHeaders = new RecordHeaders();
        incomingHeaders.add(new RecordHeader(headerKey, headerVal.getBytes()));
        ConsumerRecord withHeader = new ConsumerRecord("foo", 0, 0L, -1L, TimestampType.CREATE_TIME, Long.valueOf(0L), 0, 0, (Object)new byte[0], (Object)new byte[0], (Headers)incomingHeaders);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(fooPartition, Collections.singletonList(withHeader)));
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(polled);
        EasyMock.expect(taskConfig.isProvenanceHeaderEnabled()).andReturn(true).times(4);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.start(startProps);
        List<SourceRecord> replicated = task.poll();

        Assert.assertEquals(1L, replicated.size());
        SourceRecord only = replicated.get(0);
        Assert.assertEquals(2L, only.headers().size());
        Header firstHeader = only.headers().iterator().next();
        Assert.assertEquals(headerKey, firstHeader.key());
        Assert.assertArrayEquals(headerVal.getBytes(), (byte[])firstHeader.value());
        verifyAll();
    }

    /**
     * Theory: for a record stamped with {@code timestampType}, run the simple receive
     * path with {@code userDefinedType} as the configured type and verify all mocks.
     * Any assertions on the result live in the helper, which is not shown here.
     */
    @Theory
    public void testTimestampTypeOverride(TimestampType timestampType, TimestampType userDefinedType) throws Exception {
        long recordTimestamp = 500L;
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        ConsumerRecord incoming = new ConsumerRecord("foo", 0, 0L, recordTimestamp, timestampType, 0L, 0, 0, (Object)new byte[0], (Object)new byte[0]);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(fooPartition, Collections.singletonList(incoming)));

        expectSimpleReceive(
                "foo", fooPartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)polled, userDefinedType);
        verifyAll();
    }

    /**
     * Replicates one record from "foo" with rename format "dc.${topic}" and checks the
     * source record's topic ("dc.foo"), partition, timestamp and Connect source partition.
     */
    @Test
    public void testTopicRename() throws Exception {
        long recordTimestamp = 500L;
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        ConsumerRecord incoming = new ConsumerRecord("foo", 0, 0L, recordTimestamp, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)new byte[0], (Object)new byte[0]);
        ConsumerRecords polled = new ConsumerRecords(
                Collections.singletonMap(fooPartition, Collections.singletonList(incoming)));

        List<SourceRecord> replicated = expectSimpleReceive(
                "foo", fooPartition, "dc.${topic}", (ConsumerRecords<byte[], byte[]>)polled, null);

        Assert.assertEquals(1L, replicated.size());
        SourceRecord only = replicated.get(0);

        // Destination coordinates and timestamp.
        Assert.assertEquals("dc.foo", only.topic());
        Assert.assertEquals(0L, only.kafkaPartition().intValue());
        // Explicit unboxing keeps this on assertEquals(long, long).
        Assert.assertEquals(recordTimestamp, (long)only.timestamp());

        // The Connect source partition still points at the original topic-partition.
        Map<String, ?> origin = only.sourcePartition();
        Assert.assertEquals("foo", origin.get("topic"));
        Assert.assertEquals(Integer.valueOf(0), origin.get("partition"));
        verifyAll();
    }

    /**
     * Two source topics ("foo", "bar") whose destination topics have no metadata and no
     * partitions: each source partition is expected to be rewound and paused, and topic
     * creation expectations are recorded for both destinations via expectTopicCreation.
     */
    @Test
    public void testNonExistingTopicCreation() throws Exception {
        String secondSourceTopic = "bar";
        String secondDestTopic = "dc.bar";
        TopicPartition fooSource = new TopicPartition("foo", 0);
        TopicPartition fooDest = new TopicPartition("dc.foo", 0);
        TopicPartition barSource = new TopicPartition(secondSourceTopic, 0);
        TopicPartition barDest = new TopicPartition(secondDestTopic, 0);

        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        List<TopicPartition> assigned = Arrays.asList(fooSource, barSource);
        consumer.assign(assigned);
        EasyMock.expectLastCall();

        Set<String> expectedDestTopics = new HashSet<String>(Arrays.asList("dc.foo", secondDestTopic));
        dstAdmin.setInterestedTopics(
                EasyMock.eq(expectedDestTopics),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        // Source-side partition metadata: one partition per topic on a single broker.
        Node broker = new Node(0, "localhost", 9092);
        EasyMock.expect(consumer.partitionsFor("foo")).andStubReturn(
                Collections.singletonList(new PartitionInfo("foo", 0, broker, new Node[]{broker}, new Node[]{broker})));
        EasyMock.expect(consumer.partitionsFor(secondSourceTopic)).andStubReturn(
                Collections.singletonList(new PartitionInfo(secondSourceTopic, 0, broker, new Node[]{broker}, new Node[]{broker})));

        Properties sharedTopicConfig = new Properties();

        // First topic: destination is missing, so the source partition is rewound and paused.
        EasyMock.expect(srcAdmin.topicConfig("foo")).andStubReturn(sharedTopicConfig);
        EasyMock.expect(dstAdmin.topicMetadata("dc.foo")).andReturn(null);
        EasyMock.expect(dstAdmin.partitionExists(fooDest)).andReturn(false);
        consumer.seekToBeginning(Collections.singleton(fooSource));
        EasyMock.expectLastCall();
        consumer.pause(Collections.singleton(fooSource));
        EasyMock.expectLastCall();

        // Second topic: same expectations.
        EasyMock.expect(srcAdmin.topicConfig(secondSourceTopic)).andStubReturn(sharedTopicConfig);
        EasyMock.expect(dstAdmin.topicMetadata(secondDestTopic)).andReturn(null);
        EasyMock.expect(dstAdmin.partitionExists(barDest)).andReturn(false);
        consumer.seekToBeginning(Collections.singleton(barSource));
        EasyMock.expectLastCall();
        consumer.pause(Collections.singleton(barSource));
        EasyMock.expectLastCall();

        expectTopicCreation("dc.foo", sharedTopicConfig, dstAdmin);
        expectTopicCreation(secondDestTopic, sharedTopicConfig, dstAdmin);

        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.doStart(assigned);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Fires the captured topic-metadata listener from inside the mocked
     * {@code partitionsFor("foo")} call, i.e. while {@code doStart} is still running,
     * and verifies that start-up completes against the recorded expectations.
     */
    @Test
    public void testMetadataUpdateDuringStartup() throws Exception {
        TopicPartition fooSource = new TopicPartition("foo", 0);
        TopicPartition fooDest = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assigned = Collections.singletonList(fooSource);

        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        consumer.assign(assigned);
        EasyMock.expectLastCall();

        // Capture the listener the task registers so the test can trigger it later.
        final Capture<ReplicatorAdminClient.TopicMetadataListener> listenerCapture =
                new Capture<ReplicatorAdminClient.TopicMetadataListener>();
        dstAdmin.setInterestedTopics(
                EasyMock.eq(Collections.singleton("dc.foo")),
                EasyMock.capture(listenerCapture));
        EasyMock.expectLastCall();

        // Answering partitionsFor() first invokes the listener, then returns the metadata.
        final Node broker = new Node(0, "localhost", 9092);
        IAnswer<List<PartitionInfo>> refreshThenAnswer = new IAnswer<List<PartitionInfo>>(){

            public List<PartitionInfo> answer() throws Throwable {
                listenerCapture.getValue().onTopicMetadataRefresh();
                return Collections.singletonList(new PartitionInfo("foo", 0, broker, new Node[]{broker}, new Node[]{broker}));
            }
        };
        EasyMock.expect(consumer.partitionsFor("foo")).andAnswer(refreshThenAnswer);

        // No stored or committed offset: the partition is rewound to the beginning.
        EasyMock.expect(offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(consumer.committed(fooSource)).andReturn(null);
        consumer.seekToBeginning(Collections.singleton(fooSource));
        EasyMock.expectLastCall();

        // The destination topic and partition already exist.
        EasyMock.expect(dstAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(dstAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(dstAdmin.partitionExists(fooDest)).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.doStart(assigned);
        verifyAll();
    }

    /**
     * With the destination topic present but configured with empty properties, expects
     * the source topic's configuration ("max.message.bytes" = "2000000") to be applied
     * to the destination through {@code changeTopicConfig}.
     */
    @Test
    public void testUpdateTopicConfiguration() throws Exception {
        TopicPartition fooSource = new TopicPartition("foo", 0);
        TopicPartition fooDest = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assigned = Collections.singletonList(fooSource);

        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        consumer.assign(assigned);
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics(
                EasyMock.eq(Collections.singleton("dc.foo")),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        Node broker = new Node(0, "localhost", 9092);
        EasyMock.expect(consumer.partitionsFor("foo")).andStubReturn(
                Collections.singletonList(new PartitionInfo("foo", 0, broker, new Node[]{broker}, new Node[]{broker})));

        Properties srcTopicProps = new Properties();
        srcTopicProps.put("max.message.bytes", "2000000");

        // No stored or committed offset: the partition is rewound to the beginning.
        EasyMock.expect(offsetReader.offset(Utils.toConnectPartition("foo", 0))).andReturn(null);
        EasyMock.expect(consumer.committed(fooSource)).andReturn(null);
        consumer.seekToBeginning(Collections.singleton(fooSource));
        EasyMock.expectLastCall();

        // The destination topic and partition already exist.
        EasyMock.expect(dstAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(dstAdmin.aliveBrokers()).andStubReturn(1);
        EasyMock.expect(dstAdmin.partitionExists(fooDest)).andReturn(true);
        EasyMock.expect(dstAdmin.topicExists("dc.foo")).andReturn(true);

        // Source config differs from the (empty) destination config, so it gets pushed.
        EasyMock.expect(srcAdmin.topicConfig("foo")).andReturn(srcTopicProps);
        EasyMock.expect(dstAdmin.topicConfig("dc.foo")).andReturn(new Properties());
        dstAdmin.changeTopicConfig("dc.foo", srcTopicProps);
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.doStart(assigned);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Builds the task config with the fifth setupTaskConfigMock argument set to
     * {@code false} and records no {@code topicConfig} expectation on either admin
     * client, so any topic-config query would fail the mocks.
     */
    @Test
    public void testConfigNotQueriedWhenAutoSyncDisabled() throws Exception {
        int partition = 0;
        TopicPartition fooSource = new TopicPartition("foo", partition);
        TopicPartition fooDest = new TopicPartition("dc.foo", partition);
        List<TopicPartition> assigned = Collections.singletonList(fooSource);

        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);

        consumer.assign(assigned);
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics(
                EasyMock.eq(Collections.singleton("dc.foo")),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        Node broker = new Node(0, "localhost", 9092);
        EasyMock.expect(consumer.partitionsFor("foo")).andStubReturn(
                Collections.singletonList(new PartitionInfo("foo", partition, broker, new Node[]{broker}, new Node[]{broker})));

        // No stored or committed offset: the partition is rewound to the beginning.
        EasyMock.expect(offsetReader.offset(Utils.toConnectPartition("foo", partition))).andReturn(null);
        EasyMock.expect(consumer.committed(fooSource)).andReturn(null);
        consumer.seekToBeginning(Collections.singleton(fooSource));
        EasyMock.expectLastCall();

        // The destination topic and partition already exist.
        EasyMock.expect(dstAdmin.topicMetadata("dc.foo")).andStubReturn(new TopicMetadata("dc.foo", 1));
        EasyMock.expect(dstAdmin.partitionExists(fooDest)).andReturn(true);
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.doStart(assigned);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * The source partition lists two replicas while the destination reports a single
     * alive broker and no existing topic; no topic-creation expectation is recorded,
     * and the source partition is expected to be paused and rewound.
     */
    @Test
    public void testSkipTopicCreationIfNotEnoughBrokers() throws Exception {
        TopicPartition fooSource = new TopicPartition("foo", 0);
        TopicPartition fooDest = new TopicPartition("dc.foo", 0);
        List<TopicPartition> assigned = Collections.singletonList(fooSource);

        EasyMock.expect(context.offsetStorageReader()).andStubReturn(offsetReader);
        ReplicatorAdminClient srcAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        consumer.assign(assigned);
        EasyMock.expectLastCall();
        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
        offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics(
                EasyMock.eq(Collections.singleton("dc.foo")),
                EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        // Source partition with two replicas.
        // NOTE(review): both nodes use id 0 and differ only by port -- confirm intended.
        Node leaderNode = new Node(0, "localhost", 9092);
        Node followerNode = new Node(0, "localhost", 9093);
        Node[] replicaSet = new Node[]{leaderNode, followerNode};
        Node[] inSyncSet = new Node[]{leaderNode, followerNode};
        EasyMock.expect(consumer.partitionsFor("foo")).andStubReturn(
                Collections.singletonList(new PartitionInfo("foo", 0, leaderNode, replicaSet, inSyncSet)));

        Properties srcTopicProps = new Properties();
        EasyMock.expect(srcAdmin.topicConfig("foo")).andReturn(srcTopicProps);
        EasyMock.expect(taskConfig.getTopicTimestampType()).andStubReturn(DEFAULT_TIMESTAMP_TYPE.toString());

        // Destination side: topic absent, only one broker alive.
        EasyMock.expect(dstAdmin.topicMetadata("dc.foo")).andStubReturn(null);
        EasyMock.expect(dstAdmin.aliveBrokers()).andReturn(1);
        EasyMock.expect(dstAdmin.partitionExists(fooDest)).andReturn(false);
        EasyMock.expect(dstAdmin.topicExists("dc.foo")).andReturn(false);

        consumer.pause(Collections.singleton(fooSource));
        EasyMock.expectLastCall();
        consumer.seekToBeginning(Collections.singleton(fooSource));
        EasyMock.expectLastCall();
        replayAll();

        ReplicatorSourceTask task = new ReplicatorSourceTask(
                taskConfig, context, "replicator-1", time, consumer, offsetsReplicator,
                srcAdmin, dstAdmin, byteArrayConverter, byteArrayConverter);
        task.doStart(assigned);

        Assert.assertTrue(task.poll().isEmpty());
        verifyAll();
    }

    /**
     * Exercises destination topic creation across three record/replay phases that share one
     * task instance:
     * <ol>
     *   <li>{@code createTopic} throws a RuntimeException; the poll returns no records.</li>
     *   <li>The mock clock advances by half of the create backoff; no {@code createTopic}
     *       expectation is recorded for this phase.</li>
     *   <li>The mock clock advances by the full backoff; {@code createTopic} returns true and
     *       the paused source partition is expected to be resumed.</li>
     * </ol>
     */
    @Test
    public void testTopicCreationRetry() throws Exception {
        int topicCreateBackoffMs = 5000;
        int topicConfigSyncIntervalMs = 10000;
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> sourceAssignment = Collections.singletonList(sourcePartition);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient sourceClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig config = this.setupTaskConfigMock("dc.${topic}", true, true, topicCreateBackoffMs, true, topicConfigSyncIntervalMs, DEFAULT_TIMESTAMP_TYPE);
        // ---- Phase 1: topic creation fails ----
        this.consumer.assign(sourceAssignment);
        EasyMock.expectLastCall();
        destClient.setInterestedTopics((Set)EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        // The destination partition does not exist, so the source partition is expected to be
        // rewound and paused.
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, leader, new Node[]{leader}, new Node[]{leader})));
        Properties topicConfig = new Properties();
        // The single creation attempt (1 partition, replication factor 1) blows up.
        EasyMock.expect((Object)destClient.createTopic("dc.foo", 1, 1, topicConfig)).andThrow((Throwable)new RuntimeException());
        EasyMock.expect((Object)config.getTopicCreateBackoffMs()).andStubReturn((Object)topicCreateBackoffMs);
        EasyMock.expect((Object)destClient.topicExists("dc.foo")).andStubReturn((Object)false);
        EasyMock.expect((Object)sourceClient.topicConfig("foo")).andStubReturn((Object)topicConfig);
        EasyMock.expect((Object)destClient.topicMetadata("dc.foo")).andStubReturn(null);
        EasyMock.expect((Object)destClient.aliveBrokers()).andStubReturn((Object)1);
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andStubReturn((Object)false);
        EasyMock.expect((Object)config.getTopicConfigSyncIntervalMs()).andReturn((Object)topicConfigSyncIntervalMs);
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        this.replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(config, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, sourceClient, destClient, this.byteArrayConverter, this.byteArrayConverter);
        task.doStart(sourceAssignment);
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
        // ---- Phase 2: only half of the backoff has elapsed ----
        // resetAll() wipes every mock, so the config stubs are recorded again. No createTopic
        // expectation is recorded for this phase.
        this.resetAll();
        EasyMock.expect((Object)config.getTopicCreateBackoffMs()).andStubReturn((Object)topicCreateBackoffMs);
        EasyMock.expect((Object)config.getTopicRenameFormat()).andStubReturn((Object)"dc.${topic}");
        EasyMock.expect((Object)config.getTopicConfigSyncIntervalMs()).andStubReturn((Object)120000);
        EasyMock.expect((Object)config.getTopicAutoCreate()).andStubReturn((Object)true);
        EasyMock.expect((Object)config.getTopicConfigSync()).andStubReturn((Object)true);
        EasyMock.expect((Object)config.getTopicTimestampType()).andStubReturn((Object)DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andStubReturn((Object)false);
        EasyMock.expect((Object)destClient.topicExists("dc.foo")).andStubReturn((Object)false);
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        this.replayAll();
        this.time.sleep(topicCreateBackoffMs / 2);
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
        // ---- Phase 3: the backoff has elapsed and creation succeeds ----
        this.resetAll();
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andStubReturn((Object)false);
        this.time.sleep(topicCreateBackoffMs);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        EasyMock.expect((Object)config.getTopicCreateBackoffMs()).andStubReturn((Object)0);
        EasyMock.expect((Object)config.getTopicRenameFormat()).andStubReturn((Object)"dc.${topic}");
        EasyMock.expect((Object)config.getTopicAutoCreate()).andStubReturn((Object)true);
        EasyMock.expect((Object)this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)sourceClient.topicConfig("foo")).andStubReturn((Object)topicConfig);
        EasyMock.expect((Object)config.getTopicTimestampType()).andStubReturn((Object)DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect((Object)destClient.aliveBrokers()).andStubReturn((Object)1);
        EasyMock.expect((Object)destClient.createTopic("dc.foo", 1, 1, topicConfig)).andReturn((Object)true);
        EasyMock.expect((Object)destClient.topicMetadata("dc.foo")).andReturn(null);
        // paused() first reports the source partition, then an empty set once it has been resumed.
        EasyMock.expect((Object)this.consumer.paused()).andReturn(Collections.singleton(sourcePartition));
        // A one-shot "true" expectation recorded in addition to the "false" stub above.
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andReturn((Object)true);
        this.consumer.resume(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.paused()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        this.replayAll();
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * The offset reader returns a stored Connect offset of 50 for the source partition and the
     * destination partition already exists; the consumer is expected to be positioned at 51
     * and an empty poll yields no records.
     */
    @Test
    public void testSeekCommittedOffset() throws Exception {
        String dstTopic = "dc.foo";
        long storedOffset = 50L;
        TopicPartition srcTp = new TopicPartition("foo", 1);
        TopicPartition dstTp = new TopicPartition(dstTopic, 1);
        List<TopicPartition> assigned = Collections.singletonList(srcTp);

        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);

        this.consumer.assign(assigned);
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics((Set)EasyMock.eq(Collections.singleton(dstTopic)), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)dstAdmin.partitionExists(dstTp)).andStubReturn((Object)true);

        // Stored offset N is expected to translate into a seek to N + 1.
        EasyMock.expect((Object)this.offsetReader.offset(Utils.toConnectPartition((TopicPartition)srcTp))).andReturn((Object)Utils.toConnectOffset((long)storedOffset));
        this.consumer.seek(srcTp, storedOffset + 1L);
        EasyMock.expectLastCall();

        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();

        this.replayAll();
        ReplicatorSourceTask replicator = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter);
        replicator.doStart(assigned);
        Assert.assertTrue((boolean)replicator.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * The offset reader has no stored offset, the poll throws OffsetOutOfRangeException and the
     * consumer reports no committed offset. Two seek-to-beginning calls are expected in total
     * and the task's poll must still return an empty batch.
     */
    @Test
    public void testOffsetOutOfRangeHandling() throws Exception {
        String dstTopic = "dc.foo";
        TopicPartition srcTp = new TopicPartition("foo", 1);
        TopicPartition dstTp = new TopicPartition(dstTopic, 1);
        List<TopicPartition> assigned = Collections.singletonList(srcTp);
        Set<TopicPartition> srcTpOnly = Collections.singleton(srcTp);

        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);

        this.consumer.assign(assigned);
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics((Set)EasyMock.eq(Collections.singleton(dstTopic)), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)dstAdmin.partitionExists(dstTp)).andStubReturn((Object)true);

        // First seek-to-beginning: nothing stored for the partition.
        EasyMock.expect((Object)this.offsetReader.offset(Utils.toConnectPartition((TopicPartition)srcTp))).andReturn(null);
        this.consumer.seekToBeginning(srcTpOnly);
        EasyMock.expectLastCall();

        // Second seek-to-beginning: the poll fails and there is no committed offset either.
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andThrow((Throwable)new OffsetOutOfRangeException(Collections.singletonMap(srcTp, 51L)));
        EasyMock.expect((Object)this.consumer.committed(srcTp)).andReturn(null);
        this.consumer.seekToBeginning(srcTpOnly);
        EasyMock.expectLastCall();

        this.replayAll();
        ReplicatorSourceTask replicator = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter);
        replicator.doStart(assigned);
        Assert.assertTrue((boolean)replicator.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * The source topic "foo" exposes two partitions while the destination metadata for
     * "dc.foo" reports one, so a call to {@code addPartitions("dc.foo", 2)} is expected on the
     * destination client. The destination topic config is also expected to be changed to the
     * source config with the timestamp-type override applied.
     */
    @Test
    public void testExpandingTopicPartitions() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 0);
        TopicPartition destPartition = new TopicPartition("dc.foo", 0);
        List<TopicPartition> sourceAssignment = Collections.singletonList(sourcePartition);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient sourceClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig config = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);
        this.consumer.assign(sourceAssignment);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.paused()).andStubReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        destClient.setInterestedTopics((Set)EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        Node leader = new Node(0, "localhost", 9092);
        // Source has partitions 0 and 1; destination metadata below reports a single partition.
        EasyMock.expect((Object)this.consumer.partitionsFor("foo")).andStubReturn(Arrays.asList(new PartitionInfo("foo", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("foo", 1, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)destClient.topicMetadata("dc.foo")).andStubReturn((Object)new TopicMetadata("dc.foo", 1));
        destClient.addPartitions("dc.foo", 2);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andReturn((Object)true);
        EasyMock.expect((Object)destClient.topicExists("dc.foo")).andReturn((Object)true);
        EasyMock.expect((Object)this.offsetReader.offset(Utils.toConnectPartition((TopicPartition)sourcePartition))).andReturn(null);
        Properties sourceTopicConfig = new Properties();
        EasyMock.expect((Object)sourceClient.topicConfig("foo")).andReturn((Object)new Properties());
        // Expected destination config: an empty source config plus the timestamp-type override.
        this.overrideTimestampType(DEFAULT_TIMESTAMP_TYPE, sourceTopicConfig);
        destClient.changeTopicConfig("dc.foo", sourceTopicConfig);
        // expectLastCall() sits directly after the void call it refers to.
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicTimestampType()).andStubReturn((Object)DEFAULT_TIMESTAMP_TYPE.toString());
        EasyMock.expect((Object)destClient.topicConfig("dc.foo")).andReturn((Object)new Properties());
        EasyMock.expect((Object)this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(config, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, sourceClient, destClient, this.byteArrayConverter, this.byteArrayConverter);
        task.doStart(sourceAssignment);
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * Same partition-count mismatch as the expansion case (two source partitions, one
     * destination partition), but the task config is built with partition preservation turned
     * off. No {@code addPartitions} expectation is recorded on the destination client.
     */
    @Test
    public void testSkipPartitionExpansionIfNotPartitionsPreserved() throws Exception {
        String srcTopic = "foo";
        String dstTopic = "dc.foo";
        TopicPartition srcTp = new TopicPartition(srcTopic, 0);
        List<TopicPartition> assigned = Collections.singletonList(srcTp);
        Node leaderNode = new Node(0, "localhost", 9092);
        Node[] onlyLeader = new Node[]{leaderNode};

        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        // Second argument (preserve partitions) is false for this scenario.
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", false, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);

        this.consumer.assign(assigned);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.paused()).andStubReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics((Set)EasyMock.eq(Collections.singleton(dstTopic)), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        // Two partitions on the source, one on the destination.
        EasyMock.expect((Object)this.consumer.partitionsFor(srcTopic)).andStubReturn(Arrays.asList(new PartitionInfo(srcTopic, 0, leaderNode, new Node[]{leaderNode}, new Node[]{leaderNode}), new PartitionInfo(srcTopic, 1, leaderNode, onlyLeader, onlyLeader)));
        EasyMock.expect((Object)dstAdmin.topicMetadata(dstTopic)).andStubReturn((Object)new TopicMetadata(dstTopic, 1));
        EasyMock.expect((Object)dstAdmin.topicExists(dstTopic)).andStubReturn((Object)true);

        // Nothing stored and nothing committed: the consumer is expected to start at the beginning.
        EasyMock.expect((Object)this.offsetReader.offset(Utils.toConnectPartition((TopicPartition)srcTp))).andReturn(null);
        EasyMock.expect((Object)this.consumer.committed(srcTp)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(srcTp));
        EasyMock.expectLastCall();

        this.replayAll();
        ReplicatorSourceTask replicator = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter);
        replicator.doStart(assigned);
        Assert.assertTrue((boolean)replicator.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * Two-phase test around a destination partition that is initially missing.
     * Phase 1: {@code partitionExists} is stubbed to false, and the source partition is expected
     * to be paused and rewound. Phase 2: after the mock clock advances 5000 ms,
     * {@code partitionExists} returns true and the paused partition is expected to be resumed.
     * Both polls must return no records.
     */
    @Test
    public void testNonExistingTopicPaused() throws Exception {
        TopicPartition sourcePartition = new TopicPartition("foo", 1);
        TopicPartition destPartition = new TopicPartition("dc.foo", 1);
        List<TopicPartition> sourceAssignment = Collections.singletonList(sourcePartition);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient sourceClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig config = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        // ---- Phase 1: destination partition is missing ----
        this.consumer.assign(sourceAssignment);
        EasyMock.expectLastCall();
        // The listener is captured here but never read again in this method.
        Capture capturedListener = new Capture();
        destClient.setInterestedTopics((Set)EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.capture((Capture)capturedListener));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andStubReturn((Object)false);
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        this.replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(config, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, sourceClient, destClient, this.byteArrayConverter, this.byteArrayConverter);
        task.doStart(sourceAssignment);
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
        // ---- Phase 2: destination partition has appeared ----
        // resetAll() wipes every mock, so the stubs needed by the second poll are recorded again.
        this.resetAll();
        this.time.sleep(5000L);
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andReturn((Object)true);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        EasyMock.expect((Object)config.getTopicRenameFormat()).andStubReturn((Object)"dc.${topic}");
        // paused() first reports the source partition, then an empty set after the resume.
        EasyMock.expect((Object)this.consumer.paused()).andReturn(Collections.singleton(sourcePartition));
        this.consumer.resume(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.paused()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        this.replayAll();
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * Two-phase test with the third {@code setupTaskConfigMock} flag set to false (per the test
     * name, topic auto-creation - confirm against the helper).
     * Phase 1: the destination topic and partition are reported missing; the source partition
     * is expected to be paused and rewound, and no {@code createTopic} expectation is recorded.
     * Phase 2: after the mock clock advances 5000 ms, {@code partitionExists} returns true and
     * the paused partition is expected to be resumed. Both polls must return no records.
     */
    @Test
    public void testNonExistentPartitionsPausedWhenAutoCreateDisabled() throws Exception {
        int partitionId = 0;
        TopicPartition sourcePartition = new TopicPartition("foo", partitionId);
        TopicPartition destPartition = new TopicPartition("dc.foo", partitionId);
        List<TopicPartition> sourceAssignment = Collections.singletonList(sourcePartition);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        ReplicatorAdminClient sourceClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig config = this.setupTaskConfigMock("dc.${topic}", true, false, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        // ---- Phase 1: destination topic/partition is missing ----
        Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)this.consumer.partitionsFor("foo")).andStubReturn(Collections.singletonList(new PartitionInfo("foo", 0, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)destClient.topicMetadata("dc.foo")).andStubReturn(null);
        // Stubbed once; the original recorded this identical stub a second time further down.
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andStubReturn((Object)false);
        this.consumer.assign(sourceAssignment);
        EasyMock.expectLastCall();
        // The listener is captured here but never read again in this method.
        Capture capturedListener = new Capture();
        destClient.setInterestedTopics((Set)EasyMock.eq(Collections.singleton("dc.foo")), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.capture((Capture)capturedListener));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)destClient.topicExists("dc.foo")).andStubReturn((Object)false);
        this.consumer.pause(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        this.replayAll();
        ReplicatorSourceTask task = new ReplicatorSourceTask(config, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, sourceClient, destClient, this.byteArrayConverter, this.byteArrayConverter);
        task.doStart(sourceAssignment);
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
        // ---- Phase 2: destination partition has appeared ----
        // resetAll() wipes every mock, so the stubs needed by the second poll are recorded again.
        this.resetAll();
        this.time.sleep(5000L);
        EasyMock.expect((Object)destClient.partitionExists(destPartition)).andReturn((Object)true);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);
        EasyMock.expect((Object)config.getTopicRenameFormat()).andStubReturn((Object)"dc.${topic}");
        // paused() first reports the source partition, then an empty set after the resume.
        EasyMock.expect((Object)this.consumer.paused()).andReturn(Collections.singleton(sourcePartition));
        this.consumer.resume(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.paused()).andReturn(Collections.emptySet());
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)config.getTopicPreservePartitions()).andStubReturn((Object)true);
        this.replayAll();
        Assert.assertTrue((boolean)task.poll().isEmpty());
        this.verifyAll();
    }

    /**
     * Builds a task around a ConsumerTimestampsCommitter and checks that forwarding a record
     * carrying timestamp 100 through {@code task.commitRecord} satisfies the recorded mocks.
     */
    @Test
    public void testCommitRecord() throws Exception {
        TopicPartition tp = new TopicPartition("mytopic", 1);
        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        // The committer is a concrete instance wrapping this.writer, not a mock itself.
        ConsumerTimestampsCommitter timestampsCommitter = new ConsumerTimestampsCommitter("mygroup", this.writer);
        SourceRecord timestamped = new SourceRecord(Utils.toConnectPartition((String)tp.topic(), (int)tp.partition()), null, null, null, null, null, null, null, Long.valueOf(100L), null);
        ReplicatorSourceTask replicator = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter, timestampsCommitter);

        // Record phase.
        timestampsCommitter.commitRecord(timestamped);
        EasyMock.expectLastCall();
        this.replayAll();

        // Replay phase.
        replicator.commitRecord(timestamped);
        this.verifyAll();
    }

    /**
     * Builds a task with a ConsumerOffsetsTopicCommitter around the consumer mock and expects a
     * {@code commitSync} carrying the record's source offset plus one (1001) with empty
     * metadata for partition mytopic-1.
     */
    @Test
    public void testCommitSourceOffset() throws Exception {
        ReplicatorAdminClient sourceClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig config = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        TopicPartition topicPartition = new TopicPartition("mytopic", 1);
        // The committer is a concrete instance wrapping the consumer mock, not a mock itself.
        ConsumerOffsetsTopicCommitter committer = new ConsumerOffsetsTopicCommitter(this.consumer);
        long offset = 1000L;
        SourceRecord sourceRecord = new SourceRecord(Utils.toConnectPartition((String)topicPartition.topic(), (int)topicPartition.partition()), Utils.toConnectOffset((long)offset), null, null, null, null, null, null, Long.valueOf(100L), null);
        // The committer is passed in the last constructor slot; the slot before it is left null.
        ReplicatorSourceTask task = new ReplicatorSourceTask(config, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, sourceClient, destClient, this.byteArrayConverter, this.byteArrayConverter, null, committer);
        // NOTE(review): these calls hit the concrete committer during the record phase; whether
        // they touch the consumer mock is not visible from here - confirm before reordering.
        committer.commitRecord(sourceRecord);
        EasyMock.expectLastCall();
        committer.commit();
        EasyMock.expectLastCall();
        // Expected commit payload: offset + 1 with empty metadata.
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1L, ""));
        this.consumer.commitSync(offsetsToCommit);
        EasyMock.expectLastCall();
        this.replayAll();
        task.commitRecord(sourceRecord);
        task.commit();
        committer.checkCommit();
        this.verifyAll();
    }

    /**
     * Same flow as the timestamped commit-record case, but the SourceRecord is built with a
     * null timestamp; forwarding it through {@code task.commitRecord} must still satisfy the
     * recorded mocks.
     */
    @Test
    public void testCommitRecordWhenRecordHasNoTimestamp() throws Exception {
        TopicPartition tp = new TopicPartition("mytopic", 1);
        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        // The committer is a concrete instance wrapping this.writer, not a mock itself.
        ConsumerTimestampsCommitter timestampsCommitter = new ConsumerTimestampsCommitter("mygroup", this.writer);
        // Every optional SourceRecord field, including the timestamp, is left null.
        SourceRecord untimestamped = new SourceRecord(Utils.toConnectPartition((String)tp.topic(), (int)tp.partition()), null, null, null, null, null, null, null, null, null);
        ReplicatorSourceTask replicator = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter, timestampsCommitter);

        // Record phase.
        timestampsCommitter.commitRecord(untimestamped);
        EasyMock.expectLastCall();
        this.replayAll();

        // Replay phase.
        replicator.commitRecord(untimestamped);
        this.verifyAll();
    }

    /**
     * Builds a task around a ConsumerTimestampsCommitter and checks that {@code task.commit()}
     * satisfies the recorded mocks. No record is involved in this scenario.
     */
    @Test
    public void testCommit() throws Exception {
        ReplicatorAdminClient sourceClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient destClient = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig config = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, false, 120000, DEFAULT_TIMESTAMP_TYPE);
        // The committer is a concrete instance wrapping this.writer, not a mock itself.
        ConsumerTimestampsCommitter committer = new ConsumerTimestampsCommitter("mygroup", this.writer);
        ReplicatorSourceTask task = new ReplicatorSourceTask(config, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, sourceClient, destClient, this.byteArrayConverter, this.byteArrayConverter, committer);
        // Record phase.
        committer.commit();
        EasyMock.expectLastCall();
        this.replayAll();
        // Replay phase.
        task.commit();
        this.verifyAll();
    }

    /**
     * Convenience overload: forwards to the six-argument {@code expectSimpleReceive} with
     * {@code preservePartitions} set to {@code true} and returns its result unchanged.
     */
    private List<SourceRecord> expectSimpleReceive(String sourceTopic, TopicPartition sourcePartition, String topicRenameFormat, ConsumerRecords<byte[], byte[]> records, TimestampType userDefinedType) throws InterruptedException, ExecutionException {
        final boolean preservePartitions = true;
        List<SourceRecord> expectedRecords = this.expectSimpleReceive(sourceTopic, sourcePartition, topicRenameFormat, records, userDefinedType, preservePartitions);
        return expectedRecords;
    }

    /**
     * Records the mock expectations for a task that is started on the single
     * {@code sourcePartition} and polled once, then replays all mocks, starts the task and
     * returns the result of that poll.
     *
     * @param sourceTopic        topic name used for the source-side expectations
     * @param sourcePartition    the only partition handed to {@code doStart}
     * @param topicRenameFormat  format given to {@code Utils.renameTopic} to derive the destination topic
     * @param records            what the mocked consumer returns from {@code poll}
     * @param userDefinedType    timestamp type to expect; {@code null} selects {@code DEFAULT_TIMESTAMP_TYPE}
     * @param preservePartitions forwarded to the config mock; when {@code true} a
     *                           {@code partitionExists} call on the destination client is also expected
     * @return the list returned by the task's {@code poll()}
     */
    private List<SourceRecord> expectSimpleReceive(String sourceTopic, TopicPartition sourcePartition, String topicRenameFormat, ConsumerRecords<byte[], byte[]> records, TimestampType userDefinedType, boolean preservePartitions) throws InterruptedException, ExecutionException {
        String renamedTopic = Utils.renameTopic(topicRenameFormat, sourceTopic);
        TopicPartition renamedPartition = new TopicPartition(renamedTopic, 0);
        List<TopicPartition> assignment = Collections.singletonList(sourcePartition);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);

        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock(topicRenameFormat, preservePartitions, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        // Assignment of the source partition and registration of the destination topic.
        dstAdmin.setInterestedTopics((Set)EasyMock.eq(Collections.singleton(renamedTopic)), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();

        // The source topic is reported with a single partition 0 on one local broker.
        Node broker = new Node(0, "localhost", 9092);
        Node[] replicas = new Node[]{broker};
        Node[] inSyncReplicas = new Node[]{broker};
        PartitionInfo partitionInfo = new PartitionInfo(sourceTopic, 0, broker, replicas, inSyncReplicas);
        List<PartitionInfo> sourcePartitions = Collections.singletonList(partitionInfo);
        EasyMock.expect((Object)this.consumer.partitionsFor(sourceTopic)).andReturn(sourcePartitions);

        // The destination topic already exists.
        EasyMock.expect((Object)dstAdmin.topicMetadata(renamedTopic)).andReturn((Object)new TopicMetadata(renamedTopic, 1));
        if (preservePartitions) {
            EasyMock.expect((Object)dstAdmin.partitionExists(renamedPartition)).andReturn((Object)true);
        }
        EasyMock.expect((Object)dstAdmin.topicExists(renamedTopic)).andReturn((Object)true);
        EasyMock.expect((Object)srcAdmin.topicConfig(sourceTopic)).andReturn((Object)new Properties());

        // The destination config is expected to be changed to carry only the timestamp type.
        Properties expectedDestConfig = new Properties();
        TimestampType effectiveType;
        if (userDefinedType == null) {
            effectiveType = DEFAULT_TIMESTAMP_TYPE;
        } else {
            effectiveType = userDefinedType;
        }
        this.overrideTimestampType(effectiveType, expectedDestConfig);
        dstAdmin.changeTopicConfig(renamedTopic, expectedDestConfig);
        EasyMock.expect((Object)taskConfig.getTopicTimestampType()).andReturn((Object)effectiveType.toString());
        // NOTE(review): this expectLastCall() follows an expect(...).andReturn(...) rather than
        // the void changeTopicConfig call above; position kept exactly as found - confirm intent.
        EasyMock.expectLastCall();
        EasyMock.expect((Object)dstAdmin.topicConfig(renamedTopic)).andReturn((Object)new Properties());

        // No stored or committed offset, so a seek to the beginning is expected before polling.
        EasyMock.expect((Object)this.offsetReader.offset(Utils.toConnectPartition(sourceTopic, 0))).andReturn(null);
        EasyMock.expect((Object)this.consumer.committed(sourcePartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(sourcePartition));
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn(records);
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)taskConfig.isProvenanceHeaderEnabled()).andReturn((Object)true).times(3);

        this.replayAll();
        ReplicatorSourceTask replicatorTask = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter);
        replicatorTask.doStart(assignment);
        List<SourceRecord> polled = replicatorTask.poll();
        return polled;
    }

    /**
     * Stores the string form of {@code type} in {@code config} under the key returned by
     * {@code LogConfig.MessageTimestampTypeProp()}.
     *
     * @param type   timestamp type whose {@code toString()} becomes the property value
     * @param config properties object that is modified in place
     */
    private void overrideTimestampType(TimestampType type, Properties config) {
        String timestampTypeKey = LogConfig.MessageTimestampTypeProp();
        String timestampTypeValue = type.toString();
        config.setProperty(timestampTypeKey, timestampTypeValue);
    }

    /**
     * Creates a {@code ReplicatorSourceTaskConfig} mock whose topic-related getters are stubbed
     * to return the supplied values.
     *
     * @param topicRenameFormat         stubbed result of {@code getTopicRenameFormat()}
     * @param preservePartitions        stubbed result of {@code getTopicPreservePartitions()}
     * @param autoCreateTopics          stubbed result of {@code getTopicAutoCreate()}
     * @param createBackoffMs           stubbed result of {@code getTopicCreateBackoffMs()}
     * @param autoSyncTopics            stubbed result of {@code getTopicConfigSync()}
     * @param topicConfigSyncIntervalMs stubbed result of {@code getTopicConfigSyncIntervalMs()}
     * @param defaultTimestampType      its {@code toString()} is the stubbed result of
     *                                  {@code getTopicTimestampType()}
     * @return the mock, still in record state (this method does not replay it)
     */
    private ReplicatorSourceTaskConfig setupTaskConfigMock(String topicRenameFormat, boolean preservePartitions, boolean autoCreateTopics, int createBackoffMs, boolean autoSyncTopics, int topicConfigSyncIntervalMs, TimestampType defaultTimestampType) {
        ReplicatorSourceTaskConfig taskConfig = (ReplicatorSourceTaskConfig)this.createMock(ReplicatorSourceTaskConfig.class);

        // Topic naming and partitioning.
        EasyMock.expect((Object)taskConfig.getTopicRenameFormat()).andStubReturn((Object)topicRenameFormat);
        EasyMock.expect((Object)taskConfig.getTopicPreservePartitions()).andStubReturn((Object)preservePartitions);

        // Topic creation.
        EasyMock.expect((Object)taskConfig.getTopicCreateBackoffMs()).andStubReturn((Object)createBackoffMs);
        EasyMock.expect((Object)taskConfig.getTopicAutoCreate()).andStubReturn((Object)autoCreateTopics);

        // Topic configuration sync.
        EasyMock.expect((Object)taskConfig.getTopicConfigSync()).andStubReturn((Object)autoSyncTopics);
        EasyMock.expect((Object)taskConfig.getTopicConfigSyncIntervalMs()).andStubReturn((Object)topicConfigSyncIntervalMs);
        EasyMock.expect((Object)taskConfig.getTopicTimestampType()).andStubReturn((Object)defaultTimestampType.toString());

        return taskConfig;
    }

    /**
     * Records, on {@code destClient}, the sequence of calls expected when {@code destTopic} has
     * no metadata yet and is then created with {@code topicConfig}.
     *
     * @param destTopic   name of the topic expected to be created
     * @param topicConfig configuration expected in {@code createTopic} and returned afterwards
     *                    from {@code topicConfig(destTopic)}
     * @param destClient  mock, in record state, that receives the expectations
     */
    private void expectTopicCreation(String destTopic, Properties topicConfig, ReplicatorAdminClient destClient) throws InterruptedException, ExecutionException {
        final int aliveBrokerCount = 1;

        // No metadata is returned for the topic at first.
        EasyMock.expect((Object)destClient.topicMetadata(destTopic)).andReturn(null);
        EasyMock.expect((Object)destClient.aliveBrokers()).andReturn((Object)aliveBrokerCount);

        // Creation is reported as successful.
        EasyMock.expect((Object)destClient.createTopic(destTopic, 1, 1, topicConfig)).andReturn((Object)true);

        // Afterwards the topic exists and reports the same configuration.
        EasyMock.expect((Object)destClient.topicExists(destTopic)).andReturn((Object)true);
        EasyMock.expect((Object)destClient.topicConfig(destTopic)).andReturn((Object)topicConfig);
    }

    /**
     * Expects that {@code poll()} returns an empty list when the destination client's
     * {@code changeTopicConfig} call throws a {@code TimeoutException} and the consumer
     * delivers no records.
     */
    @Test
    public void testUpdateTopicConfigurationTimeouts() throws Exception {
        final String sourceTopic = "foo";
        final String destTopic = "dc.foo";
        TopicPartition srcPartition = new TopicPartition(sourceTopic, 0);
        TopicPartition dstPartition = new TopicPartition(destTopic, 0);
        List<TopicPartition> assignment = Collections.singletonList(srcPartition);
        EasyMock.expect((Object)this.context.offsetStorageReader()).andStubReturn((Object)this.offsetReader);

        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        // Consumer is assigned the partition and yields no records.
        this.consumer.assign(assignment);
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.consumer.poll(EasyMock.anyLong())).andReturn((Object)ConsumerRecords.empty());
        this.offsetsReplicator.translateOffsets((List)EasyMock.anyObject());
        EasyMock.expectLastCall();
        dstAdmin.setInterestedTopics((Set)EasyMock.eq(Collections.singleton(destTopic)), (ReplicatorAdminClient.TopicMetadataListener)EasyMock.anyObject(ReplicatorAdminClient.TopicMetadataListener.class));
        EasyMock.expectLastCall();

        Node broker = new Node(0, "localhost", 9092);
        Node[] replicas = new Node[]{broker};
        Node[] inSyncReplicas = new Node[]{broker};
        List<PartitionInfo> sourcePartitions = Collections.singletonList(new PartitionInfo(sourceTopic, 0, broker, replicas, inSyncReplicas));
        EasyMock.expect((Object)this.consumer.partitionsFor(sourceTopic)).andStubReturn(sourcePartitions);

        // Source topic carries a setting that the (empty) destination config lacks.
        Properties srcTopicProps = new Properties();
        srcTopicProps.put("max.message.bytes", "2000000");

        // No stored or committed offset, so a seek to the beginning is expected.
        EasyMock.expect((Object)this.offsetReader.offset(Utils.toConnectPartition(sourceTopic, 0))).andReturn(null);
        EasyMock.expect((Object)this.consumer.committed(srcPartition)).andReturn(null);
        this.consumer.seekToBeginning(Collections.singleton(srcPartition));
        EasyMock.expectLastCall();

        EasyMock.expect((Object)dstAdmin.topicMetadata(destTopic)).andStubReturn((Object)new TopicMetadata(destTopic, 1));
        EasyMock.expect((Object)dstAdmin.aliveBrokers()).andStubReturn((Object)1);
        EasyMock.expect((Object)dstAdmin.partitionExists(dstPartition)).andReturn((Object)true);
        EasyMock.expect((Object)dstAdmin.topicExists(destTopic)).andReturn((Object)true);
        EasyMock.expect((Object)srcAdmin.topicConfig(sourceTopic)).andReturn((Object)srcTopicProps);
        EasyMock.expect((Object)dstAdmin.topicConfig(destTopic)).andReturn((Object)new Properties());

        // The config change on the destination topic times out.
        dstAdmin.changeTopicConfig(destTopic, srcTopicProps);
        EasyMock.expectLastCall().andThrow((Throwable)new TimeoutException());

        this.replayAll();
        ReplicatorSourceTask replicatorTask = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter);
        replicatorTask.doStart(assignment);
        List<SourceRecord> polled = replicatorTask.poll();
        Assert.assertTrue(polled.isEmpty());
        this.verifyAll();
    }

    /**
     * Expects that {@code stop()} wakes up and closes the consumer and closes both the source
     * and the destination admin client.
     */
    @Test
    public void testStop() throws Exception {
        ReplicatorAdminClient srcAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorAdminClient dstAdmin = (ReplicatorAdminClient)this.createMock(ReplicatorAdminClient.class);
        ReplicatorSourceTaskConfig taskConfig = this.setupTaskConfigMock("dc.${topic}", true, true, 10000, true, 120000, DEFAULT_TIMESTAMP_TYPE);

        // Consumer shutdown.
        this.consumer.wakeup();
        EasyMock.expectLastCall();
        this.consumer.close();
        EasyMock.expectLastCall();

        // Admin client shutdown.
        srcAdmin.close();
        EasyMock.expectLastCall();
        dstAdmin.close();
        EasyMock.expectLastCall();

        this.replayAll();
        ReplicatorSourceTask replicatorTask = new ReplicatorSourceTask(taskConfig, this.context, "replicator-1", (Time)this.time, this.consumer, this.offsetsReplicator, srcAdmin, dstAdmin, this.byteArrayConverter, this.byteArrayConverter);
        replicatorTask.stop();
        this.verifyAll();
    }
}

