package io.confluent.connect.replicator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/replicator/TopicMonitorThreadTest.class */
public class TopicMonitorThreadTest extends EasyMockSupport {
    private AdminClient adminClient;
    private KafkaFuture<Collection<TopicListing>> listingsFuture;
    private KafkaFuture<TopicDescription> descriptionFuture;
    private KafkaFuture<TopicDescription> otherDescriptionFuture;
    private ListTopicsResult listTopicsResult;
    private DescribeTopicsResult describeTopicsResult;
    private ConnectorContext context;
    private long pollIntervalMs;
    private List<Node> nodes;

    @Before
    public void setUp() throws Exception {
        this.adminClient = (AdminClient) createMock(AdminClient.class);
        this.listTopicsResult = (ListTopicsResult) createMock(ListTopicsResult.class);
        this.describeTopicsResult = (DescribeTopicsResult) createMock(DescribeTopicsResult.class);
        this.listingsFuture = (KafkaFuture) createMock(KafkaFuture.class);
        this.descriptionFuture = (KafkaFuture) createMock(KafkaFuture.class);
        this.otherDescriptionFuture = (KafkaFuture) createMock(KafkaFuture.class);
        this.context = (ConnectorContext) createMock(ConnectorContext.class);
        this.pollIntervalMs = 10000L;
        this.nodes = Arrays.asList(new Node(0, "localhost", 9092));
    }

