package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.log4j.Level;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.class */
public class MirrorSourceConnectorTest {
    private ConfigPropertyFilter getConfigPropertyFilter() {
        return str -> {
            return true;
        };
    }

    @Test
    public void testReplicatesHeartbeatsByDefault() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should replicate heartbeats");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("us-west.heartbeats"), "should replicate upstream heartbeats");
    }

    @Test
    public void testReplicatesHeartbeatsDespiteFilter() {
        DefaultReplicationPolicy defaultReplicationPolicy = new DefaultReplicationPolicy();
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), defaultReplicationPolicy, str -> {
            return false;
        }, new DefaultConfigPropertyFilter());
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should replicate heartbeats");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("us-west.heartbeats"), "should replicate upstream heartbeats");
        defaultReplicationPolicy.configure(Collections.singletonMap("replication.policy.separator", "_"));
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should replicate heartbeats");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("us-west.heartbeats"), "should not consider this topic as a heartbeats topic");
    }

    @Test
    public void testNoCycles() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, getConfigPropertyFilter());
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("topic1"), "should allow anything else");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("source.topic1"), "should allow anything else");
    }

    @Test
    public void testIdentityReplication() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new IdentityReplicationPolicy(), str -> {
            return true;
        }, getConfigPropertyFilter());
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("target.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("target.source.target.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("source.target.source.topic1"), "should allow cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("topic1"), "should allow normal topics");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("othersource.topic1"), "should allow normal topics");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("target.source.target.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateTopic("source.target.source.heartbeats"), "should not allow heartbeat cycles");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("heartbeats"), "should allow heartbeat topics");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateTopic("othersource.heartbeats"), "should allow heartbeat topics");
    }

    @Test
    public void testAclFiltering() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, getConfigPropertyFilter());
        Assertions.assertFalse(mirrorSourceConnector.shouldReplicateAcl(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
        Assertions.assertTrue(mirrorSourceConnector.shouldReplicateAcl(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
    }

    @Test
    public void testAclTransformation() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, getConfigPropertyFilter());
        AclBinding aclBinding = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding targetAclBinding = mirrorSourceConnector.targetAclBinding(aclBinding);
        Assertions.assertEquals("source." + aclBinding.pattern().name(), targetAclBinding.pattern().name(), "should change topic name");
        Assertions.assertEquals(targetAclBinding.entry().operation(), AclOperation.READ, "should change ALL to READ");
        Assertions.assertEquals(targetAclBinding.entry().permissionType(), AclPermissionType.ALLOW, "should not change ALLOW");
        AclBinding targetAclBinding2 = mirrorSourceConnector.targetAclBinding(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY)));
        Assertions.assertEquals(targetAclBinding2.entry().operation(), AclOperation.ALL, "should not change ALL");
        Assertions.assertEquals(targetAclBinding2.entry().permissionType(), AclPermissionType.DENY, "should not change DENY");
    }

    @Test
    public void testNoBrokerAclAuthorizer() throws Exception {
        Admin admin = (Admin) Mockito.mock(Admin.class);
        Admin admin2 = (Admin) Mockito.mock(Admin.class);
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(admin, admin2, new MirrorSourceConfig(TestUtils.makeProps(new String[0])));
        ExecutionException executionException = new ExecutionException("Failed to describe ACLs", new SecurityDisabledException("No ACL authorizer configured on this broker"));
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        Mockito.when(kafkaFuture.get()).thenThrow(new Throwable[]{executionException});
        DescribeAclsResult describeAclsResult = (DescribeAclsResult) Mockito.mock(DescribeAclsResult.class);
        Mockito.when(describeAclsResult.values()).thenReturn(kafkaFuture);
        Mockito.when(admin.describeAcls((AclBindingFilter) Mockito.any())).thenReturn(describeAclsResult);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class);
        Throwable th = null;
        try {
            try {
                createAndRegister.setClassLogger(MirrorSourceConnector.class, Level.TRACE);
                mirrorSourceConnector.syncTopicAcls();
                Assertions.assertEquals(1L, createAndRegister.getMessages().stream().filter(str -> {
                    return str.contains("Consider disabling topic ACL syncing");
                }).count(), "Should have recommended that user disable ACL syncing");
                Assertions.assertEquals(0L, createAndRegister.getMessages().stream().filter(str2 -> {
                    return str2.contains("skipping topic ACL sync");
                }).count(), "Should not have logged ACL sync skip at same time as suggesting ACL sync be disabled");
                mirrorSourceConnector.syncTopicAcls();
                mirrorSourceConnector.syncTopicAcls();
                Assertions.assertEquals(1L, createAndRegister.getMessages().stream().filter(str3 -> {
                    return str3.contains("Consider disabling topic ACL syncing");
                }).count(), "Should not have recommended that user disable ACL syncing more than once");
                Assertions.assertEquals(2L, createAndRegister.getMessages().stream().filter(str4 -> {
                    return str4.contains("skipping topic ACL sync");
                }).count(), "Should have logged ACL sync skip instead of suggesting disabling ACL syncing");
                if (createAndRegister != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
                Mockito.verifyNoInteractions(new Object[]{admin2});
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testMissingDescribeConfigsAcl() throws Exception {
        Admin admin = (Admin) Mockito.mock(Admin.class);
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(admin, (Admin) Mockito.mock(Admin.class), new MirrorSourceConfig(TestUtils.makeProps(new String[0])));
        ExecutionException executionException = new ExecutionException("Failed to describe topic configs", new TopicAuthorizationException("Topic authorization failed"));
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        Mockito.when(kafkaFuture.get()).thenThrow(new Throwable[]{executionException});
        DescribeConfigsResult describeConfigsResult = (DescribeConfigsResult) Mockito.mock(DescribeConfigsResult.class);
        Mockito.when(describeConfigsResult.all()).thenReturn(kafkaFuture);
        Mockito.when(admin.describeConfigs((Collection) Mockito.any())).thenReturn(describeConfigsResult);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(MirrorUtils.class);
        Throwable th = null;
        try {
            try {
                createAndRegister.setClassLogger(MirrorUtils.class, Level.TRACE);
                HashSet hashSet = new HashSet();
                hashSet.add("topic1");
                hashSet.add("topic2");
                Assertions.assertEquals(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
                    mirrorSourceConnector.describeTopicConfigs(hashSet);
                })).getCause().getClass().getSimpleName() + " occurred while trying to describe configs for topics [topic1, topic2] on source1 cluster", createAndRegister.getMessages().get(0));
                if (createAndRegister != null) {
                    if (0 == 0) {
                        createAndRegister.close();
                        return;
                    }
                    try {
                        createAndRegister.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testConfigPropertyFiltering() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, new DefaultConfigPropertyFilter());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConfigEntry("name-1", "value-1"));
        arrayList.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
        arrayList.add(new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = mirrorSourceConnector.targetConfig(new Config(arrayList), true);
        Assertions.assertTrue(targetConfig.entries().stream().anyMatch(configEntry -> {
            return configEntry.name().equals("name-1");
        }), "should replicate properties");
        Assertions.assertTrue(targetConfig.entries().stream().anyMatch(configEntry2 -> {
            return configEntry2.name().equals("name-2");
        }), "should include default properties");
        Assertions.assertFalse(targetConfig.entries().stream().anyMatch(configEntry3 -> {
            return configEntry3.name().equals("min.insync.replicas");
        }), "should not replicate excluded properties");
    }

    @Test
    @Deprecated
    public void testConfigPropertyFilteringWithAlterConfigs() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, new DefaultConfigPropertyFilter());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConfigEntry("name-1", "value-1"));
        arrayList.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
        arrayList.add(new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = mirrorSourceConnector.targetConfig(new Config(arrayList), false);
        Assertions.assertTrue(targetConfig.entries().stream().anyMatch(configEntry -> {
            return configEntry.name().equals("name-1");
        }), "should replicate properties");
        Assertions.assertFalse(targetConfig.entries().stream().anyMatch(configEntry2 -> {
            return configEntry2.name().equals("name-2");
        }), "should not replicate default properties");
        Assertions.assertFalse(targetConfig.entries().stream().anyMatch(configEntry3 -> {
            return configEntry3.name().equals("min.insync.replicas");
        }), "should not replicate excluded properties");
    }

    @Test
    @Deprecated
    public void testConfigPropertyFilteringWithAlterConfigsAndSourceDefault() {
        Map singletonMap = Collections.singletonMap("use.defaults.from", "source");
        DefaultConfigPropertyFilter defaultConfigPropertyFilter = new DefaultConfigPropertyFilter();
        defaultConfigPropertyFilter.configure(singletonMap);
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, defaultConfigPropertyFilter);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConfigEntry("name-1", "value-1"));
        arrayList.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
        arrayList.add(new ConfigEntry("min.insync.replicas", "2"));
        Config targetConfig = mirrorSourceConnector.targetConfig(new Config(arrayList), false);
        Assertions.assertTrue(targetConfig.entries().stream().anyMatch(configEntry -> {
            return configEntry.name().equals("name-1");
        }), "should replicate properties");
        Assertions.assertTrue(targetConfig.entries().stream().anyMatch(configEntry2 -> {
            return configEntry2.name().equals("name-2");
        }), "should include default properties");
        Assertions.assertFalse(targetConfig.entries().stream().anyMatch(configEntry3 -> {
            return configEntry3.name().equals("min.insync.replicas");
        }), "should not replicate excluded properties");
    }

    @Test
    public void testNewTopicConfigs() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("config.properties.exclude", "follower\\.replication\\.throttled\\.replicas, leader\\.replication\\.throttled\\.replicas, message\\.timestamp\\.difference\\.max\\.ms, message\\.timestamp\\.type, unclean\\.leader\\.election\\.enable, min\\.insync\\.replicas,exclude_param.*");
        DefaultConfigPropertyFilter defaultConfigPropertyFilter = new DefaultConfigPropertyFilter();
        defaultConfigPropertyFilter.configure(hashMap);
        MirrorSourceConnector mirrorSourceConnector = (MirrorSourceConnector) Mockito.spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), str -> {
            return true;
        }, defaultConfigPropertyFilter));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ConfigEntry("name-1", "value-1"));
        arrayList.add(new ConfigEntry("exclude_param.param1", "value-param1"));
        arrayList.add(new ConfigEntry("min.insync.replicas", "2"));
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonMap("testtopic", new Config(arrayList))).when(mirrorSourceConnector)).describeTopicConfigs((Set) Mockito.any());
        ((MirrorSourceConnector) Mockito.doAnswer(invocationOnMock -> {
            Map map = (Map) invocationOnMock.getArgument(0);
            Assertions.assertNotNull(map.get("source.testtopic"));
            Map configs = ((NewTopic) map.get("source.testtopic")).configs();
            Assertions.assertNotNull(configs.get("name-1"), "should replicate properties");
            Assertions.assertNull(configs.get("min.insync.replicas"), "should not replicate excluded properties min.insync.replicas");
            Assertions.assertNull(configs.get("exclude_param.param1"), "should not replicate excluded properties exclude_param.param1");
            return null;
        }).when(mirrorSourceConnector)).createNewTopics((Map) Mockito.any());
        mirrorSourceConnector.createNewTopics(Collections.singleton("testtopic"), Collections.singletonMap("testtopic", 1L));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector)).createNewTopics((Set) Mockito.any(), (Map) Mockito.any());
    }

    @Test
    @Deprecated
    public void testIncrementalAlterConfigsRequested() throws Exception {
        Map<String, String> makeProps = TestUtils.makeProps(new String[0]);
        makeProps.put("use.incremental.alter.configs", "requested");
        MirrorSourceConfig mirrorSourceConfig = new MirrorSourceConfig(makeProps);
        Admin admin = (Admin) Mockito.mock(Admin.class);
        MirrorSourceConnector mirrorSourceConnector = (MirrorSourceConnector) Mockito.spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), mirrorSourceConfig, new DefaultConfigPropertyFilter(), admin));
        Config config = new Config(Collections.singletonList(new ConfigEntry("name-1", "value-1")));
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonMap("testtopic", config)).when(mirrorSourceConnector)).describeTopicConfigs((Set) Mockito.any());
        ((Admin) Mockito.doReturn(AdminClientTestUtils.alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, "testtopic"), new UnsupportedVersionException("Unsupported API"))).when(admin)).incrementalAlterConfigs((Map) Mockito.any());
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector)).deprecatedAlterConfigs((Map) Mockito.any());
        mirrorSourceConnector.syncTopicConfigs();
        Map singletonMap = Collections.singletonMap("source.testtopic", config);
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector)).incrementalAlterConfigs(singletonMap);
        mirrorSourceConnector.syncTopicConfigs();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector, Mockito.times(1))).deprecatedAlterConfigs(singletonMap);
    }

    @Test
    @Deprecated
    public void testIncrementalAlterConfigsRequired() throws Exception {
        Map<String, String> makeProps = TestUtils.makeProps(new String[0]);
        makeProps.put("use.incremental.alter.configs", "required");
        MirrorSourceConfig mirrorSourceConfig = new MirrorSourceConfig(makeProps);
        Admin admin = (Admin) Mockito.mock(Admin.class);
        MirrorSourceConnector mirrorSourceConnector = (MirrorSourceConnector) Mockito.spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), mirrorSourceConfig, new DefaultConfigPropertyFilter(), admin));
        ArrayList arrayList = new ArrayList();
        ConfigEntry configEntry = new ConfigEntry("name-1", "value-1");
        ConfigEntry configEntry2 = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
        arrayList.add(configEntry);
        arrayList.add(configEntry2);
        Config config = new Config(arrayList);
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonMap("testtopic", config)).when(mirrorSourceConnector)).describeTopicConfigs((Set) Mockito.any());
        ((Admin) Mockito.doAnswer(invocationOnMock -> {
            Map map = (Map) invocationOnMock.getArgument(0);
            Assertions.assertNotNull(map);
            Assertions.assertEquals(1, map.size());
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source.testtopic");
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
            arrayList2.add(new AlterConfigOp(configEntry2, AlterConfigOp.OpType.DELETE));
            Assertions.assertEquals(arrayList2, map.get(configResource));
            return AdminClientTestUtils.alterConfigsResult(configResource);
        }).when(admin)).incrementalAlterConfigs((Map) Mockito.any());
        mirrorSourceConnector.syncTopicConfigs();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector)).incrementalAlterConfigs(Collections.singletonMap("source.testtopic", config));
    }

    @Test
    @Deprecated
    public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
        Map<String, String> makeProps = TestUtils.makeProps(new String[0]);
        makeProps.put("use.incremental.alter.configs", "required");
        MirrorSourceConfig mirrorSourceConfig = new MirrorSourceConfig(makeProps);
        Admin admin = (Admin) Mockito.mock(Admin.class);
        ConnectorContext connectorContext = (ConnectorContext) Mockito.mock(ConnectorContext.class);
        MirrorSourceConnector mirrorSourceConnector = (MirrorSourceConnector) Mockito.spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), mirrorSourceConfig, new DefaultConfigPropertyFilter(), admin));
        mirrorSourceConnector.initialize(connectorContext);
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonMap("testtopic", new Config(Collections.singletonList(new ConfigEntry("name-1", "value-1"))))).when(mirrorSourceConnector)).describeTopicConfigs((Set) Mockito.any());
        ((Admin) Mockito.doReturn(AdminClientTestUtils.alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, "testtopic"), new UnsupportedVersionException("Unsupported API"))).when(admin)).incrementalAlterConfigs((Map) Mockito.any());
        mirrorSourceConnector.syncTopicConfigs();
        ((ConnectorContext) Mockito.verify(connectorContext)).raiseError((Exception) ArgumentMatchers.isA(ConnectException.class));
    }

    @Test
    @Deprecated
    public void testIncrementalAlterConfigsNeverUsed() throws Exception {
        Map<String, String> makeProps = TestUtils.makeProps(new String[0]);
        makeProps.put("use.incremental.alter.configs", "never");
        MirrorSourceConnector mirrorSourceConnector = (MirrorSourceConnector) Mockito.spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new MirrorSourceConfig(makeProps), new DefaultConfigPropertyFilter(), (Admin) null));
        Config config = new Config(Collections.singletonList(new ConfigEntry("name-1", "value-1")));
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonMap("testtopic", config)).when(mirrorSourceConnector)).describeTopicConfigs((Set) Mockito.any());
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector)).deprecatedAlterConfigs((Map) Mockito.any());
        mirrorSourceConnector.syncTopicConfigs();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector)).deprecatedAlterConfigs(Collections.singletonMap("source.testtopic", config));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector, Mockito.never())).incrementalAlterConfigs((Map) Mockito.any());
    }

    @Test
    public void testMirrorSourceConnectorTaskConfig() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition("t0", 0));
        arrayList.add(new TopicPartition("t0", 1));
        arrayList.add(new TopicPartition("t0", 2));
        arrayList.add(new TopicPartition("t0", 3));
        arrayList.add(new TopicPartition("t0", 4));
        arrayList.add(new TopicPartition("t0", 5));
        arrayList.add(new TopicPartition("t0", 6));
        arrayList.add(new TopicPartition("t0", 7));
        arrayList.add(new TopicPartition("t1", 0));
        arrayList.add(new TopicPartition("t1", 1));
        arrayList.add(new TopicPartition("t2", 0));
        arrayList.add(new TopicPartition("t2", 1));
        List taskConfigs = new MirrorSourceConnector(arrayList, new MirrorSourceConfig(TestUtils.makeProps(new String[0]))).taskConfigs(3);
        Assertions.assertEquals("t0-0,t0-3,t0-6,t1-1", ((Map) taskConfigs.get(0)).get("task.assigned.partitions"), "Config for t1 is incorrect");
        Assertions.assertEquals("t0-1,t0-4,t0-7,t2-0", ((Map) taskConfigs.get(1)).get("task.assigned.partitions"), "Config for t2 is incorrect");
        Assertions.assertEquals("t0-2,t0-5,t1-0,t2-1", ((Map) taskConfigs.get(2)).get("task.assigned.partitions"), "Config for t3 is incorrect");
    }

    @Test
    public void testRefreshTopicPartitions() throws Exception {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        mirrorSourceConnector.initialize((ConnectorContext) Mockito.mock(ConnectorContext.class));
        MirrorSourceConnector mirrorSourceConnector2 = (MirrorSourceConnector) Mockito.spy(mirrorSourceConnector);
        Config config = new Config(Arrays.asList(new ConfigEntry("cleanup.policy", "compact"), new ConfigEntry("segment.bytes", "100")));
        Map singletonMap = Collections.singletonMap("topic", config);
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonList(new TopicPartition("topic", 0))).when(mirrorSourceConnector2)).findSourceTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(Collections.emptyList()).when(mirrorSourceConnector2)).findTargetTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(singletonMap).when(mirrorSourceConnector2)).describeTopicConfigs(Collections.singleton("topic"));
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector2)).createNewTopics((Map) Mockito.any());
        mirrorSourceConnector2.refreshTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        new HashMap().put("source.topic", 1L);
        Map configToMap = MirrorSourceConnector.configToMap(config);
        Assertions.assertEquals(2, configToMap.size(), "configMap has incorrect size");
        HashMap hashMap = new HashMap();
        hashMap.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configToMap));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(2))).computeAndCreateTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(2))).createNewTopics((Map) Mockito.eq(hashMap));
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(0))).createNewPartitions((Map) Mockito.any());
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonList(new TopicPartition("source.topic", 0))).when(mirrorSourceConnector2)).findTargetTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(2))).computeAndCreateTopicPartitions();
    }

    @Test
    public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        mirrorSourceConnector.initialize((ConnectorContext) Mockito.mock(ConnectorContext.class));
        MirrorSourceConnector mirrorSourceConnector2 = (MirrorSourceConnector) Mockito.spy(mirrorSourceConnector);
        Map singletonMap = Collections.singletonMap("source.topic", new Config(Arrays.asList(new ConfigEntry("cleanup.policy", "compact"), new ConfigEntry("segment.bytes", "100"))));
        List emptyList = Collections.emptyList();
        List singletonList = Collections.singletonList(new TopicPartition("source.topic", 0));
        ((MirrorSourceConnector) Mockito.doReturn(emptyList).when(mirrorSourceConnector2)).findSourceTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(singletonList).when(mirrorSourceConnector2)).findTargetTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(singletonMap).when(mirrorSourceConnector2)).describeTopicConfigs(Collections.singleton("source.topic"));
        ((MirrorSourceConnector) Mockito.doReturn(Collections.emptyMap()).when(mirrorSourceConnector2)).describeTopicConfigs(Collections.emptySet());
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector2)).createNewTopics((Map) Mockito.any());
        ((MirrorSourceConnector) Mockito.doNothing().when(mirrorSourceConnector2)).createNewPartitions((Map) Mockito.any());
        mirrorSourceConnector2.refreshTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(0))).computeAndCreateTopicPartitions();
        ((MirrorSourceConnector) Mockito.doReturn(Collections.singletonList(new TopicPartition("topic", 0))).when(mirrorSourceConnector2)).findSourceTopicPartitions();
        mirrorSourceConnector2.refreshTopicPartitions();
        ((MirrorSourceConnector) Mockito.verify(mirrorSourceConnector2, Mockito.times(1))).computeAndCreateTopicPartitions();
    }

    @Test
    public void testIsCycleWithNullUpstreamTopic() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), new DefaultReplicationPolicy() { // from class: org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.1CustomReplicationPolicy
            public String upstreamTopic(String str) {
                return null;
            }
        }, new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
        Assertions.assertDoesNotThrow(() -> {
            return Boolean.valueOf(mirrorSourceConnector.isCycle(".b"));
        });
    }

    @Test
    public void testExactlyOnceSupport() {
        assertExactlyOnceSupport(null, null, false);
        assertExactlyOnceSupport("read_uncommitted", null, false);
        assertExactlyOnceSupport(null, "read_uncommitted", false);
        assertExactlyOnceSupport("read_uncommitted", "read_uncommitted", false);
        assertExactlyOnceSupport("read_committed", null, true);
        assertExactlyOnceSupport(null, "read_committed", true);
        assertExactlyOnceSupport("read_uncommitted", "read_committed", true);
        assertExactlyOnceSupport("read_committed", "read_committed", true);
        assertExactlyOnceSupport("read_garbage", null, false);
        assertExactlyOnceSupport(null, "read_garbage", false);
        assertExactlyOnceSupport("read_garbage", "read_garbage", false);
        assertExactlyOnceSupport("read_committed", "read_garbage", false);
        assertExactlyOnceSupport("read_uncommitted", "read_garbage", false);
        assertExactlyOnceSupport("read_garbage", "read_uncommitted", false);
        assertExactlyOnceSupport("read_garbage", "read_committed", true);
    }

    private void assertExactlyOnceSupport(String str, String str2, boolean z) {
        Map<String, String> makeProps = TestUtils.makeProps(new String[0]);
        if (str != null) {
            makeProps.put("consumer.isolation.level", str);
        }
        if (str2 != null) {
            makeProps.put("source.consumer.isolation.level", str2);
        }
        Assertions.assertEquals(z ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED, new MirrorSourceConnector().exactlyOnceSupport(makeProps));
    }

    @Test
    public void testExactlyOnceSupportValidation() {
        Map<String, String> makeProps = TestUtils.makeProps(new String[0]);
        Assertions.assertEquals(Optional.empty(), validateProperty("exactly.once.support", makeProps));
        makeProps.put("exactly.once.support", "requested");
        Assertions.assertEquals(Optional.empty(), validateProperty("exactly.once.support", makeProps));
        makeProps.put("exactly.once.support", "garbage");
        Assertions.assertEquals(Optional.empty(), validateProperty("exactly.once.support", makeProps));
        makeProps.put("exactly.once.support", "required");
        Optional<ConfigValue> validateProperty = validateProperty("exactly.once.support", makeProps);
        Assertions.assertTrue(validateProperty.isPresent());
        List errorMessages = validateProperty.get().errorMessages();
        Assertions.assertEquals(1, errorMessages.size());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("isolation.level"), "Error message \"" + ((String) errorMessages.get(0)) + "\" should have mentioned the 'isolation.level' consumer property");
        makeProps.put("consumer.isolation.level", "read_committed");
        Assertions.assertEquals(Optional.empty(), validateProperty("exactly.once.support", makeProps));
        makeProps.put("offset.lag.max", "bad");
        Optional<ConfigValue> validateProperty2 = validateProperty("offset.lag.max", makeProps);
        Assertions.assertTrue(validateProperty2.isPresent());
        List errorMessages2 = validateProperty2.get().errorMessages();
        Assertions.assertEquals(1, errorMessages2.size());
        Assertions.assertTrue(((String) errorMessages2.get(0)).contains("offset.lag.max"), "Error message \"" + ((String) errorMessages2.get(0)) + "\" should have mentioned the 'offset.lag.max' property");
        Assertions.assertEquals(Optional.empty(), validateProperty("exactly.once.support", makeProps));
        makeProps.remove("consumer.isolation.level");
        Optional<ConfigValue> validateProperty3 = validateProperty("exactly.once.support", makeProps);
        Assertions.assertTrue(validateProperty3.isPresent());
        List errorMessages3 = validateProperty3.get().errorMessages();
        Assertions.assertEquals(1, errorMessages3.size());
        Assertions.assertTrue(((String) errorMessages3.get(0)).contains("isolation.level"), "Error message \"" + ((String) errorMessages3.get(0)) + "\" should have mentioned the 'isolation.level' consumer property");
    }

    private Optional<ConfigValue> validateProperty(String str, Map<String, String> map) {
        List list = (List) new MirrorSourceConnector().validate(map).configValues().stream().filter(configValue -> {
            return str.equals(configValue.name());
        }).collect(Collectors.toList());
        Assertions.assertTrue(list.size() <= 1, "Connector produced multiple config values for '" + str + "' property");
        if (list.isEmpty()) {
            return Optional.empty();
        }
        ConfigValue configValue2 = (ConfigValue) list.get(0);
        Assertions.assertNotNull(configValue2, "Connector should not have record null config value for '" + str + "' property");
        return Optional.of(configValue2);
    }

    @Test
    public void testAlterOffsetsIncorrectPartitionKey() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Assertions.assertThrows(ConnectException.class, () -> {
            mirrorSourceConnector.alterOffsets((Map) null, Collections.singletonMap(Collections.singletonMap("unused_partition_key", "unused_partition_value"), MirrorUtils.wrapOffset(10L)));
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            mirrorSourceConnector.alterOffsets((Map) null, Collections.singletonMap(null, MirrorUtils.wrapOffset(10L)));
        });
    }

    @Test
    public void testAlterOffsetsMissingPartitionKey() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Function function = map -> {
            return Boolean.valueOf(mirrorSourceConnector.alterOffsets((Map) null, Collections.singletonMap(map, MirrorUtils.wrapOffset(64L))));
        };
        Map<String, Object> sourcePartition = sourcePartition("t", 3, "us-east-2");
        Assertions.assertTrue(((Boolean) function.apply(sourcePartition)).booleanValue());
        for (String str : Arrays.asList("cluster", "topic", "partition")) {
            HashMap hashMap = new HashMap(sourcePartition);
            hashMap.remove(str);
            Assertions.assertThrows(ConnectException.class, () -> {
            });
        }
    }

    @Test
    public void testAlterOffsetsInvalidPartitionPartition() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Map<String, Object> sourcePartition = sourcePartition("t", 3, "us-west-2");
        sourcePartition.put("partition", "a string");
        Assertions.assertThrows(ConnectException.class, () -> {
            mirrorSourceConnector.alterOffsets((Map) null, Collections.singletonMap(sourcePartition, MirrorUtils.wrapOffset(49L)));
        });
    }

    @Test
    public void testAlterOffsetsMultiplePartitions() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Map<String, Object> sourcePartition = sourcePartition("t1", 0, "primary");
        Map<String, Object> sourcePartition2 = sourcePartition("t1", 1, "primary");
        HashMap hashMap = new HashMap();
        hashMap.put(sourcePartition, MirrorUtils.wrapOffset(50L));
        hashMap.put(sourcePartition2, MirrorUtils.wrapOffset(100L));
        Assertions.assertTrue(mirrorSourceConnector.alterOffsets((Map) null, hashMap));
    }

    @Test
    public void testAlterOffsetsIncorrectOffsetKey() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Map singletonMap = Collections.singletonMap(sourcePartition("t1", 2, "backup"), Collections.singletonMap("unused_offset_key", 0));
        Assertions.assertThrows(ConnectException.class, () -> {
            mirrorSourceConnector.alterOffsets((Map) null, singletonMap);
        });
    }

    @Test
    public void testAlterOffsetsOffsetValues() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Function function = obj -> {
            return Boolean.valueOf(mirrorSourceConnector.alterOffsets((Map) null, Collections.singletonMap(sourcePartition("t", 5, "backup"), Collections.singletonMap("offset", obj))));
        };
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertThrows(ConnectException.class, () -> {
        });
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(0)).booleanValue();
        });
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(10)).booleanValue();
        });
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(2147483648L)).booleanValue();
        });
    }

    @Test
    public void testSuccessfulAlterOffsets() {
        MirrorSourceConnector mirrorSourceConnector = new MirrorSourceConnector();
        Assertions.assertTrue(mirrorSourceConnector.alterOffsets((Map) null, Collections.singletonMap(sourcePartition("t2", 0, "backup"), MirrorUtils.wrapOffset(5L))));
        Assertions.assertTrue(mirrorSourceConnector.alterOffsets((Map) null, Collections.emptyMap()));
    }

    @Test
    public void testAlterOffsetsTombstones() {
        MirrorCheckpointConnector mirrorCheckpointConnector = new MirrorCheckpointConnector();
        Function function = map -> {
            return Boolean.valueOf(mirrorCheckpointConnector.alterOffsets((Map) null, Collections.singletonMap(map, null)));
        };
        Map<String, Object> sourcePartition = sourcePartition("kips", 875, "apache.kafka");
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(sourcePartition)).booleanValue();
        });
        sourcePartition.put("partition", "a string");
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(sourcePartition)).booleanValue();
        });
        sourcePartition.remove("partition");
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(sourcePartition)).booleanValue();
        });
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(null)).booleanValue();
        });
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(Collections.emptyMap())).booleanValue();
        });
        Assertions.assertTrue(() -> {
            return ((Boolean) function.apply(Collections.singletonMap("unused_partition_key", "unused_partition_value"))).booleanValue();
        });
    }

    private static Map<String, Object> sourcePartition(String str, int i, String str2) {
        return MirrorUtils.wrapPartition(new TopicPartition(str, i), str2);
    }
}
