package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.class */
public class MirrorSourceConnectorTest {
    @Test
    public void testReplicatesHeartbeatsByDefault() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should replicate heartbeats");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("us-west.heartbeats"), "should replicate upstream heartbeats");
    }

    @Test
    public void testReplicatesHeartbeatsDespiteFilter() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return false;
        }, new DefaultConfigPropertyFilter());
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should replicate heartbeats");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("us-west.heartbeats"), "should replicate upstream heartbeats");
    }

    @Test
    public void testNoCycles() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, str2 -> {
            return true;
        });
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("topic1"), "should allow anything else");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("source.topic1"), "should allow anything else");
    }

    @Test
    public void testIdentityReplication() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new IdentityReplicationPolicy(), str -> {
            return true;
        }, str2 -> {
            return true;
        });
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("target.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("target.source.target.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("source.target.source.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("topic1"), "should allow normal topics");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("othersource.topic1"), "should allow normal topics");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.target.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.source.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should allow heartbeat topics");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("othersource.heartbeats"), "should allow heartbeat topics");
    }

    @Test
    public void testAclFiltering() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, str2 -> {
            return true;
        });
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateAcl(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateAcl(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
    }

    @Test
    public void testAclTransformation() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, str2 -> {
            return true;
        });
        AclBinding aclBinding = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding targetAclBinding = mirrorSourceConnector.targetAclBinding(aclBinding);
        Assertions.assertEquals("source." + aclBinding.pattern().name(), targetAclBinding.pattern().name(), "should change topic name");
        Assertions.assertEquals(targetAclBinding.entry().operation(), AclOperation.READ, "should change ALL to READ");
        Assertions.assertEquals(targetAclBinding.entry().permissionType(), AclPermissionType.ALLOW, "should not change ALLOW");
        AclBinding targetAclBinding2 = mirrorSourceConnector.targetAclBinding(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY)));
        Assertions.assertEquals(targetAclBinding2.entry().operation(), AclOperation.ALL, "should not change ALL");
        Assertions.assertEquals(targetAclBinding2.entry().permissionType(), AclPermissionType.DENY, "should not change DENY");
    }

    @Test
    public void testConfigPropertyFiltering() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, new DefaultConfigPropertyFilter());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConfigEntry("name-1", "value-1"));
        arrayList.add(new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = mirrorSourceConnector.targetConfig(new Config(arrayList));
        Assertions.assertTrue(targetConfig.entries().stream().anyMatch(configEntry -> {
            return configEntry.name().equals("name-1");
        }), "should replicate properties");
        Assertions.assertFalse(targetConfig.entries().stream().anyMatch(configEntry2 -> {
            return configEntry2.name().equals("min.insync.replicas");
        }), "should not replicate excluded properties");
    }

    @Test
    public void testNewTopicConfigs() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("config.properties.exclude", "follower\\.replication\\.throttled\\.replicas, leader\\.replication\\.throttled\\.replicas, message\\.timestamp\\.difference\\.max\\.ms, message\\.timestamp\\.type, unclean\\.leader\\.election\\.enable, min\\.insync\\.replicas,exclude_param.*");
        DefaultConfigPropertyFilter defaultConfigPropertyFilter = new DefaultConfigPropertyFilter();
        defaultConfigPropertyFilter.configure(hashMap);
        MirrorSourceConnector mirrorSourceConnector = (MirrorSourceConnector) Mockito.spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, defaultConfigPropertyFilter));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConfigEntry("name-1", "value-1"));
        arrayList.add(new ConfigEntry("exclude_param.param1", "value-param1"));
        arrayList.add(new ConfigEntry("min.insync.replicas", "2"));
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonMap("testtopic", new Config(arrayList))).when(mirrorSourceConnector)).describeTopicConfigs((Set) Mockito.any());
        ((MirrorSourceConnector) Mockito.doAnswer(invocationOnMock -> {
            Map map = (Map) invocationOnMock.getArgument(0);
            Assertions.assertNotNull(map.get("source.testtopic"));
            Map configs = ((NewTopic) map.get("source.testtopic")).configs();
            Assertions.assertNotNull(configs.get("name-1"), "should replicate properties");
            Assertions.assertNull(configs.get("min.insync.replicas"), "should not replicate excluded properties min.insync.replicas");
            Assertions.assertNull(configs.get("exclude_param.param1"), "should not replicate excluded properties exclude_param.param1");
            return null;
        }).when(mirrorSourceConnector)).createNewTopics((Map) Mockito.any());
        mirrorSourceConnector.createNewTopics(Collections.singleton("testtopic"), Collections.singletonMap("testtopic", 1L));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector)).createNewTopics((Set) Mockito.any(), (Map) Mockito.any());
    }

    @Test
    public void testMirrorSourceConnectorTaskConfig() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition("t0", 0));
        arrayList.add(new TopicPartition("t0", 1));
        arrayList.add(new TopicPartition("t0", 2));
        arrayList.add(new TopicPartition("t0", 3));
        arrayList.add(new TopicPartition("t0", 4));
        arrayList.add(new TopicPartition("t0", 5));
        arrayList.add(new TopicPartition("t0", 6));
        arrayList.add(new TopicPartition("t0", 7));
        arrayList.add(new TopicPartition("t1", 0));
        arrayList.add(new TopicPartition("t1", 1));
        arrayList.add(new TopicPartition("t2", 0));
        arrayList.add(new TopicPartition("t2", 1));
        List taskConfigs = new MirrorSourceConnector(arrayList, new MirrorSourceConfig(TestUtils.makeProps(new String[0]))).taskConfigs(3);
        Assertions.assertEquals("t0-0,t0-3,t0-6,t1-1", ((Map) taskConfigs.get(0)).get("task.assigned.partitions"), "Config for t1 is incorrect");
        Assertions.assertEquals("t0-1,t0-4,t0-7,t2-0", ((Map) taskConfigs.get(1)).get("task.assigned.partitions"), "Config for t2 is incorrect");
        Assertions.assertEquals("t0-2,t0-5,t1-0,t2-1", ((Map) taskConfigs.get(2)).get("task.assigned.partitions"), "Config for t3 is incorrect");
    }

    @Test
    public void testRefreshTopicPartitions() throws Exception {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        mirrorSourceConnector.initialize((ConnectorContext) Mockito.mock(ConnectorContext.class));
        MirrorSourceConnector mirrorSourceConnector2 = (MirrorSourceConnector) Mockito.spy(mirrorSourceConnector);
        Config config = new Config(Arrays.asList(new ConfigEntry("cleanup.policy", "compact"), new ConfigEntry("segment.bytes", "100")));
        Map singletonMap = Collections.singletonMap("topic", config);
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonList(new TopicPartition("topic", 0))).when(mirrorSourceConnector2)).findSourceTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(Collections.emptyList()).when(mirrorSourceConnector2)).findTargetTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(singletonMap).when(mirrorSourceConnector2)).describeTopicConfigs(Collections.singleton("topic"));
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector2)).createNewTopics((Map) Mockito.any());
        mirrorSourceConnector2.refreshTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        new HashMap().put("source.topic", 1L);
        Map configToMap = MirrorSourceConnector.configToMap(config);
        Assertions.assertEquals(2, configToMap.size(), "configMap has incorrect size");
        HashMap hashMap = new HashMap();
        hashMap.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configToMap));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(2))).computeAndCreateTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(2))).createNewTopics((Map) Mockito.eq(hashMap));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(0))).createNewPartitions((Map) Mockito.any());
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonList(new TopicPartition("source.topic", 0))).when(mirrorSourceConnector2)).findTargetTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(2))).computeAndCreateTopicPartitions();
    }

    @Test
    public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        mirrorSourceConnector.initialize((ConnectorContext) Mockito.mock(ConnectorContext.class));
        MirrorSourceConnector mirrorSourceConnector2 = (MirrorSourceConnector) Mockito.spy(mirrorSourceConnector);
        Map singletonMap = Collections.singletonMap("source.topic", new Config(Arrays.asList(new ConfigEntry("cleanup.policy", "compact"), new ConfigEntry("segment.bytes", "100"))));
        List emptyList = Collections.emptyList();
        List singletonList = Collections.singletonList(new TopicPartition("source.topic", 0));
        ((MirrorSourceConnector) Mockito.doReturn(emptyList).when(mirrorSourceConnector2)).findSourceTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(singletonList).when(mirrorSourceConnector2)).findTargetTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(singletonMap).when(mirrorSourceConnector2)).describeTopicConfigs(Collections.singleton("source.topic"));
        ((MirrorSourceConnector) Mockito.doReturn(Collections.emptyMap()).when(mirrorSourceConnector2)).describeTopicConfigs(Collections.emptySet());
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector2)).createNewTopics((Map) Mockito.any());
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector2)).createNewPartitions((Map) Mockito.any());
        mirrorSourceConnector2.refreshTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(0))).computeAndCreateTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonList(new TopicPartition("topic", 0))).when(mirrorSourceConnector2)).findSourceTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(1))).computeAndCreateTopicPartitions();
    }

    @Test
    public void testIsCycleWithNullUpstreamTopic() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy() { // from class: org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.1CustomReplicationPolicy
            public String upstreamTopic(String str) {
                return null;
            }
        }, new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        Assertions.assertDoesNotThrow(() -> {
            return Boolean.valueOf(mirrorSourceConnector.isCycle(".b"));
        });
    }
}
