package io.confluent.connect.replicator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/replicator/TopicMonitorThreadWithZkTest.class */
/**
 * Unit tests for {@link TopicMonitorThreadWithZk}.
 *
 * <p>The monitor is driven by an EasyMock {@link Consumer} whose {@code listTopics()} result plays
 * the role of the cluster's topic metadata. Every test follows the same script: build the monitor,
 * record the expected mock interactions, {@code replayAll()}, start the monitor, check what
 * {@code assignments(maxTasks)} returns, shut the monitor down and {@code verifyAll()}.
 *
 * <p>Each test expects exactly one {@code wakeup()} and one {@code close()} on the consumer
 * (presumably issued by {@code shutdown()} - confirm against the class under test). The monitor is
 * always shut down in a {@code finally} block so that a failing assertion does not leave a started
 * monitor behind for the rest of the test run.
 */
public class TopicMonitorThreadWithZkTest extends EasyMockSupport {

    private static final String CONNECTOR_NAME = "connector";
    private static final String OFFSETS_TOPIC = "__consumer_offsets";
    private static final String TIMESTAMPS_TOPIC = "__consumer_timestamps";
    private static final Set<String> NO_TOPICS = Collections.emptySet();

    // JUnit 4 creates a new test-class instance per test method, so these mocks are never shared.
    private final ConnectorContext context = createMock(ConnectorContext.class);
    // Kept raw on purpose: the type arguments expected by the monitor's constructor are not
    // visible from this file.
    private final Consumer consumer = createMock(Consumer.class);

