package org.apache.kafka.connect.mirror;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/CheckpointStore.class */
public class CheckpointStore implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CheckpointStore.class);
    private final MirrorCheckpointTaskConfig config;
    private final Set<String> consumerGroups;
    private TopicAdmin cpAdmin;
    private KafkaBasedLog<byte[], byte[]> backingStore;
    Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
    private volatile boolean loadSuccess;
    private volatile boolean isInitialized;

    public CheckpointStore(MirrorCheckpointTaskConfig mirrorCheckpointTaskConfig, Set<String> set) {
        this.cpAdmin = null;
        this.backingStore = null;
        this.loadSuccess = false;
        this.isInitialized = false;
        this.config = mirrorCheckpointTaskConfig;
        this.consumerGroups = new HashSet(set);
    }

    CheckpointStore(Map<String, Map<TopicPartition, Checkpoint>> map) {
        this.cpAdmin = null;
        this.backingStore = null;
        this.loadSuccess = false;
        this.isInitialized = false;
        this.config = null;
        this.consumerGroups = null;
        this.checkpointsPerConsumerGroup = map;
        this.isInitialized = true;
        this.loadSuccess = true;
    }

    public boolean start() {
        this.checkpointsPerConsumerGroup = readCheckpoints();
        this.isInitialized = true;
        if (log.isTraceEnabled()) {
            log.trace("CheckpointStore started, load success={}, map={}", Boolean.valueOf(this.loadSuccess), this.checkpointsPerConsumerGroup);
        } else {
            log.debug("CheckpointStore started, load success={}, map.size={}", Boolean.valueOf(this.loadSuccess), Integer.valueOf(this.checkpointsPerConsumerGroup.size()));
        }
        return this.loadSuccess;
    }

    public boolean isInitialized() {
        return this.isInitialized;
    }

    public void update(String str, Map<TopicPartition, Checkpoint> map) {
        this.checkpointsPerConsumerGroup.computeIfAbsent(str, str2 -> {
            return new HashMap();
        }).putAll(map);
    }

    public Map<TopicPartition, Checkpoint> get(String str) {
        Map<TopicPartition, Checkpoint> map = this.checkpointsPerConsumerGroup.get(str);
        if (map == null) {
            return null;
        }
        return Collections.unmodifiableMap(map);
    }

    public Map<String, Map<TopicPartition, OffsetAndMetadata>> computeConvertedUpstreamOffset() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<TopicPartition, Checkpoint>> entry : this.checkpointsPerConsumerGroup.entrySet()) {
            String key = entry.getKey();
            HashMap hashMap2 = new HashMap();
            for (Checkpoint checkpoint : entry.getValue().values()) {
                hashMap2.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
            }
            hashMap.put(key, hashMap2);
        }
        return hashMap;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        releaseResources();
    }

    private void releaseResources() {
        AutoCloseable autoCloseable;
        if (this.backingStore != null) {
            KafkaBasedLog<byte[], byte[]> kafkaBasedLog = this.backingStore;
            kafkaBasedLog.getClass();
            autoCloseable = kafkaBasedLog::stop;
        } else {
            autoCloseable = null;
        }
        Utils.closeQuietly(autoCloseable, "backing store for previous Checkpoints");
        Utils.closeQuietly(this.cpAdmin, "admin client for previous Checkpoints");
        this.cpAdmin = null;
        this.backingStore = null;
    }

    private Map<String, Map<TopicPartition, Checkpoint>> readCheckpoints() {
        HashMap hashMap = new HashMap();
        Callback<ConsumerRecord<byte[], byte[]>> callback = (th, consumerRecord) -> {
            if (th != null) {
                hashMap.clear();
                if (!(th instanceof RuntimeException)) {
                    throw new RuntimeException(th);
                }
                throw ((RuntimeException) th);
            }
            try {
                Checkpoint deserializeRecord = Checkpoint.deserializeRecord(consumerRecord);
                if (this.consumerGroups.contains(deserializeRecord.consumerGroupId())) {
                    ((Map) hashMap.computeIfAbsent(deserializeRecord.consumerGroupId(), str -> {
                        return new HashMap();
                    })).put(deserializeRecord.topicPartition(), deserializeRecord);
                }
            } catch (SchemaException e) {
                log.warn("Ignored invalid checkpoint record at offset {}", Long.valueOf(consumerRecord.offset()), e);
            }
        };
        try {
            long currentTimeMillis = System.currentTimeMillis();
            readCheckpointsImpl(this.config, callback);
            log.debug("starting+stopping KafkaBasedLog took {}ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            this.loadSuccess = true;
        } catch (Exception e) {
            this.loadSuccess = false;
            if (e instanceof AuthorizationException) {
                log.warn("Not authorized to access checkpoints topic {} - this may degrade offset translation as only checkpoints for offsets which were mirrored after the task started will be emitted", this.config.checkpointsTopic(), e);
            } else {
                log.info("Exception encountered loading checkpoints topic {} - this may degrade offset translation as only checkpoints for offsets which were mirrored after the task started will be emitted", this.config.checkpointsTopic(), e);
            }
        }
        return hashMap;
    }

    void readCheckpointsImpl(MirrorCheckpointTaskConfig mirrorCheckpointTaskConfig, Callback<ConsumerRecord<byte[], byte[]>> callback) {
        try {
            this.cpAdmin = new TopicAdmin(mirrorCheckpointTaskConfig.targetAdminConfig("checkpoint-target-admin"), mirrorCheckpointTaskConfig.forwardingAdmin(mirrorCheckpointTaskConfig.targetAdminConfig("checkpoint-target-admin")));
            this.backingStore = KafkaBasedLog.withExistingClients(mirrorCheckpointTaskConfig.checkpointsTopic(), MirrorUtils.newConsumer(mirrorCheckpointTaskConfig.targetConsumerConfig(MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE)), (Producer) null, this.cpAdmin, callback, Time.SYSTEM, topicAdmin -> {
            }, topicPartition -> {
                return topicPartition.partition() == 0;
            });
            this.backingStore.start(true);
            this.backingStore.stop();
        } finally {
            releaseResources();
        }
    }
}
