package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorSourceConnector.class */
public class MirrorSourceConnector extends SourceConnector {
    private static final Logger log = LoggerFactory.getLogger(MirrorSourceConnector.class);
    private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC, (String) null, PatternType.ANY);
    private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
    private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString();
    private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
    private final AtomicBoolean noAclAuthorizer;
    private Scheduler scheduler;
    private MirrorSourceConfig config;
    private SourceAndTarget sourceAndTarget;
    private String connectorName;
    private TopicFilter topicFilter;
    private ConfigPropertyFilter configPropertyFilter;
    private List<TopicPartition> knownSourceTopicPartitions;
    private List<TopicPartition> knownTargetTopicPartitions;
    private ReplicationPolicy replicationPolicy;
    private int replicationFactor;
    private Admin sourceAdminClient;
    private Admin targetAdminClient;
    private boolean heartbeatsReplicationEnabled;

    public MirrorSourceConnector() {
        this.noAclAuthorizer = new AtomicBoolean(false);
        this.knownSourceTopicPartitions = Collections.emptyList();
        this.knownTargetTopicPartitions = Collections.emptyList();
    }

    MirrorSourceConnector(List<TopicPartition> list, MirrorSourceConfig mirrorSourceConfig) {
        this.noAclAuthorizer = new AtomicBoolean(false);
        this.knownSourceTopicPartitions = Collections.emptyList();
        this.knownTargetTopicPartitions = Collections.emptyList();
        this.knownSourceTopicPartitions = list;
        this.config = mirrorSourceConfig;
    }

    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, TopicFilter topicFilter, ConfigPropertyFilter configPropertyFilter) {
        this(sourceAndTarget, replicationPolicy, topicFilter, configPropertyFilter, true);
    }

    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, TopicFilter topicFilter, ConfigPropertyFilter configPropertyFilter, boolean z) {
        this.noAclAuthorizer = new AtomicBoolean(false);
        this.knownSourceTopicPartitions = Collections.emptyList();
        this.knownTargetTopicPartitions = Collections.emptyList();
        this.sourceAndTarget = sourceAndTarget;
        this.replicationPolicy = replicationPolicy;
        this.topicFilter = topicFilter;
        this.configPropertyFilter = configPropertyFilter;
        this.heartbeatsReplicationEnabled = z;
    }

    MirrorSourceConnector(Admin admin, Admin admin2, MirrorSourceConfig mirrorSourceConfig) {
        this.noAclAuthorizer = new AtomicBoolean(false);
        this.knownSourceTopicPartitions = Collections.emptyList();
        this.knownTargetTopicPartitions = Collections.emptyList();
        this.sourceAdminClient = admin;
        this.targetAdminClient = admin2;
        this.config = mirrorSourceConfig;
    }

    public void start(Map<String, String> map) {
        long currentTimeMillis = System.currentTimeMillis();
        this.config = new MirrorSourceConfig(map);
        if (this.config.enabled()) {
            this.connectorName = this.config.connectorName();
            this.sourceAndTarget = new SourceAndTarget(this.config.sourceClusterAlias(), this.config.targetClusterAlias());
            this.topicFilter = this.config.topicFilter();
            this.configPropertyFilter = this.config.configPropertyFilter();
            this.replicationPolicy = this.config.replicationPolicy();
            this.replicationFactor = this.config.replicationFactor();
            this.sourceAdminClient = this.config.forwardingAdmin(this.config.sourceAdminConfig("replication-source-admin"));
            this.targetAdminClient = this.config.forwardingAdmin(this.config.targetAdminConfig("replication-target-admin"));
            this.heartbeatsReplicationEnabled = this.config.heartbeatsReplicationEnabled();
            this.scheduler = new Scheduler(getClass(), this.config.entityLabel(), this.config.adminTimeout());
            this.scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
            this.scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
            this.scheduler.execute(this::computeAndCreateTopicPartitions, "creating downstream topic-partitions");
            this.scheduler.execute(this::refreshKnownTargetTopics, "refreshing known target topics");
            this.scheduler.scheduleRepeating(this::syncTopicAcls, this.config.syncTopicAclsInterval(), "syncing topic ACLs");
            this.scheduler.scheduleRepeating(this::syncTopicConfigs, this.config.syncTopicConfigsInterval(), "syncing topic configs");
            this.scheduler.scheduleRepeatingDelayed(this::refreshTopicPartitions, this.config.refreshTopicsInterval(), "refreshing topics");
            log.info("Started {} with {} topic-partitions.", this.connectorName, Integer.valueOf(this.knownSourceTopicPartitions.size()));
            log.info("Starting {} took {} ms.", this.connectorName, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        }
    }

    public void stop() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.config.enabled()) {
            Utils.closeQuietly(this.scheduler, "scheduler");
            Utils.closeQuietly(this.topicFilter, "topic filter");
            Utils.closeQuietly(this.configPropertyFilter, "config property filter");
            Utils.closeQuietly(this.sourceAdminClient, "source admin client");
            Utils.closeQuietly(this.targetAdminClient, "target admin client");
            log.info("Stopping {} took {} ms.", this.connectorName, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        }
    }

    public Class<? extends Task> taskClass() {
        return MirrorSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int i) {
        if (!this.config.enabled() || this.knownSourceTopicPartitions.isEmpty()) {
            return Collections.emptyList();
        }
        int min = Math.min(i, this.knownSourceTopicPartitions.size());
        ArrayList arrayList = new ArrayList(min);
        for (int i2 = 0; i2 < min; i2++) {
            arrayList.add(new ArrayList());
        }
        int i3 = 0;
        Iterator<TopicPartition> it = this.knownSourceTopicPartitions.iterator();
        while (it.hasNext()) {
            ((List) arrayList.get(i3 % min)).add(it.next());
            i3++;
        }
        return (List) IntStream.range(0, min).mapToObj(i4 -> {
            return this.config.taskConfigForTopicPartitions((List) arrayList.get(i4), i4);
        }).collect(Collectors.toList());
    }

    public ConfigDef config() {
        return MirrorSourceConfig.CONNECTOR_CONFIG_DEF;
    }

    public Config validate(Map<String, String> map) {
        List<ConfigValue> configValues = super.validate(map).configValues();
        validateExactlyOnceConfigs(map, configValues);
        validateEmitOffsetSyncConfigs(map, configValues);
        return new Config(configValues);
    }

    private static void validateEmitOffsetSyncConfigs(Map<String, String> map, List<ConfigValue> list) {
        boolean anyMatch = map.keySet().stream().anyMatch(str -> {
            return str.startsWith(MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || str.startsWith(MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX);
        });
        if ("false".equals(map.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && anyMatch) {
            list.stream().filter(configValue -> {
                return MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(configValue.name());
            }).findAny().orElseGet(() -> {
                ConfigValue configValue2 = new ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
                list.add(configValue2);
                return configValue2;
            }).addErrorMessage("MirrorSourceConnector can't setup offset-syncs feature while emit.offset-syncs.enabled set to false");
        }
    }

    private void validateExactlyOnceConfigs(Map<String, String> map, List<ConfigValue> list) {
        if (!"required".equals(map.get(EXACTLY_ONCE_SUPPORT_CONFIG)) || consumerUsesReadCommitted(map)) {
            return;
        }
        list.stream().filter(configValue -> {
            return EXACTLY_ONCE_SUPPORT_CONFIG.equals(configValue.name());
        }).findAny().orElseGet(() -> {
            ConfigValue configValue2 = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
            list.add(configValue2);
            return configValue2;
        }).addErrorMessage("MirrorSourceConnector can only provide exactly-once guarantees when its source consumer is configured with isolation.level set to '" + READ_COMMITTED + "'; otherwise, records from aborted and uncommitted transactions will be replicated from the source cluster to the target cluster.");
    }

    public String version() {
        return AppInfoParser.getVersion();
    }

    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> map) {
        return consumerUsesReadCommitted(map) ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
    }

    public boolean alterOffsets(Map<String, String> map, Map<Map<String, ?>, Map<String, ?>> map2) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : map2.entrySet()) {
            Map<String, ?> value = entry.getValue();
            if (value != null) {
                Map<String, ?> key = entry.getKey();
                if (key == null) {
                    throw new ConnectException("Source partitions may not be null");
                }
                MirrorUtils.validateSourcePartitionString(key, MirrorUtils.SOURCE_CLUSTER_KEY);
                MirrorUtils.validateSourcePartitionString(key, "topic");
                MirrorUtils.validateSourcePartitionPartition(key);
                MirrorUtils.validateSourceOffset(key, value, false);
            }
        }
        return true;
    }

    private boolean consumerUsesReadCommitted(Map<String, String> map) {
        return Objects.equals(READ_COMMITTED, MirrorSourceConfig.sourceConsumerConfig(map).get("isolation.level"));
    }

    List<TopicPartition> findSourceTopicPartitions() throws InterruptedException, ExecutionException {
        return (List) describeTopics(this.sourceAdminClient, (Set) listTopics(this.sourceAdminClient).stream().filter(this::shouldReplicateTopic).collect(Collectors.toSet())).stream().flatMap(MirrorSourceConnector::expandTopicDescription).collect(Collectors.toList());
    }

    List<TopicPartition> findTargetTopicPartitions() throws InterruptedException, ExecutionException {
        return (List) describeTopics(this.targetAdminClient, (Set) listTopics(this.targetAdminClient).stream().filter(str -> {
            return this.sourceAndTarget.source().equals(this.replicationPolicy.topicSource(str));
        }).filter(str2 -> {
            return !str2.equals(this.config.checkpointsTopic());
        }).collect(Collectors.toSet())).stream().flatMap(MirrorSourceConnector::expandTopicDescription).collect(Collectors.toList());
    }

    void refreshTopicPartitions() throws InterruptedException, ExecutionException {
        List<TopicPartition> findSourceTopicPartitions = findSourceTopicPartitions();
        List<TopicPartition> findTargetTopicPartitions = findTargetTopicPartitions();
        HashSet hashSet = new HashSet(findSourceTopicPartitions);
        HashSet hashSet2 = new HashSet(this.knownSourceTopicPartitions);
        Set set = (Set) findTargetTopicPartitions.stream().map(topicPartition -> {
            return new TopicPartition(this.replicationPolicy.upstreamTopic(topicPartition.topic()), topicPartition.partition());
        }).collect(Collectors.toSet());
        HashSet hashSet3 = new HashSet(findSourceTopicPartitions);
        hashSet3.removeAll(set);
        this.knownTargetTopicPartitions = findTargetTopicPartitions;
        if (hashSet2.equals(hashSet) && hashSet3.isEmpty()) {
            return;
        }
        HashSet hashSet4 = new HashSet(findSourceTopicPartitions);
        hashSet4.removeAll(hashSet2);
        hashSet2.removeAll(hashSet);
        log.info("Found {} new topic-partitions on {}. Found {} deleted topic-partitions on {}. Found {} topic-partitions missing on {}.", new Object[]{Integer.valueOf(hashSet4.size()), this.sourceAndTarget.source(), Integer.valueOf(hashSet2.size()), this.sourceAndTarget.source(), Integer.valueOf(hashSet3.size()), this.sourceAndTarget.target()});
        log.trace("Found new topic-partitions on {}: {}", this.sourceAndTarget.source(), hashSet4);
        log.trace("Found deleted topic-partitions on {}: {}", this.sourceAndTarget.source(), hashSet2);
        log.trace("Found missing topic-partitions on {}: {}", this.sourceAndTarget.target(), hashSet3);
        this.knownSourceTopicPartitions = findSourceTopicPartitions;
        computeAndCreateTopicPartitions();
        this.context.requestTaskReconfiguration();
    }

    private void loadTopicPartitions() throws InterruptedException, ExecutionException {
        this.knownSourceTopicPartitions = findSourceTopicPartitions();
        this.knownTargetTopicPartitions = findTargetTopicPartitions();
    }

    private void refreshKnownTargetTopics() throws InterruptedException, ExecutionException {
        this.knownTargetTopicPartitions = findTargetTopicPartitions();
    }

    private Set<String> topicsBeingReplicated() {
        Set<String> topics = toTopics(this.knownTargetTopicPartitions);
        return (Set) this.knownSourceTopicPartitions.stream().map((v0) -> {
            return v0.topic();
        }).distinct().filter(str -> {
            return topics.contains(formatRemoteTopic(str));
        }).collect(Collectors.toSet());
    }

    private Set<String> toTopics(Collection<TopicPartition> collection) {
        return (Set) collection.stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toSet());
    }

    void syncTopicAcls() throws InterruptedException, ExecutionException {
        Optional<Collection<AclBinding>> listTopicAclBindings = listTopicAclBindings();
        if (listTopicAclBindings.isPresent()) {
            updateTopicAcls((List) listTopicAclBindings.get().stream().filter(aclBinding -> {
                return aclBinding.pattern().resourceType() == ResourceType.TOPIC;
            }).filter(aclBinding2 -> {
                return aclBinding2.pattern().patternType() == PatternType.LITERAL;
            }).filter(this::shouldReplicateAcl).filter(aclBinding3 -> {
                return shouldReplicateTopic(aclBinding3.pattern().name());
            }).map(this::targetAclBinding).collect(Collectors.toList()));
        }
    }

    void syncTopicConfigs() throws InterruptedException, ExecutionException {
        incrementalAlterConfigs((Map) describeTopicConfigs(topicsBeingReplicated()).entrySet().stream().collect(Collectors.toMap(entry -> {
            return formatRemoteTopic((String) entry.getKey());
        }, entry2 -> {
            return targetConfig((org.apache.kafka.clients.admin.Config) entry2.getValue(), true);
        })));
    }

    private void createOffsetSyncsTopic() {
        if (this.config.emitOffsetSyncsEnabled()) {
            ForwardingAdmin forwardingAdmin = this.config.forwardingAdmin(this.config.offsetSyncsTopicAdminConfig());
            try {
                MirrorUtils.createSinglePartitionCompactedTopic(this.config.offsetSyncsTopic(), this.config.offsetSyncsTopicReplicationFactor(), forwardingAdmin);
                if (forwardingAdmin != null) {
                    forwardingAdmin.close();
                }
            } catch (Throwable th) {
                if (forwardingAdmin != null) {
                    try {
                        forwardingAdmin.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {
        Map<String, Long> map = (Map) ((Map) this.knownSourceTopicPartitions.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.topic();
        }, Collectors.counting()))).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        Map map2 = (Map) ((Map) this.knownTargetTopicPartitions.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.topic();
        }, Collectors.counting()))).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        Set<String> keySet = map.keySet();
        Set keySet2 = map2.keySet();
        Map map3 = (Map) keySet.stream().collect(Collectors.toMap(Function.identity(), this::formatRemoteTopic));
        Map map4 = (Map) keySet.stream().collect(Collectors.partitioningBy(str -> {
            return keySet2.contains(map3.get(str));
        }, Collectors.toSet()));
        Set set = (Set) map4.get(true);
        Set<String> set2 = (Set) map4.get(false);
        if (!set2.isEmpty()) {
            createNewTopics(set2, map);
        }
        Stream filter = set.stream().filter(str2 -> {
            return ((Long) map.get(str2)).longValue() > ((Long) map2.get((String) map3.get(str2))).longValue();
        });
        Function identity = Function.identity();
        Objects.requireNonNull(map);
        Map map5 = (Map) filter.collect(Collectors.toMap(identity, (v1) -> {
            return r2.get(v1);
        }));
        if (map5.isEmpty()) {
            return;
        }
        createNewPartitions((Map) map5.entrySet().stream().collect(Collectors.toMap(entry -> {
            return (String) map3.get(entry.getKey());
        }, entry2 -> {
            return NewPartitions.increaseTo(((Long) entry2.getValue()).intValue());
        })));
    }

    void createNewTopics(Set<String> set, Map<String, Long> map) throws ExecutionException, InterruptedException {
        Map<String, org.apache.kafka.clients.admin.Config> describeTopicConfigs = describeTopicConfigs(set);
        createNewTopics((Map) set.stream().map(str -> {
            String formatRemoteTopic = formatRemoteTopic(str);
            int intValue = ((Long) map.get(str)).intValue();
            return new NewTopic(formatRemoteTopic, intValue, (short) this.replicationFactor).configs(configToMap(targetConfig((org.apache.kafka.clients.admin.Config) describeTopicConfigs.get(str), false)));
        }).collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, Function.identity())));
    }

    void createNewTopics(Map<String, NewTopic> map) throws ExecutionException, InterruptedException {
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.createTopics(map.values(), new CreateTopicsOptions()).values().forEach((str, kafkaFuture) -> {
                kafkaFuture.whenComplete((r8, th) -> {
                    if (th != null) {
                        log.warn("Could not create topic {}.", str, th);
                    } else {
                        log.info("Created remote topic {} with {} partitions.", str, Integer.valueOf(((NewTopic) map.get(str)).numPartitions()));
                    }
                });
            });
            return null;
        }, () -> {
            return String.format("create topics %s on %s cluster", map, this.config.targetClusterAlias());
        });
    }

    void createNewPartitions(Map<String, NewPartitions> map) throws ExecutionException, InterruptedException {
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.createPartitions(map).values().forEach((str, kafkaFuture) -> {
                kafkaFuture.whenComplete((r8, th) -> {
                    if (th instanceof InvalidPartitionsException) {
                        return;
                    }
                    if (th != null) {
                        log.warn("Could not create topic-partitions for {}.", str, th);
                    } else {
                        log.info("Increased size of {} to {} partitions.", str, Integer.valueOf(((NewPartitions) map.get(str)).totalCount()));
                    }
                });
            });
            return null;
        }, () -> {
            return String.format("create partitions %s on %s cluster", map, this.config.targetClusterAlias());
        });
    }

    private Set<String> listTopics(Admin admin) throws InterruptedException, ExecutionException {
        return (Set) MirrorUtils.adminCall(() -> {
            return (Set) admin.listTopics().names().get();
        }, () -> {
            return "list topics on " + actualClusterAlias(admin) + " cluster";
        });
    }

    private Optional<Collection<AclBinding>> listTopicAclBindings() throws InterruptedException, ExecutionException {
        return (Optional) MirrorUtils.adminCall(() -> {
            try {
                return Optional.of((Collection) this.sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get());
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof SecurityDisabledException)) {
                    throw e;
                }
                if (this.noAclAuthorizer.compareAndSet(false, true)) {
                    log.info("No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. Consider disabling topic ACL syncing by setting sync.topic.acls.enabled to 'false'.");
                } else {
                    log.debug("Source-side ACL authorizer still not found; skipping topic ACL sync");
                }
                return Optional.empty();
            }
        }, () -> {
            return "describe ACLs on " + this.config.sourceClusterAlias() + " cluster";
        });
    }

    private Collection<TopicDescription> describeTopics(Admin admin, Collection<String> collection) throws InterruptedException, ExecutionException {
        return (Collection) MirrorUtils.adminCall(() -> {
            return ((Map) admin.describeTopics(collection).allTopicNames().get()).values();
        }, () -> {
            return String.format("describe topics %s on %s cluster", collection, actualClusterAlias(admin));
        });
    }

    static Map<String, String> configToMap(org.apache.kafka.clients.admin.Config config) {
        return (Map) config.entries().stream().collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, (v0) -> {
            return v0.value();
        }));
    }

    void incrementalAlterConfigs(Map<String, org.apache.kafka.clients.admin.Config> map) throws ExecutionException, InterruptedException {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, org.apache.kafka.clients.admin.Config> entry : map.entrySet()) {
            ArrayList arrayList = new ArrayList();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, entry.getKey());
            for (ConfigEntry configEntry : entry.getValue().entries()) {
                if (!configEntry.isDefault() || shouldReplicateSourceDefault(configEntry.name())) {
                    arrayList.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
                } else {
                    arrayList.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.DELETE));
                }
            }
            hashMap.put(configResource, arrayList);
        }
        log.trace("Syncing configs for {} topics.", Integer.valueOf(hashMap.size()));
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.incrementalAlterConfigs(hashMap).values().forEach((configResource2, kafkaFuture) -> {
                kafkaFuture.whenComplete((r9, th) -> {
                    if (!(th instanceof UnsupportedVersionException)) {
                        log.warn("Could not alter configuration of topic {}.", configResource2.name(), th);
                    } else {
                        log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", new Object[]{configResource2.name(), this.sourceAndTarget.target(), th});
                        this.context.raiseError(new ConnectException("the target cluster '" + this.sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", th));
                    }
                });
            });
            return null;
        }, () -> {
            return String.format("incremental alter topic configs %s on %s cluster", map, this.config.targetClusterAlias());
        });
    }

    private void updateTopicAcls(List<AclBinding> list) throws ExecutionException, InterruptedException {
        log.trace("Syncing {} topic ACL bindings.", Integer.valueOf(list.size()));
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.createAcls(list).values().forEach((aclBinding, kafkaFuture) -> {
                kafkaFuture.whenComplete((r6, th) -> {
                    if (th != null) {
                        log.warn("Could not sync ACL of topic {}.", aclBinding.pattern().name(), th);
                    }
                });
            });
            return null;
        }, () -> {
            return String.format("create ACLs %s on %s cluster", list, this.config.targetClusterAlias());
        });
    }

    private static Stream<TopicPartition> expandTopicDescription(TopicDescription topicDescription) {
        String name = topicDescription.name();
        return topicDescription.partitions().stream().map(topicPartitionInfo -> {
            return new TopicPartition(name, topicPartitionInfo.partition());
        });
    }

    Map<String, org.apache.kafka.clients.admin.Config> describeTopicConfigs(Set<String> set) throws InterruptedException, ExecutionException {
        Set set2 = (Set) set.stream().map(str -> {
            return new ConfigResource(ConfigResource.Type.TOPIC, str);
        }).collect(Collectors.toSet());
        return (Map) MirrorUtils.adminCall(() -> {
            return (Map) ((Map) this.sourceAdminClient.describeConfigs(set2).all().get()).entrySet().stream().collect(Collectors.toMap(entry -> {
                return ((ConfigResource) entry.getKey()).name();
            }, (v0) -> {
                return v0.getValue();
            }));
        }, () -> {
            return String.format("describe configs for topics %s on %s cluster", set, this.config.sourceClusterAlias());
        });
    }

    org.apache.kafka.clients.admin.Config targetConfig(org.apache.kafka.clients.admin.Config config, boolean z) {
        return new org.apache.kafka.clients.admin.Config((List) config.entries().stream().filter(configEntry -> {
            return z || (configEntry.isDefault() && shouldReplicateSourceDefault(configEntry.name())) || !configEntry.isDefault();
        }).filter(configEntry2 -> {
            return (configEntry2.isReadOnly() || configEntry2.isSensitive()) ? false : true;
        }).filter(configEntry3 -> {
            return configEntry3.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG;
        }).filter(configEntry4 -> {
            return shouldReplicateTopicConfigurationProperty(configEntry4.name());
        }).collect(Collectors.toList()));
    }

    private static AccessControlEntry downgradeAllowAllACL(AccessControlEntry accessControlEntry) {
        return new AccessControlEntry(accessControlEntry.principal(), accessControlEntry.host(), AclOperation.READ, accessControlEntry.permissionType());
    }

    AclBinding targetAclBinding(AclBinding aclBinding) {
        return new AclBinding(new ResourcePattern(ResourceType.TOPIC, formatRemoteTopic(aclBinding.pattern().name()), PatternType.LITERAL), (aclBinding.entry().permissionType() == AclPermissionType.ALLOW && aclBinding.entry().operation() == AclOperation.ALL) ? downgradeAllowAllACL(aclBinding.entry()) : aclBinding.entry());
    }

    boolean shouldReplicateTopic(String str) {
        return ((!this.topicFilter.shouldReplicateTopic(str) && (!this.heartbeatsReplicationEnabled || !this.replicationPolicy.isHeartbeatsTopic(str))) || this.replicationPolicy.isInternalTopic(str) || isCycle(str)) ? false : true;
    }

    boolean shouldReplicateAcl(AclBinding aclBinding) {
        return (aclBinding.entry().permissionType() == AclPermissionType.ALLOW && aclBinding.entry().operation() == AclOperation.WRITE) ? false : true;
    }

    boolean shouldReplicateTopicConfigurationProperty(String str) {
        return this.configPropertyFilter.shouldReplicateConfigProperty(str);
    }

    boolean shouldReplicateSourceDefault(String str) {
        return this.configPropertyFilter.shouldReplicateSourceDefault(str);
    }

    boolean isCycle(String str) {
        String str2 = this.replicationPolicy.topicSource(str);
        if (str2 == null) {
            return false;
        }
        if (str2.equals(this.sourceAndTarget.target())) {
            return true;
        }
        String upstreamTopic = this.replicationPolicy.upstreamTopic(str);
        if (upstreamTopic == null || upstreamTopic.equals(str)) {
            return false;
        }
        return isCycle(upstreamTopic);
    }

    String formatRemoteTopic(String str) {
        return this.replicationPolicy.formatRemoteTopic(this.sourceAndTarget.source(), str);
    }

    private String actualClusterAlias(Admin admin) {
        return admin.equals(this.sourceAdminClient) ? this.config.sourceClusterAlias() : this.config.targetClusterAlias();
    }
}
