package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Interprets the topics of a Kafka cluster through a {@link ReplicationPolicy}: discovers
 * heartbeat, checkpoint and remote topics, derives upstream cluster aliases from topic names,
 * and reads consumer group offsets from checkpoint records.
 *
 * <p>Instances own an {@link AdminClient} and must be closed after use.
 */
public class MirrorClient implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);

    private final AdminClient adminClient;
    private final ReplicationPolicy replicationPolicy;
    private final Map<String, Object> consumerConfig;

    /**
     * Creates a client from raw configuration properties.
     *
     * @param map properties used to build a {@link MirrorClientConfig}
     */
    public MirrorClient(Map<String, Object> map) {
        this(new MirrorClientConfig(map));
    }

    /**
     * Creates a client whose admin client, consumer configuration and replication policy are
     * all taken from the supplied configuration.
     *
     * @param mirrorClientConfig source of the admin config, consumer config and policy
     */
    public MirrorClient(MirrorClientConfig mirrorClientConfig) {
        this.adminClient = AdminClient.create(mirrorClientConfig.adminConfig());
        this.consumerConfig = mirrorClientConfig.consumerConfig();
        this.replicationPolicy = mirrorClientConfig.replicationPolicy();
    }

    /**
     * Package-private constructor that accepts ready-made collaborators.
     * Note that {@link #close()} closes the supplied admin client as well.
     */
    MirrorClient(AdminClient adminClient, ReplicationPolicy replicationPolicy, Map<String, Object> map) {
        this.adminClient = adminClient;
        this.replicationPolicy = replicationPolicy;
        this.consumerConfig = map;
    }

    /** Closes the underlying admin client. */
    @Override
    public void close() {
        adminClient.close();
    }

    /** Returns the replication policy this client uses to interpret topic names. */
    public ReplicationPolicy replicationPolicy() {
        return replicationPolicy;
    }

    /**
     * Computes the smallest number of hops, over all heartbeat topics on this cluster, needed
     * to reach the given upstream cluster.
     *
     * @param upstreamClusterAlias source alias to look for in the heartbeat topics' lineage
     * @return the minimum hop count, or {@code -1} if no heartbeat topic leads to that cluster
     * @throws InterruptedException if interrupted while listing topics
     */
    public int replicationHops(String upstreamClusterAlias) throws InterruptedException {
        return heartbeatTopics().stream()
            .mapToInt(topic -> countHopsForTopic(topic, upstreamClusterAlias))
            .filter(hops -> hops != -1)
            .min()
            .orElse(-1);
    }

    /**
     * Returns the topics that the replication policy classifies as heartbeats topics.
     *
     * @throws InterruptedException if interrupted while listing topics
     */
    public Set<String> heartbeatTopics() throws InterruptedException {
        return listTopics().stream()
            .filter(this::isHeartbeatTopic)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the topics that the replication policy classifies as checkpoints topics.
     *
     * @throws InterruptedException if interrupted while listing topics
     */
    public Set<String> checkpointTopics() throws InterruptedException {
        return listTopics().stream()
            .filter(this::isCheckpointTopic)
            .collect(Collectors.toSet());
    }

    /**
     * Returns every source alias found in the lineage of any heartbeat topic on this cluster.
     *
     * @throws InterruptedException if interrupted while listing topics
     */
    public Set<String> upstreamClusters() throws InterruptedException {
        return listTopics().stream()
            .filter(this::isHeartbeatTopic)
            .flatMap(topic -> allSources(topic).stream())
            .collect(Collectors.toSet());
    }

    /**
     * Returns all non-internal topics for which the replication policy reports a source.
     *
     * @throws InterruptedException if interrupted while listing topics
     */
    public Set<String> remoteTopics() throws InterruptedException {
        return listTopics().stream()
            .filter(this::isRemoteTopic)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the remote topics whose immediate source, per the replication policy, equals the
     * given alias.
     *
     * @param source source alias to match; must not be {@code null}
     * @throws InterruptedException if interrupted while listing topics
     */
    public Set<String> remoteTopics(String source) throws InterruptedException {
        return listTopics().stream()
            .filter(this::isRemoteTopic)
            .filter(topic -> source.equals(replicationPolicy.topicSource(topic)))
            .collect(Collectors.toSet());
    }

    /**
     * Reads checkpoint records for a consumer group and returns the offsets they carry.
     *
     * <p>Partition 0 of the checkpoints topic named by the replication policy is consumed from
     * the beginning until either the end of the partition is reached or the timeout elapses.
     * When several checkpoints exist for the same topic partition, the one read last wins.
     * Records that fail to deserialize with a {@link SchemaException} are logged and skipped.
     *
     * @param consumerGroupId group whose checkpoints are collected
     * @param remoteClusterAlias alias passed to the policy to resolve the checkpoints topic
     * @param timeout upper bound on the time spent consuming
     * @return offsets keyed by the topic partition recorded in each matching checkpoint
     */
    public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
            String remoteClusterAlias, Duration timeout) {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig,
                new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            String checkpointTopic = replicationPolicy.checkpointsTopic(remoteClusterAlias);
            List<TopicPartition> checkpointAssignment =
                Collections.singletonList(new TopicPartition(checkpointTopic, 0));
            consumer.assign(checkpointAssignment);
            consumer.seekToBeginning(checkpointAssignment);

            while (System.currentTimeMillis() < deadline
                    && !endOfStream(consumer, checkpointAssignment)) {
                // Bound each poll by the time left so the overall call honors the deadline
                // instead of potentially blocking for a further full timeout.
                long remainingMs = Math.max(0L, deadline - System.currentTimeMillis());
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(remainingMs))) {
                    try {
                        Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
                        if (checkpoint.consumerGroupId().equals(consumerGroupId)) {
                            offsets.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
                        }
                    } catch (SchemaException e) {
                        log.info("Could not deserialize record. Skipping.", e);
                    }
                }
            }
            log.info("Consumed {} checkpoint records for {} from {}.", offsets.size(),
                consumerGroupId, checkpointTopic);
        }
        return offsets;
    }

    /**
     * Lists the topic names reported by the admin client.
     *
     * @throws InterruptedException if interrupted while waiting for the result
     * @throws KafkaException wrapping the cause of any failure of the list request
     */
    Set<String> listTopics() throws InterruptedException {
        try {
            return adminClient.listTopics().names().get();
        } catch (ExecutionException e) {
            throw new KafkaException(e.getCause());
        }
    }

    /**
     * Walks a topic's lineage upstream, counting hops until the given source alias is found.
     *
     * @param topic topic name to start from
     * @param sourceClusterAlias alias being searched for
     * @return the 1-based hop count at which the alias is found, or {@code -1} if the lineage
     *         ends first or revisits an alias already seen (a cycle)
     */
    int countHopsForTopic(String topic, String sourceClusterAlias) {
        int hops = 0;
        Set<String> visited = new HashSet<>();
        while (true) {
            hops++;
            String source = replicationPolicy.topicSource(topic);
            if (source == null) {
                return -1;
            }
            if (source.equals(sourceClusterAlias)) {
                return hops;
            }
            // Set.add returns false when the alias was already seen, i.e. the lineage loops.
            if (!visited.add(source)) {
                return -1;
            }
            topic = replicationPolicy.upstreamTopic(topic);
        }
    }

    /** Returns whether the replication policy classifies the topic as a heartbeats topic. */
    boolean isHeartbeatTopic(String topic) {
        return replicationPolicy.isHeartbeatsTopic(topic);
    }

    /** Returns whether the replication policy classifies the topic as a checkpoints topic. */
    boolean isCheckpointTopic(String topic) {
        return replicationPolicy.isCheckpointsTopic(topic);
    }

    /** Returns whether the topic is non-internal and has a source according to the policy. */
    boolean isRemoteTopic(String topic) {
        return !replicationPolicy.isInternalTopic(topic)
            && replicationPolicy.topicSource(topic) != null;
    }

    /**
     * Collects every source alias in a topic's lineage, stopping when the policy reports no
     * further source or when an alias repeats.
     *
     * @param topic topic name to start from
     * @return the distinct source aliases encountered; empty if the topic has no source
     */
    Set<String> allSources(String topic) {
        Set<String> sources = new HashSet<>();
        String source = replicationPolicy.topicSource(topic);
        // Set.add returns false on a repeated alias, which ends the walk just like a null source.
        while (source != null && sources.add(source)) {
            topic = replicationPolicy.upstreamTopic(topic);
            source = replicationPolicy.topicSource(topic);
        }
        return sources;
    }

    /**
     * Returns {@code true} when the consumer's position has reached the end offset of every
     * given partition.
     */
    private static boolean endOfStream(Consumer<?, ?> consumer, Collection<TopicPartition> assignments) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
        for (TopicPartition topicPartition : assignments) {
            if (consumer.position(topicPartition) < endOffsets.get(topicPartition)) {
                return false;
            }
        }
        return true;
    }
}