    /** A whitelisted topic with two partitions is spread over two tasks. */
    @Test
    public void testBasicAssignment() {
        TopicMonitorThreadWithZk monitor = newMonitor(Collections.singleton("topic"), null, NO_TOPICS);
        expectTopicListing(Collections.singletonMap("topic", partitionInfos("topic", 2)));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            assertBothPartitionsSpreadOverTwoTasks(monitor.assignments(2), "topic");
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** When the cluster reports no topics at all, no task assignment is produced. */
    @Test
    public void testEmptyAssignment() {
        TopicMonitorThreadWithZk monitor = newMonitor(Collections.singleton("topic"), null, NO_TOPICS);
        expectTopicListing(Collections.<String, List<PartitionInfo>>emptyMap());
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            Assert.assertEquals(0, monitor.assignments(2).size());
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** Asking for five tasks with only two partitions available yields two assignments. */
    @Test
    public void testNoEmptyTaskAssignments() {
        TopicMonitorThreadWithZk monitor = newMonitor(Collections.singleton("topic"), null, NO_TOPICS);
        expectTopicListing(Collections.singletonMap("topic", partitionInfos("topic", 2)));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            Assert.assertEquals(2, monitor.assignments(5).size());
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** Only topics matching the regex {@code f.*} are assigned; {@code boo} is left out. */
    @Test
    public void testRegexAssignment() {
        TopicMonitorThreadWithZk monitor = newMonitor(NO_TOPICS, Pattern.compile("f.*"), NO_TOPICS);
        expectTopicListing(metadataOf("foo", 2, "boo", 1));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            assertBothPartitionsSpreadOverTwoTasks(monitor.assignments(2), "foo");
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** An internal topic that is explicitly whitelisted is still assigned. */
    @Test
    public void testWhitelistInternalTopic() {
        TopicMonitorThreadWithZk monitor = newMonitor(Collections.singleton(OFFSETS_TOPIC), null, NO_TOPICS);
        expectTopicListing(Collections.singletonMap(OFFSETS_TOPIC, partitionInfos(OFFSETS_TOPIC, 2)));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            assertBothPartitionsSpreadOverTwoTasks(monitor.assignments(2), OFFSETS_TOPIC);
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** A catch-all regex does not pull in {@code __consumer_offsets}. */
    @Test
    public void testExcludeInternalTopics() {
        TopicMonitorThreadWithZk monitor = newMonitor(NO_TOPICS, Pattern.compile(".*"), NO_TOPICS);
        expectTopicListing(metadataOf("foo", 2, OFFSETS_TOPIC, 1));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            assertBothPartitionsSpreadOverTwoTasks(monitor.assignments(2), "foo");
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** A blacklisted topic is dropped even though the regex matches it. */
    @Test
    public void testBlacklistTopicsFiltered() {
        TopicMonitorThreadWithZk monitor =
                newMonitor(NO_TOPICS, Pattern.compile(".*"), Collections.singleton("blacklisted"));
        expectTopicListing(metadataOf("topic", 2, "blacklisted", 2));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            assertBothPartitionsSpreadOverTwoTasks(monitor.assignments(2), "topic");
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /**
     * The first {@code listTopics()} call returns one topic and every later call returns an extra
     * one; the test expects exactly one {@code requestTaskReconfiguration()} on the context.
     */
    @Test
    public void testMetadataChangeTriggersReconfiguration() throws Exception {
        TopicMonitorThreadWithZk monitor =
                newMonitor(NO_TOPICS, Pattern.compile(".*"), NO_TOPICS, 100, 0, false, null);

        Map<String, List<PartitionInfo>> initialMetadata =
                Collections.singletonMap("topic", partitionInfos("topic", 2));
        Map<String, List<PartitionInfo>> updatedMetadata = metadataOf("topic", 2, "other", 2);
        EasyMock.expect(consumer.listTopics()).andReturn(initialMetadata).andStubReturn(updatedMetadata);
        context.requestTaskReconfiguration();
        EasyMock.expectLastCall();
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            assertBothPartitionsSpreadOverTwoTasks(monitor.assignments(2), "topic");
            // Wait twice the value of the constructor's sixth argument (100) so the monitor has
            // time to fetch the updated metadata.
            // NOTE(review): timing based - may be flaky on a heavily loaded machine.
            Thread.sleep(200L);
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** Each of the two tasks gets one data partition and the matching timestamps partition. */
    @Test
    public void testJointOffsetTranslationTasks() {
        TopicMonitorThreadWithZk monitor = newMonitor(
                Collections.singleton("topic"), null, NO_TOPICS,
                10000, -1, false, Collections.singletonList("localhost:9092"));
        expectTopicListing(metadataOf("topic", 2, TIMESTAMPS_TOPIC, 2));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            Map<String, PartitionAssignor.Assignment> assignments = monitor.assignments(2);
            Assert.assertEquals(2, assignments.size());
            assertTaskOwns(assignments, "connector-0",
                    new TopicPartition("topic", 0), new TopicPartition(TIMESTAMPS_TOPIC, 0));
            assertTaskOwns(assignments, "connector-1",
                    new TopicPartition("topic", 1), new TopicPartition(TIMESTAMPS_TOPIC, 1));
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** One task gets both timestamps partitions plus a data partition; the other gets data only. */
    @Test
    public void testPartialJointOffsetTranslationTasks() {
        TopicMonitorThreadWithZk monitor = newMonitor(
                Collections.singleton("topic"), null, NO_TOPICS,
                10000, 1, false, Collections.singletonList("localhost:9092"));
        expectTopicListing(metadataOf("topic", 2, TIMESTAMPS_TOPIC, 2));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            Map<String, PartitionAssignor.Assignment> assignments = monitor.assignments(2);
            Assert.assertEquals(2, assignments.size());
            assertTaskOwns(assignments, "connector-0",
                    new TopicPartition("topic", 1),
                    new TopicPartition(TIMESTAMPS_TOPIC, 0),
                    new TopicPartition(TIMESTAMPS_TOPIC, 1));
            assertTaskOwns(assignments, "connector-1", new TopicPartition("topic", 0));
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /** One task gets all data partitions, the other gets all timestamps partitions. */
    @Test
    public void testSeparateOffsetTranslationTasks() {
        TopicMonitorThreadWithZk monitor = newMonitor(
                Collections.singleton("topic"), null, NO_TOPICS,
                10000, 1, true, Collections.singletonList("localhost:9092"));
        expectTopicListing(metadataOf("topic", 2, TIMESTAMPS_TOPIC, 2));
        expectConsumerShutdown();
        replayAll();

        monitor.start();
        try {
            Map<String, PartitionAssignor.Assignment> assignments = monitor.assignments(2);
            Assert.assertEquals(2, assignments.size());
            assertTaskOwns(assignments, "connector-0",
                    new TopicPartition("topic", 0), new TopicPartition("topic", 1));
            assertTaskOwns(assignments, "connector-1",
                    new TopicPartition(TIMESTAMPS_TOPIC, 0), new TopicPartition(TIMESTAMPS_TOPIC, 1));
        } finally {
            monitor.shutdown();
        }
        verifyAll();
    }

    /**
     * Builds a monitor with the trailing constructor arguments most tests share
     * ({@code 10000, 0, false, null}).
     */
    private TopicMonitorThreadWithZk newMonitor(Set<String> whitelist, Pattern regex, Set<String> blacklist) {
        return newMonitor(whitelist, regex, blacklist, 10000, 0, false, null);
    }

    /**
     * Builds a monitor wired to this test's mock context and consumer.
     *
     * <p>NOTE(review): the names of the last four parameters are guesses based on the test names
     * and the values passed; confirm them against the {@code TopicMonitorThreadWithZk} constructor.
     *
     * @param whitelist explicit topics to replicate
     * @param regex pattern selecting topics to replicate, or {@code null}
     * @param blacklist topics to leave out
     * @param intervalMs presumably the metadata poll interval in milliseconds
     * @param translationTasks presumably how many tasks take part in offset translation
     * @param separateTranslationTasks presumably whether offset translation gets dedicated tasks
     * @param servers presumably a host:port list; {@code null} in most tests
     */
    private TopicMonitorThreadWithZk newMonitor(
            Set<String> whitelist,
            Pattern regex,
            Set<String> blacklist,
            int intervalMs,
            int translationTasks,
            boolean separateTranslationTasks,
            List<String> servers) {
        return new TopicMonitorThreadWithZk(
                context, CONNECTOR_NAME, whitelist, regex, blacklist,
                intervalMs, translationTasks, separateTranslationTasks, servers, consumer);
    }

    /** Records a single expected {@code listTopics()} call that returns {@code metadata}. */
    private void expectTopicListing(Map<String, List<PartitionInfo>> metadata) {
        EasyMock.expect(consumer.listTopics()).andReturn(metadata);
    }

    /** Records the one {@code wakeup()} and one {@code close()} every test expects on the consumer. */
    private void expectConsumerShutdown() {
        consumer.wakeup();
        EasyMock.expectLastCall();
        consumer.close();
        EasyMock.expectLastCall();
    }

    /** Returns metadata for two topics with the given partition counts. */
    private static Map<String, List<PartitionInfo>> metadataOf(
            String firstTopic, int firstCount, String secondTopic, int secondCount) {
        Map<String, List<PartitionInfo>> metadata = new HashMap<>();
        metadata.put(firstTopic, partitionInfos(firstTopic, firstCount));
        metadata.put(secondTopic, partitionInfos(secondTopic, secondCount));
        return metadata;
    }

    /**
     * Returns partitions {@code 0..partitionCount-1} of {@code topic}, all led by and replicated on
     * a single node {@code localhost:9092} with id 0.
     */
    private static List<PartitionInfo> partitionInfos(String topic, int partitionCount) {
        Node broker = new Node(0, "localhost", 9092);
        PartitionInfo[] infos = new PartitionInfo[partitionCount];
        for (int partition = 0; partition < partitionCount; partition++) {
            infos[partition] =
                    new PartitionInfo(topic, partition, broker, new Node[]{broker}, new Node[]{broker});
        }
        return Arrays.asList(infos);
    }

    /**
     * Asserts that there are exactly two task assignments and that together they hold partitions 0
     * and 1 of {@code topic}, and nothing else, with no partition assigned twice.
     */
    private void assertBothPartitionsSpreadOverTwoTasks(
            Map<String, PartitionAssignor.Assignment> assignments, String topic) {
        Assert.assertEquals(2, assignments.size());
        Set<TopicPartition> expected = new HashSet<>(
                Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)));
        Assert.assertEquals(expected, verifyAndCollectAssignedPartitions(assignments.values()));
    }

    /** Asserts that the assignment stored under {@code taskId} contains every expected partition. */
    private static void assertTaskOwns(
            Map<String, PartitionAssignor.Assignment> assignments, String taskId, TopicPartition... expected) {
        PartitionAssignor.Assignment assignment = assignments.get(taskId);
        Assert.assertNotNull("No assignment for task " + taskId, assignment);
        List<TopicPartition> owned = assignment.partitions();
        for (TopicPartition partition : expected) {
            Assert.assertTrue(taskId + " should own " + partition, owned.contains(partition));
        }
    }

    /**
     * Collects every partition found in the given assignments, failing if the same partition
     * appears more than once.
     *
     * @param collection the per-task assignments to inspect
     * @return the set of all assigned partitions
     */
    private Set<TopicPartition> verifyAndCollectAssignedPartitions(Collection<PartitionAssignor.Assignment> collection) {
        Set<TopicPartition> seen = new HashSet<>();
        for (PartitionAssignor.Assignment assignment : collection) {
            for (TopicPartition partition : assignment.partitions()) {
                // Set.add returns false for a duplicate, i.e. a partition already given to a task.
                Assert.assertTrue("Partition assigned more than once: " + partition, seen.add(partition));
            }
        }
        return seen;
    }
}
