package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorCheckpointTask.class */
public class MirrorCheckpointTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);
    private AdminClient sourceAdminClient;
    private String sourceClusterAlias;
    private String targetClusterAlias;
    private String checkpointsTopic;
    private Duration interval;
    private Duration pollTimeout;
    private TopicFilter topicFilter;
    private Set<String> consumerGroups;
    private ReplicationPolicy replicationPolicy;
    private OffsetSyncStore offsetSyncStore;
    private boolean stopping;
    private MirrorMetrics metrics;

    public MirrorCheckpointTask() {
    }

    MirrorCheckpointTask(String str, String str2, ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore) {
        this.sourceClusterAlias = str;
        this.targetClusterAlias = str2;
        this.replicationPolicy = replicationPolicy;
        this.offsetSyncStore = offsetSyncStore;
    }

    public void start(Map<String, String> map) {
        MirrorTaskConfig mirrorTaskConfig = new MirrorTaskConfig(map);
        this.stopping = false;
        this.sourceClusterAlias = mirrorTaskConfig.sourceClusterAlias();
        this.targetClusterAlias = mirrorTaskConfig.targetClusterAlias();
        this.consumerGroups = mirrorTaskConfig.taskConsumerGroups();
        this.checkpointsTopic = mirrorTaskConfig.checkpointsTopic();
        this.topicFilter = mirrorTaskConfig.topicFilter();
        this.replicationPolicy = mirrorTaskConfig.replicationPolicy();
        this.interval = mirrorTaskConfig.emitCheckpointsInterval();
        this.pollTimeout = mirrorTaskConfig.consumerPollTimeout();
        this.offsetSyncStore = new OffsetSyncStore(mirrorTaskConfig);
        this.sourceAdminClient = AdminClient.create(mirrorTaskConfig.sourceAdminConfig());
        this.metrics = mirrorTaskConfig.metrics();
    }

    public void commit() throws InterruptedException {
    }

    public void stop() {
        long currentTimeMillis = System.currentTimeMillis();
        this.stopping = true;
        Utils.closeQuietly(this.offsetSyncStore, "offset sync store");
        Utils.closeQuietly(this.sourceAdminClient, "source admin client");
        Utils.closeQuietly(this.metrics, "metrics");
        log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public String version() {
        return "1";
    }

    public List<SourceRecord> poll() throws InterruptedException {
        try {
            long currentTimeMillis = System.currentTimeMillis() + this.interval.toMillis();
            while (!this.stopping && System.currentTimeMillis() < currentTimeMillis) {
                this.offsetSyncStore.update(this.pollTimeout);
            }
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = this.consumerGroups.iterator();
            while (it.hasNext()) {
                arrayList.addAll(checkpointsForGroup(it.next()));
            }
            if (arrayList.isEmpty()) {
                return null;
            }
            return arrayList;
        } catch (Throwable th) {
            log.warn("Failure polling consumer state for checkpoints.", th);
            return null;
        }
    }

    private List<SourceRecord> checkpointsForGroup(String str) throws InterruptedException {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            return (List) listConsumerGroupOffsets(str).entrySet().stream().filter(entry -> {
                return shouldCheckpointTopic(((TopicPartition) entry.getKey()).topic());
            }).map(entry2 -> {
                return checkpoint(str, (TopicPartition) entry2.getKey(), (OffsetAndMetadata) entry2.getValue());
            }).filter(checkpoint -> {
                return checkpoint.downstreamOffset() > 0;
            }).map(checkpoint2 -> {
                return checkpointRecord(checkpoint2, currentTimeMillis);
            }).collect(Collectors.toList());
        } catch (ExecutionException e) {
            log.error("Error querying offsets for consumer group {} on cluster {}.", new Object[]{str, this.sourceClusterAlias, e});
            return Collections.emptyList();
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String str) throws InterruptedException, ExecutionException {
        return this.stopping ? Collections.emptyMap() : (Map) this.sourceAdminClient.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
    }

    Checkpoint checkpoint(String str, TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        long offset = offsetAndMetadata.offset();
        return new Checkpoint(str, renameTopicPartition(topicPartition), offset, this.offsetSyncStore.translateDownstream(topicPartition, offset), offsetAndMetadata.metadata());
    }

    SourceRecord checkpointRecord(Checkpoint checkpoint, long j) {
        return new SourceRecord(checkpoint.connectPartition(), MirrorUtils.wrapOffset(0L), this.checkpointsTopic, 0, Schema.BYTES_SCHEMA, checkpoint.recordKey(), Schema.BYTES_SCHEMA, checkpoint.recordValue(), Long.valueOf(j));
    }

    TopicPartition renameTopicPartition(TopicPartition topicPartition) {
        return this.targetClusterAlias.equals(this.replicationPolicy.topicSource(topicPartition.topic())) ? new TopicPartition(this.replicationPolicy.originalTopic(topicPartition.topic()), topicPartition.partition()) : new TopicPartition(this.replicationPolicy.formatRemoteTopic(this.sourceClusterAlias, topicPartition.topic()), topicPartition.partition());
    }

    boolean shouldCheckpointTopic(String str) {
        return this.topicFilter.shouldReplicateTopic(str);
    }

    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        this.metrics.checkpointLatency(MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()), Checkpoint.unwrapGroup(sourceRecord.sourcePartition()), System.currentTimeMillis() - sourceRecord.timestamp().longValue());
    }
}
