/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator;

import io.confluent.connect.replicator.TopicMonitorThreadWithZk;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Test;

public class TopicMonitorThreadWithZkTest
extends EasyMockSupport {
    @Test
    public void testBasicAssignment() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton(topic);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn(Collections.singletonMap(topic, Arrays.asList(new PartitionInfo(topic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(topic, 1, leader, new Node[]{leader}, new Node[]{leader}))));
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        Set<TopicPartition> partitions = this.verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals((long)2L, (long)partitions.size());
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testEmptyAssignment() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton(topic);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 0, false, null, consumer);
        EasyMock.expect((Object)consumer.listTopics()).andReturn(Collections.emptyMap());
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)0L, (long)assignments.size());
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testNoEmptyTaskAssignments() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton(topic);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn(Collections.singletonMap(topic, Arrays.asList(new PartitionInfo(topic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(topic, 1, leader, new Node[]{leader}, new Node[]{leader}))));
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(5);
        Assert.assertEquals((long)2L, (long)assignments.size());
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testRegexAssignment() {
        String matchingTopic = "foo";
        String nonMatchingTopic = "boo";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", Collections.emptySet(), Pattern.compile("f.*"), Collections.emptySet(), 10000, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        HashMap<String, List<PartitionInfo>> topicMetadata = new HashMap<String, List<PartitionInfo>>();
        topicMetadata.put(matchingTopic, Arrays.asList(new PartitionInfo(matchingTopic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(matchingTopic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        topicMetadata.put(nonMatchingTopic, Collections.singletonList(new PartitionInfo(nonMatchingTopic, 0, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)consumer.listTopics()).andReturn(topicMetadata);
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        Set<TopicPartition> partitions = this.verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals((long)2L, (long)partitions.size());
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(matchingTopic, 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(matchingTopic, 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testWhitelistInternalTopic() {
        String topic = "__consumer_offsets";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton(topic);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn(Collections.singletonMap(topic, Arrays.asList(new PartitionInfo(topic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(topic, 1, leader, new Node[]{leader}, new Node[]{leader}))));
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        Set<TopicPartition> partitions = this.verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals((long)2L, (long)partitions.size());
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testExcludeInternalTopics() {
        String matchingTopic = "foo";
        String internalTopic = "__consumer_offsets";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", Collections.emptySet(), Pattern.compile(".*"), Collections.emptySet(), 10000, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        HashMap<String, List<PartitionInfo>> topicMetadata = new HashMap<String, List<PartitionInfo>>();
        topicMetadata.put(matchingTopic, Arrays.asList(new PartitionInfo(matchingTopic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(matchingTopic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        topicMetadata.put(internalTopic, Collections.singletonList(new PartitionInfo(internalTopic, 0, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)consumer.listTopics()).andReturn(topicMetadata);
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        Set<TopicPartition> partitions = this.verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals((long)2L, (long)partitions.size());
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(matchingTopic, 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(matchingTopic, 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testBlacklistTopicsFiltered() {
        String topic = "topic";
        String blackListTopic = "blacklisted";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> blackListTopics = Collections.singleton(blackListTopic);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", Collections.emptySet(), Pattern.compile(".*"), blackListTopics, 10000, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        HashMap<String, List<PartitionInfo>> topicMetadata = new HashMap<String, List<PartitionInfo>>();
        topicMetadata.put(topic, Arrays.asList(new PartitionInfo(topic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(topic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        topicMetadata.put(blackListTopic, Arrays.asList(new PartitionInfo(blackListTopic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(blackListTopic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)consumer.listTopics()).andReturn(topicMetadata);
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        Set<TopicPartition> partitions = this.verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals((long)2L, (long)partitions.size());
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testMetadataChangeTriggersReconfiguration() throws Exception {
        String topic = "topic";
        String otherTopic = "other";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", Collections.emptySet(), Pattern.compile(".*"), Collections.emptySet(), 100, 0, false, null, consumer);
        Node leader = new Node(0, "localhost", 9092);
        HashMap<String, List<PartitionInfo>> topicMetadata = new HashMap<String, List<PartitionInfo>>();
        topicMetadata.put(topic, Arrays.asList(new PartitionInfo(topic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(topic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        HashMap<String, List<PartitionInfo>> refreshedTopicMetadata = new HashMap<String, List<PartitionInfo>>();
        refreshedTopicMetadata.put(topic, Arrays.asList(new PartitionInfo(topic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(topic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        refreshedTopicMetadata.put(otherTopic, Arrays.asList(new PartitionInfo(otherTopic, 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo(otherTopic, 1, leader, new Node[]{leader}, new Node[]{leader})));
        EasyMock.expect((Object)consumer.listTopics()).andReturn(topicMetadata).andStubReturn(refreshedTopicMetadata);
        context.requestTaskReconfiguration();
        EasyMock.expectLastCall();
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        Set<TopicPartition> partitions = this.verifyAndCollectAssignedPartitions(assignments.values());
        Assert.assertEquals((long)2L, (long)partitions.size());
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition(topic, 1)));
        Thread.sleep(200L);
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testJointOffsetTranslationTasks() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, -1, false, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("__consumer_timestamps", 1, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testPartialJointOffsetTranslationTasks() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 1, false, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("__consumer_timestamps", 1, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testJointOffsetTranslationTasksUsingLessThanMaxTasks() throws Exception {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 1, false, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("__consumer_timestamps", 1, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(10);
        Assert.assertEquals((long)3L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-2")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testJointOffsetTranslationTasksUsingLessThanMaxOffsetTranslatorTasks() throws Exception {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 2, false, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(10);
        Assert.assertEquals((long)3L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-2")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testSeparateOffsetTranslationTasks() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 1, true, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("__consumer_timestamps", 1, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(2);
        Assert.assertEquals((long)2L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testSeparateOffsetTranslationTasksUsingLessThanMaxTasks() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 1, true, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("__consumer_timestamps", 1, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(10);
        Assert.assertEquals((long)3L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-2")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 1)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    @Test
    public void testSeparateOffsetTranslationTasksUsingLessThanMaxOffsetTranslatorTasks() {
        String topic = "topic";
        ConnectorContext context = (ConnectorContext)this.createMock(ConnectorContext.class);
        Consumer consumer = (Consumer)this.createMock(Consumer.class);
        Set<String> topics = Collections.singleton("topic");
        TopicMonitorThreadWithZk monitorThread = new TopicMonitorThreadWithZk(context, "connector", topics, null, Collections.emptySet(), 10000, 2, true, Collections.singletonList("localhost:9092"), consumer);
        final Node leader = new Node(0, "localhost", 9092);
        EasyMock.expect((Object)consumer.listTopics()).andReturn((Object)new HashMap<String, List<PartitionInfo>>(){
            {
                this.put("topic", Arrays.asList(new PartitionInfo("topic", 0, leader, new Node[]{leader}, new Node[]{leader}), new PartitionInfo("topic", 1, leader, new Node[]{leader}, new Node[]{leader})));
                this.put("__consumer_timestamps", Arrays.asList(new PartitionInfo("__consumer_timestamps", 0, leader, new Node[]{leader}, new Node[]{leader})));
            }
        });
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
        this.replayAll();
        monitorThread.start();
        Map assignments = monitorThread.assignments(10);
        Assert.assertEquals((long)3L, (long)assignments.size());
        List partitions = ((PartitionAssignor.Assignment)assignments.get("connector-0")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 0)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-1")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("topic", 1)));
        partitions = ((PartitionAssignor.Assignment)assignments.get("connector-2")).partitions();
        Assert.assertTrue((boolean)partitions.contains(new TopicPartition("__consumer_timestamps", 0)));
        monitorThread.shutdown();
        this.verifyAll();
    }

    private Set<TopicPartition> verifyAndCollectAssignedPartitions(Collection<PartitionAssignor.Assignment> assignments) {
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        for (PartitionAssignor.Assignment assignment : assignments) {
            for (TopicPartition partition : assignment.partitions()) {
                Assert.assertFalse((boolean)partitions.contains(partition));
                partitions.add(partition);
            }
        }
        return partitions;
    }
}