    @Test
    public void testBasicAssignment() throws Exception {
        final String str = "topic";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.singletonList(new TopicListing("topic", false)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.1
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Set<TopicPartition> verifyAndCollectAssignedPartitions = verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals(2L, verifyAndCollectAssignedPartitions.size());
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("topic", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testEmptyAssignment() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.emptyList());
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Assert.assertEquals(0L, newTopicMonitorThread.assignments(2).size());
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testNoEmptyTaskAssignments() throws Exception {
        final String str = "topic";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.singletonList(new TopicListing("topic", false)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.2
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Assert.assertEquals(2L, newTopicMonitorThread.assignments(5).size());
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testRegexAssignment() throws Exception {
        final String str = "foo";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.emptySet(), Pattern.compile("f.*"), Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(new ArrayList(Arrays.asList(new TopicListing("foo", false), new TopicListing("boo", false))));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.3
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("foo", false, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Set<TopicPartition> verifyAndCollectAssignedPartitions = verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals(2L, verifyAndCollectAssignedPartitions.size());
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("foo", 0)));
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("foo", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testWhitelistInternalTopic() throws Exception {
        final String str = "__consumer_offsets";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("__consumer_offsets"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.singletonList(new TopicListing("__consumer_offsets", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.4
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("__consumer_offsets", false, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Set<TopicPartition> verifyAndCollectAssignedPartitions = verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals(2L, verifyAndCollectAssignedPartitions.size());
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("__consumer_offsets", 0)));
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("__consumer_offsets", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testExcludeInternalTopics() throws Exception {
        final String str = "foo";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.emptySet(), Pattern.compile(".*"), Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(new ArrayList(Arrays.asList(new TopicListing("foo", false), new TopicListing("__consumer_offsets", true))));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.5
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("foo", false, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Set<TopicPartition> verifyAndCollectAssignedPartitions = verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals(2L, verifyAndCollectAssignedPartitions.size());
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("foo", 0)));
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("foo", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testBlacklistTopicsFiltered() throws Exception {
        final String str = "topic";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.emptySet(), Pattern.compile(".*"), Collections.singleton("blacklisted"), this.pollIntervalMs, 0, false, (List) null, this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(new ArrayList(Arrays.asList(new TopicListing("topic", false), new TopicListing("blacklisted", false))));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.6
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Set<TopicPartition> verifyAndCollectAssignedPartitions = verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals(2L, verifyAndCollectAssignedPartitions.size());
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("topic", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testMetadataChangeTriggersReconfiguration() throws Exception {
        final String str = "topic";
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.emptySet(), Pattern.compile(".*"), Collections.emptySet(), 100L, 0, false, (List) null, this.adminClient);
        Node node = new Node(0, "localhost", 9092);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult).anyTimes();
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture).anyTimes();
        EasyMock.expect(this.listingsFuture.get()).andReturn(new ArrayList(Arrays.asList(new TopicListing("topic", false)))).andStubReturn(new ArrayList(Arrays.asList(new TopicListing("topic", false), new TopicListing("other", false))));
        HashMap hashMap = new HashMap();
        hashMap.put("topic", this.descriptionFuture);
        hashMap.put("other", this.otherDescriptionFuture);
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult).anyTimes();
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.7
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        }).andStubReturn(hashMap);
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2))).anyTimes();
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("other", false, createPartitionList(2))).anyTimes();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("topic", Arrays.asList(new PartitionInfo("topic", 0, node, new Node[]{node}, new Node[]{node}), new PartitionInfo("topic", 1, node, new Node[]{node}, new Node[]{node})));
        hashMap2.put("other", Arrays.asList(new PartitionInfo("other", 0, node, new Node[]{node}, new Node[]{node}), new PartitionInfo("other", 1, node, new Node[]{node}, new Node[]{node})));
        this.context.requestTaskReconfiguration();
        EasyMock.expectLastCall();
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Set<TopicPartition> verifyAndCollectAssignedPartitions = verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals(2L, verifyAndCollectAssignedPartitions.size());
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(verifyAndCollectAssignedPartitions.contains(new TopicPartition("topic", 1)));
        Thread.sleep(200L);
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testMonitorThreadsWaitIsInterrupted() throws Exception {
        final String str = "topic";
        Set singleton = Collections.singleton("topic");
        CountDownLatch countDownLatch = (CountDownLatch) createMock(CountDownLatch.class);
        this.pollIntervalMs = 1000L;
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", singleton, (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient, new RoundRobinAssignor(), new CountDownLatch(1), countDownLatch);
        EasyMock.expect(Long.valueOf(countDownLatch.getCount())).andReturn(1L).anyTimes();
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult).anyTimes();
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture).anyTimes();
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.singletonList(new TopicListing("topic", false))).anyTimes();
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult).anyTimes();
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.8
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        }).anyTimes();
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2))).anyTimes();
        EasyMock.expect(Boolean.valueOf(countDownLatch.await(EasyMock.eq(this.pollIntervalMs), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS)))).andThrow(new InterruptedException());
        EasyMock.expect(Boolean.valueOf(countDownLatch.await(EasyMock.eq(this.pollIntervalMs), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS)))).andReturn(true).anyTimes();
        expectShutdown();
        countDownLatch.countDown();
        EasyMock.expectLastCall();
        replayAll();
        newTopicMonitorThread.start();
        Thread.sleep(this.pollIntervalMs);
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testMonitorThreadsWaitIsCancelled() throws Exception {
        final String str = "topic";
        Set singleton = Collections.singleton("topic");
        CountDownLatch countDownLatch = (CountDownLatch) createMock(CountDownLatch.class);
        this.pollIntervalMs = 1000L;
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", singleton, (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient, new RoundRobinAssignor(), new CountDownLatch(1), countDownLatch);
        EasyMock.expect(Long.valueOf(countDownLatch.getCount())).andReturn(1L).anyTimes();
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult).anyTimes();
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture).anyTimes();
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.singletonList(new TopicListing("topic", false))).anyTimes();
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult).anyTimes();
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.9
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        }).anyTimes();
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2))).anyTimes();
        EasyMock.expect(Boolean.valueOf(countDownLatch.await(EasyMock.eq(this.pollIntervalMs), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS)))).andThrow(new CancellationException());
        replayAll();
        newTopicMonitorThread.start();
        Thread.sleep(this.pollIntervalMs);
        verifyAll();
    }

    @Test
    public void testMonitorThreadsWaitThrowsUnknownException() throws Exception {
        final String str = "topic";
        Set singleton = Collections.singleton("topic");
        CountDownLatch countDownLatch = (CountDownLatch) createMock(CountDownLatch.class);
        this.pollIntervalMs = 1000L;
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", singleton, (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 0, false, (List) null, this.adminClient, new RoundRobinAssignor(), new CountDownLatch(1), countDownLatch);
        EasyMock.expect(Long.valueOf(countDownLatch.getCount())).andReturn(1L).anyTimes();
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult).anyTimes();
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture).anyTimes();
        EasyMock.expect(this.listingsFuture.get()).andReturn(Collections.singletonList(new TopicListing("topic", false))).anyTimes();
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult).anyTimes();
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.10
            {
                put(str, TopicMonitorThreadTest.this.descriptionFuture);
            }
        }).anyTimes();
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2))).anyTimes();
        EasyMock.expect(Boolean.valueOf(countDownLatch.await(EasyMock.eq(this.pollIntervalMs), (TimeUnit) EasyMock.eq(TimeUnit.MILLISECONDS)))).andThrow(new IllegalStateException());
        Capture newCapture = EasyMock.newCapture();
        this.context.raiseError((Exception) EasyMock.capture(newCapture));
        EasyMock.expectLastCall();
        replayAll();
        newTopicMonitorThread.start();
        Thread.sleep(this.pollIntervalMs);
        Assert.assertTrue(newCapture.hasCaptured());
        Assert.assertTrue(newCapture.getValue() instanceof IllegalStateException);
        verifyAll();
    }

    @Test
    public void testJointOffsetTranslationTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, -1, false, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.11
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        List partitions = ((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions();
        Assert.assertTrue(partitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        List partitions2 = ((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions();
        Assert.assertTrue(partitions2.contains(new TopicPartition("topic", 1)));
        Assert.assertTrue(partitions2.contains(new TopicPartition("__consumer_timestamps", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testPartialJointOffsetTranslationTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 1, false, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.12
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions().contains(new TopicPartition("topic", 0)));
        List partitions = ((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions();
        Assert.assertTrue(partitions.contains(new TopicPartition("topic", 1)));
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testJointOffsetTranslationTasksUsingLessThanMaxTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 1, false, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.13
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(10);
        Assert.assertEquals(3L, assignments.size());
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions().contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions().contains(new TopicPartition("topic", 1)));
        List partitions = ((PartitionAssignor.Assignment) assignments.get("connector-2")).partitions();
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testJointOffsetTranslationTasksUsingLessThanMaxOffsetTranslatorTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 2, false, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.14
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(1)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(10);
        Assert.assertEquals(3L, assignments.size());
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions().contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions().contains(new TopicPartition("topic", 1)));
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-2")).partitions().contains(new TopicPartition("__consumer_timestamps", 0)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testSeparateOffsetTranslationTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 1, true, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.15
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(2);
        Assert.assertEquals(2L, assignments.size());
        List partitions = ((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions();
        Assert.assertTrue(partitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(partitions.contains(new TopicPartition("topic", 1)));
        List partitions2 = ((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions();
        Assert.assertTrue(partitions2.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue(partitions2.contains(new TopicPartition("__consumer_timestamps", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testSeparateOffsetTranslationTasksUsingLessThanMaxTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 1, true, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.16
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(2)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(10);
        Assert.assertEquals(3L, assignments.size());
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions().contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions().contains(new TopicPartition("topic", 1)));
        List partitions = ((PartitionAssignor.Assignment) assignments.get("connector-2")).partitions();
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue(partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    @Test
    public void testSeparateOffsetTranslationTasksUsingLessThanMaxOffsetTranslatorTasks() throws Exception {
        NewTopicMonitorThread newTopicMonitorThread = new NewTopicMonitorThread(this.context, "connector", Collections.singleton("topic"), (Pattern) null, Collections.emptySet(), this.pollIntervalMs, 2, true, Collections.singletonList("localhost:9092"), this.adminClient);
        EasyMock.expect(this.adminClient.listTopics()).andReturn(this.listTopicsResult);
        EasyMock.expect(this.listTopicsResult.listings()).andReturn(this.listingsFuture);
        EasyMock.expect(this.listingsFuture.get()).andReturn(Arrays.asList(new TopicListing("topic", false), new TopicListing("__consumer_timestamps", true)));
        EasyMock.expect(this.adminClient.describeTopics((Collection) EasyMock.anyObject(Set.class))).andReturn(this.describeTopicsResult);
        EasyMock.expect(this.describeTopicsResult.values()).andReturn(new HashMap<String, KafkaFuture<TopicDescription>>() { // from class: io.confluent.connect.replicator.TopicMonitorThreadTest.17
            {
                put("topic", TopicMonitorThreadTest.this.descriptionFuture);
                put("__consumer_timestamps", TopicMonitorThreadTest.this.otherDescriptionFuture);
            }
        });
        EasyMock.expect(this.descriptionFuture.get()).andReturn(new TopicDescription("topic", false, createPartitionList(2)));
        EasyMock.expect(this.otherDescriptionFuture.get()).andReturn(new TopicDescription("__consumer_timestamps", true, createPartitionList(1)));
        expectShutdown();
        replayAll();
        newTopicMonitorThread.start();
        Map assignments = newTopicMonitorThread.assignments(10);
        Assert.assertEquals(3L, assignments.size());
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-0")).partitions().contains(new TopicPartition("topic", 0)));
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-1")).partitions().contains(new TopicPartition("topic", 1)));
        Assert.assertTrue(((PartitionAssignor.Assignment) assignments.get("connector-2")).partitions().contains(new TopicPartition("__consumer_timestamps", 0)));
        newTopicMonitorThread.shutdown();
        verifyAll();
    }

    private List<TopicPartitionInfo> createPartitionList(int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new TopicPartitionInfo(i2, this.nodes.get(0), this.nodes, this.nodes));
        }
        return Collections.unmodifiableList(arrayList);
    }

    private void expectShutdown() {
        this.adminClient.close(EasyMock.eq(0L), (TimeUnit) EasyMock.anyObject(TimeUnit.class));
        EasyMock.expectLastCall();
    }

    private Set<TopicPartition> verifyAndCollectAssignedPartitions(Collection<PartitionAssignor.Assignment> collection) {
        HashSet hashSet = new HashSet();
        Iterator<PartitionAssignor.Assignment> it = collection.iterator();
        while (it.hasNext()) {
            for (TopicPartition topicPartition : it.next().partitions()) {
                Assert.assertFalse(hashSet.contains(topicPartition));
                hashSet.add(topicPartition);
            }
        }
        return hashSet;
    }
}
