package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorCheckpointConnector.class */
public class MirrorCheckpointConnector extends SourceConnector {
    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointConnector.class);
    private Scheduler scheduler;
    private MirrorCheckpointConfig config;
    private TopicFilter topicFilter;
    private GroupFilter groupFilter;
    private Admin sourceAdminClient;
    private Admin targetAdminClient;
    private SourceAndTarget sourceAndTarget;
    private Set<String> knownConsumerGroups;

    public MirrorCheckpointConnector() {
        this.knownConsumerGroups = Collections.emptySet();
    }

    MirrorCheckpointConnector(Set<String> set, MirrorCheckpointConfig mirrorCheckpointConfig) {
        this.knownConsumerGroups = Collections.emptySet();
        this.knownConsumerGroups = set;
        this.config = mirrorCheckpointConfig;
    }

    public void start(Map<String, String> map) {
        this.config = new MirrorCheckpointConfig(map);
        if (this.config.enabled()) {
            String connectorName = this.config.connectorName();
            this.sourceAndTarget = new SourceAndTarget(this.config.sourceClusterAlias(), this.config.targetClusterAlias());
            this.topicFilter = this.config.topicFilter();
            this.groupFilter = this.config.groupFilter();
            this.sourceAdminClient = this.config.forwardingAdmin(this.config.sourceAdminConfig("checkpoint-source-admin"));
            this.targetAdminClient = this.config.forwardingAdmin(this.config.targetAdminConfig("checkpoint-target-admin"));
            this.scheduler = new Scheduler(getClass(), this.config.entityLabel(), this.config.adminTimeout());
            this.scheduler.execute(this::createInternalTopics, "creating internal topics");
            this.scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
            this.scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, this.config.refreshGroupsInterval(), "refreshing consumer groups");
            log.info("Started {} with {} consumer groups.", connectorName, Integer.valueOf(this.knownConsumerGroups.size()));
            log.debug("Started {} with consumer groups: {}", connectorName, this.knownConsumerGroups);
        }
    }

    public void stop() {
        if (this.config.enabled()) {
            Utils.closeQuietly(this.scheduler, "scheduler");
            Utils.closeQuietly(this.topicFilter, "topic filter");
            Utils.closeQuietly(this.groupFilter, "group filter");
            Utils.closeQuietly(this.sourceAdminClient, "source admin client");
            Utils.closeQuietly(this.targetAdminClient, "target admin client");
        }
    }

    public Class<? extends Task> taskClass() {
        return MirrorCheckpointTask.class;
    }

    public List<Map<String, String>> taskConfigs(int i) {
        if (!this.config.enabled() || this.knownConsumerGroups.isEmpty() || this.config.emitCheckpointsInterval().isNegative()) {
            return Collections.emptyList();
        }
        int min = Math.min(i, this.knownConsumerGroups.size());
        List groupPartitions = ConnectorUtils.groupPartitions(new ArrayList(this.knownConsumerGroups), min);
        return (List) IntStream.range(0, min).mapToObj(i2 -> {
            return this.config.taskConfigForConsumerGroups((List) groupPartitions.get(i2), i2);
        }).collect(Collectors.toList());
    }

    public ConfigDef config() {
        return MirrorCheckpointConfig.CONNECTOR_CONFIG_DEF;
    }

    public String version() {
        return AppInfoParser.getVersion();
    }

    public boolean alterOffsets(Map<String, String> map, Map<Map<String, ?>, Map<String, ?>> map2) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : map2.entrySet()) {
            Map<String, ?> value = entry.getValue();
            if (value != null) {
                Map<String, ?> key = entry.getKey();
                if (key == null) {
                    throw new ConnectException("Source partitions may not be null");
                }
                MirrorUtils.validateSourcePartitionString(key, "group");
                MirrorUtils.validateSourcePartitionString(key, "topic");
                MirrorUtils.validateSourcePartitionPartition(key);
                MirrorUtils.validateSourceOffset(key, value, true);
            }
        }
        return true;
    }

    private void refreshConsumerGroups() throws InterruptedException, ExecutionException {
        Set<String> findConsumerGroups = findConsumerGroups();
        HashSet hashSet = new HashSet(findConsumerGroups);
        hashSet.removeAll(this.knownConsumerGroups);
        HashSet hashSet2 = new HashSet(this.knownConsumerGroups);
        hashSet2.removeAll(findConsumerGroups);
        if (hashSet.isEmpty() && hashSet2.isEmpty()) {
            return;
        }
        log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.", new Object[]{Integer.valueOf(findConsumerGroups.size()), this.sourceAndTarget, Integer.valueOf(hashSet.size()), Integer.valueOf(hashSet2.size()), Integer.valueOf(this.knownConsumerGroups.size())});
        log.debug("Found new consumer groups: {}", hashSet);
        this.knownConsumerGroups = findConsumerGroups;
        this.context.requestTaskReconfiguration();
    }

    private void loadInitialConsumerGroups() throws InterruptedException, ExecutionException {
        this.knownConsumerGroups = findConsumerGroups();
    }

    Set<String> findConsumerGroups() throws InterruptedException, ExecutionException {
        List<String> list = (List) listConsumerGroups().stream().map((v0) -> {
            return v0.groupId();
        }).filter(this::shouldReplicateByGroupFilter).collect(Collectors.toList());
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (String str : list) {
            if (((Set) listConsumerGroupOffsets(str).keySet().stream().map((v0) -> {
                return v0.topic();
            }).filter(this::shouldReplicateByTopicFilter).collect(Collectors.toSet())).isEmpty()) {
                hashSet2.add(str);
            } else {
                hashSet.add(str);
            }
        }
        log.debug("Ignoring the following groups which do not have any offsets for topics that are accepted by the topic filter: {}", hashSet2);
        return hashSet;
    }

    Collection<ConsumerGroupListing> listConsumerGroups() throws InterruptedException, ExecutionException {
        return (Collection) MirrorUtils.adminCall(() -> {
            return (Collection) this.sourceAdminClient.listConsumerGroups().valid().get();
        }, () -> {
            return "list consumer groups on " + this.config.sourceClusterAlias() + " cluster";
        });
    }

    private void createInternalTopics() {
        MirrorUtils.createSinglePartitionCompactedTopic(this.config.checkpointsTopic(), this.config.checkpointsTopicReplicationFactor(), this.targetAdminClient);
    }

    Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String str) throws InterruptedException, ExecutionException {
        return (Map) MirrorUtils.adminCall(() -> {
            return (Map) this.sourceAdminClient.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
        }, () -> {
            return String.format("list offsets for consumer group %s on %s cluster", str, this.config.sourceClusterAlias());
        });
    }

    boolean shouldReplicateByGroupFilter(String str) {
        return this.groupFilter.shouldReplicateGroup(str);
    }

    boolean shouldReplicateByTopicFilter(String str) {
        return this.topicFilter.shouldReplicateTopic(str);
    }
}
