package kafka.tier.raft;

import io.confluent.kafka.concurrent.EventExecutor;
import io.confluent.kafka.raft.SimpleRaftTracer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import kafka.server.KafkaRaftServer;
import kafka.tier.exceptions.E2EChecksumInvalidException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.VersionInformation;
import kafka.tier.store.objects.metadata.KRaftSnapshotMetadata;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.RecordsIterator;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.LoggingFaultHandler;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

/* loaded from: input_file:kafka/tier/raft/KRaftSnapshotManager.class */
public final class KRaftSnapshotManager implements SimpleRaftTracer {
    /** Executor on which every event built by {@code createEvent} is submitted or scheduled. */
    private final EventExecutor executor;
    /** Object store that snapshots are listed from, uploaded to and deleted from. */
    private final TierObjectStore objectStore;
    /** Metrics sink: put/delete latency sensors, last-upload time, corruption counters and the error flag. */
    private final KRaftSnapshotMetrics metrics;
    private final Logger logger;
    /** Resolves a topic partition to a local directory; the result is passed to {@code Snapshots} helpers. */
    private final Function<TopicIdPartition, Optional<Path>> topicIdPath;
    /** Cluster id used when listing remote objects and when building {@code KRaftSnapshotObject}s. */
    private final String clusterId;
    /** Node id passed to {@code KRaftSnapshotObjectUtils.listObjectsByNode}. */
    private final int nodeId;
    /** Consulted before remote deletes are scheduled; no delete is scheduled while it yields {@code false}. */
    private final Supplier<Boolean> deleteEnable;
    /** Milliseconds added to a snapshot's append timestamp to compute when its remote delete should run. */
    private final Supplier<Long> retentionMs;
    /** Clock used for latency measurement, the last-upload timestamp and delete-delay computation. */
    private final Time time;
    /** Key prefix handed to the list and delete helpers of {@code KRaftSnapshotObjectUtils}. */
    public static final String KEY_PREFIX = "";
    /** Upper bound, in seconds, on the retry-count term added to the delete delay. */
    private static final long MAX_REMOTE_RETRY_SECONDS = TimeUnit.MINUTES.toSeconds(1);
    /**
     * Mutable bookkeeping shared by all events.
     * NOTE(review): no synchronization is visible here; presumably all access happens on the
     * executor's event thread - confirm against EventExecutor.
     */
    final Context context = new Context();
    /** Receives every Throwable escaping an event; constructed with a no-op action. */
    private final FaultHandler faultHandler = new LoggingFaultHandler("raftSnapshotManager", () -> {
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tier/raft/KRaftSnapshotManager$Context.class */
    /**
     * Mutable state shared by the manager's events: retry counters, the known remote
     * snapshot set and the local snapshots still waiting to be uploaded.
     */
    public static final class Context {
        /** Consecutive retriable failures of the remote listing; reset to 0 on success. */
        int remoteListRetries = 0;
        /** Snapshots known to exist remotely; empty until the remote listing has succeeded. */
        Optional<SortedSet<KRaftSnapshotObject>> remoteObjects = Optional.empty();
        /** True while an upload event is queued and has not started running yet. */
        boolean uploadScheduled = false;
        /** Consecutive retriable failures of the remote put; reset to 0 on success. */
        int remotePutRetries = 0;
        /** Local snapshots that are candidates for upload. */
        final SortedSet<LocalSnapshotObject> localObjects = new TreeSet<>();
        /** Retriable failures of the remote delete; reset to 0 on a successful delete. */
        int remoteDeleteRetries = 0;

        Context() {
        }
    }

    /**
     * Stores the collaborators and immediately queues the first remote listing.
     * Instances are obtained through {@code create}.
     */
    private KRaftSnapshotManager(
            EventExecutor executor,
            TierObjectStore objectStore,
            KRaftSnapshotMetrics metrics,
            LogContext logContext,
            Function<TopicIdPartition, Optional<Path>> topicIdPath,
            String clusterId,
            int nodeId,
            Supplier<Boolean> deleteEnable,
            Supplier<Long> retentionMs,
            Time time) {
        this.executor = executor;
        this.objectStore = objectStore;
        this.metrics = metrics;
        this.logger = logContext.logger(getClass());
        this.topicIdPath = topicIdPath;
        this.clusterId = clusterId;
        this.nodeId = nodeId;
        this.deleteEnable = deleteEnable;
        this.retentionMs = retentionMs;
        this.time = time;
        // Must stay last: the listing event reads the fields assigned above.
        scheduleListRemoteObjects();
    }

    /**
     * Queues an event that lists this node's remote snapshot objects.
     *
     * <p>On success the retry counter is reset, the listed keys become the known remote set,
     * deletes are scheduled when {@code deleteEnable} yields true, and pending uploads are
     * scheduled. On a {@link TierObjectStoreRetriableException} the retry counter is bumped
     * and the listing is queued again. The first attempt is submitted immediately; retries
     * are delayed by {@code min(10, retries)} seconds.
     */
    private void scheduleListRemoteObjects() {
        Callable<Void> listTask = () -> {
            try {
                Map<KRaftSnapshotObject, List<VersionInformation>> listing = KRaftSnapshotObjectUtils.listObjectsByNode(
                        this.objectStore,
                        false,
                        KEY_PREFIX,
                        KafkaRaftServer.MetadataTopicId(),
                        KafkaRaftServer.MetadataPartition().partition(),
                        this.clusterId,
                        this.nodeId);
                this.context.remoteListRetries = 0;
                SortedSet<KRaftSnapshotObject> listedObjects = new TreeSet<>(listing.keySet());
                this.context.remoteObjects = Optional.of(listedObjects);
                if (this.deleteEnable.get()) {
                    scheduleDeleteRemoteObjects();
                }
                scheduleUploadLocalObjects();
            } catch (TierObjectStoreRetriableException retriable) {
                this.context.remoteListRetries++;
                this.logger.warn("ListRemoteObject for snapshots failed " + this.context.remoteListRetries + " times", retriable);
                scheduleListRemoteObjects();
            }
            return null;
        };

        Callable<Void> event = createEvent("ListRemoteObject", listTask);
        if (this.context.remoteListRetries != 0) {
            // Retry path: wait one second per failure so far, capped at ten seconds.
            this.executor.schedule(event, Math.min(10, this.context.remoteListRetries), TimeUnit.SECONDS);
            return;
        }
        this.executor.submit(event);
    }

    /**
     * Returns the next local snapshot to upload, if any.
     *
     * <p>While the remote set is unknown the result is empty and nothing is modified.
     * Otherwise every local entry whose snapshot already exists remotely is dropped, and the
     * last element (per the set's ordering) of what remains is returned.
     */
    private Optional<LocalSnapshotObject> nextPendingLocal() {
        if (!this.context.remoteObjects.isPresent()) {
            return Optional.empty();
        }
        SortedSet<KRaftSnapshotObject> alreadyRemote = this.context.remoteObjects.get();
        SortedSet<LocalSnapshotObject> pending = this.context.localObjects;
        pending.removeIf(candidate -> alreadyRemote.contains(candidate.snapshotObject()));
        if (pending.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(pending.last());
    }

    /**
     * Queues an upload of the next pending local snapshot, unless an upload is already queued
     * or nothing is pending.
     *
     * <p>The queued event validates the snapshot file, uploads it, records it as remote,
     * schedules its delete when {@code deleteEnable} yields true, and then schedules the next
     * upload. Failure handling:
     * <ul>
     *   <li>{@link CorruptRecordException}: the snapshot is logged, counted as on-disk
     *       corruption and removed from the pending set so it is not retried and does not
     *       block other pending snapshots.</li>
     *   <li>{@link E2EChecksumInvalidException} / {@link TierObjectStoreRetriableException}:
     *       the retry counter is bumped and the upload is scheduled again.</li>
     *   <li>{@link IOException} from the put is routed through the fault handler.</li>
     * </ul>
     * The first attempt is submitted immediately; retries are delayed by
     * {@code min(10, retries)} seconds.
     */
    private void scheduleUploadLocalObjects() {
        if (this.context.uploadScheduled || !nextPendingLocal().isPresent()) {
            return;
        }
        Callable<Void> callable = () -> {
            this.context.uploadScheduled = false;
            // Declared outside the inner try so the corruption handler knows which snapshot to drop.
            LocalSnapshotObject attempted = null;
            try {
                try {
                    Optional<LocalSnapshotObject> nextPendingLocal = nextPendingLocal();
                    if (!nextPendingLocal.isPresent()) {
                        return null;
                    }
                    LocalSnapshotObject localSnapshotObject = nextPendingLocal.get();
                    attempted = localSnapshotObject;
                    Optional<Path> apply = this.topicIdPath.apply(localSnapshotObject.topicIdPartition());
                    if (!apply.isPresent()) {
                        throw new FileNotFoundException("logDir could not be found for object: " + nextPendingLocal);
                    }
                    Path snapshotPath = Snapshots.snapshotPath(apply.get(), localSnapshotObject.snapshotObject().snapshotId());
                    KRaftSnapshotMetadata kRaftSnapshotMetadata = new KRaftSnapshotMetadata(localSnapshotObject.snapshotObject());
                    // Read the whole file first so a corrupt snapshot is never uploaded.
                    ensureSnapshotNotCorrupted(apply.get(), localSnapshotObject.snapshotObject().snapshotId());
                    recordDuration(this.metrics.putLatencySensor(), () -> {
                        try {
                            KRaftSnapshotObjectUtils.putObject(this.objectStore, kRaftSnapshotMetadata, snapshotPath.toFile());
                            return null;
                        } catch (IOException e) {
                            throw this.faultHandler.handleFault("PutRemoteObject", e);
                        }
                    }, this.time);
                    this.metrics.updateLastUploadMs(this.time.milliseconds());
                    this.context.remotePutRetries = 0;
                    this.context.remoteObjects.get().add(localSnapshotObject.snapshotObject());
                    this.context.localObjects.remove(localSnapshotObject);
                    this.logger.info("PutRemoteObject success for snapshot: " + kRaftSnapshotMetadata);
                    if (this.deleteEnable.get().booleanValue()) {
                        scheduleDeleteRemoteObject(kRaftSnapshotMetadata);
                    }
                    scheduleUploadLocalObjects();
                    return null;
                } catch (CorruptRecordException e) {
                    this.logger.error("Detected on disk corruption of local KRaft snapshot file, skipping upload.", e);
                    this.metrics.incrementOnDiskCorruption();
                    // Drop the corrupt snapshot; otherwise it stays the "next pending" entry forever.
                    if (Objects.nonNull(attempted)) {
                        this.context.localObjects.remove(attempted);
                    }
                    return null;
                }
            } catch (E2EChecksumInvalidException | TierObjectStoreRetriableException e2) {
                this.context.remotePutRetries++;
                this.logger.warn("PutRemoteObject for snapshots failed " + this.context.remotePutRetries + " times", e2);
                if (e2 instanceof E2EChecksumInvalidException) {
                    this.metrics.incrementOnNetworkCorruption();
                }
                scheduleUploadLocalObjects();
                return null;
            }
        };
        if (this.context.remotePutRetries == 0) {
            this.executor.submit(createEvent("PutRemoteObject", callable));
        } else {
            this.executor.schedule(createEvent("PutRemoteObject", callable), Math.min(10, this.context.remotePutRetries), TimeUnit.SECONDS);
        }
        this.context.uploadScheduled = true;
    }

    /**
     * Reads every batch of the given local snapshot so that any corruption surfaces as an
     * exception before the file is uploaded.
     *
     * @param path           directory handed to {@link FileRawSnapshotReader#open}
     * @param offsetAndEpoch id of the snapshot to verify
     * @throws CorruptRecordException presumably raised by the iterator when a batch fails
     *         validation (the caller catches it) - confirm against RecordsIterator
     */
    private void ensureSnapshotNotCorrupted(Path path, OffsetAndEpoch offsetAndEpoch) {
        final int maxBatchSizeBytes = 8 * 1024 * 1024;
        // try-with-resources closes the reader exactly once, on success and on failure.
        try (FileRawSnapshotReader reader = FileRawSnapshotReader.open(path, offsetAndEpoch)) {
            // NOTE(review): the trailing `true` presumably enables CRC validation - confirm.
            RecordsIterator<?> batches = new RecordsIterator<>(reader.records(), MetadataRecordSerde.INSTANCE, BufferSupplier.create(), maxBatchSizeBytes, true);
            while (batches.hasNext()) {
                batches.next();
            }
        }
    }

    /**
     * Schedules deletion of one remote snapshot.
     *
     * <p>The delay is the time left until {@code appendTimeStampMs + retentionMs} (never
     * negative, truncated to whole seconds) plus {@code min(MAX_REMOTE_RETRY_SECONDS,
     * remoteDeleteRetries)} seconds. A successful delete removes the snapshot from the known
     * remote set and resets the retry counter.
     *
     * <p>NOTE(review): a {@link TierObjectStoreRetriableException} only bumps the retry
     * counter and logs; the delete is not scheduled again here. Confirm whether a retry was
     * intended.
     */
    private void scheduleDeleteRemoteObject(KRaftSnapshotMetadata kRaftSnapshotMetadata) {
        Callable<Void> deleteTask = () -> {
            try {
                recordDuration(this.metrics.deleteLatencySensor(), () -> {
                    KRaftSnapshotObjectUtils.deleteObject(this.objectStore, kRaftSnapshotMetadata, KEY_PREFIX);
                    return null;
                }, this.time);
                this.context.remoteObjects.get().remove(kRaftSnapshotMetadata.snapshotObject());
                this.context.remoteDeleteRetries = 0;
                this.logger.info("DeleteRemoteObject success for snapshot: " + kRaftSnapshotMetadata);
            } catch (TierObjectStoreRetriableException retriable) {
                this.context.remoteDeleteRetries++;
                this.logger.warn("DeleteRemoteObject for snapshots failed " + this.context.remoteDeleteRetries + " times", retriable);
            }
            return null;
        };

        long expiresAtMs = kRaftSnapshotMetadata.snapshotObject().appendTimeStampMs() + this.retentionMs.get().longValue();
        long secondsUntilExpiry = Math.max(0L, TimeUnit.MILLISECONDS.toSeconds(expiresAtMs - this.time.milliseconds()));
        long retryPenaltySeconds = Math.min(MAX_REMOTE_RETRY_SECONDS, this.context.remoteDeleteRetries);
        this.executor.schedule(createEvent("DeleteRemoteObject", deleteTask), secondsUntilExpiry + retryPenaltySeconds, TimeUnit.SECONDS);
    }

    /**
     * Schedules a delete for every snapshot in the known remote set.
     * Requires the remote set to be present; callers invoke this only after a listing succeeded.
     */
    private void scheduleDeleteRemoteObjects() {
        for (KRaftSnapshotObject remoteSnapshot : this.context.remoteObjects.get()) {
            scheduleDeleteRemoteObject(new KRaftSnapshotMetadata(remoteSnapshot));
        }
    }

    /**
     * Submits a "ReplicaStarted" event that registers the given snapshot ids as local upload
     * candidates and then schedules an upload.
     *
     * <p>A snapshot id is registered only when its max timestamp can be read; ids whose
     * timestamp is unavailable are skipped. The registered object carries the value of
     * {@code optionalInt}, or -1 when it is empty.
     */
    public void nodeStartedUp(TopicIdPartition topicIdPartition, int i, OptionalInt optionalInt, OptionalLong optionalLong, long j, long j2, SortedSet<OffsetAndEpoch> sortedSet, OptionalInt optionalInt2, Set<Integer> set) {
        Callable<Void> onStarted = () -> {
            this.logger.info("replica for topic partition {} started with {}", topicIdPartition, sortedSet);
            for (OffsetAndEpoch snapshotId : sortedSet) {
                Optional<Long> maxTimestamp = maybeReadMaxSnapshotTimestamp(this.topicIdPath, topicIdPartition, snapshotId);
                if (maxTimestamp.isPresent()) {
                    KRaftSnapshotObject snapshot = new KRaftSnapshotObject(topicIdPartition.topicId(), topicIdPartition.partition(), this.clusterId, optionalInt.orElse(-1), maxTimestamp.get().longValue(), snapshotId);
                    this.context.localObjects.add(new LocalSnapshotObject(snapshot, topicIdPartition));
                }
            }
            scheduleUploadLocalObjects();
            return null;
        };
        this.executor.submit(createEvent("ReplicaStarted", onStarted));
    }

    /**
     * Submits a "SnapshotGenerated" event that registers the new snapshot as a local upload
     * candidate and then schedules an upload.
     *
     * <p>The snapshot is registered only when its max timestamp can be read. The registered
     * object carries the value of {@code optionalInt}, or -1 when it is empty.
     */
    public void snapshotGenerated(TopicIdPartition topicIdPartition, int i, OptionalInt optionalInt, OptionalLong optionalLong, long j, long j2, OffsetAndEpoch offsetAndEpoch, OptionalInt optionalInt2, Set<Integer> set) {
        Callable<Void> onGenerated = () -> {
            this.logger.info("topic partition {} generated a snapshot at {}", topicIdPartition, offsetAndEpoch);
            Optional<Long> maxTimestamp = maybeReadMaxSnapshotTimestamp(this.topicIdPath, topicIdPartition, offsetAndEpoch);
            if (maxTimestamp.isPresent()) {
                KRaftSnapshotObject snapshot = new KRaftSnapshotObject(topicIdPartition.topicId(), topicIdPartition.partition(), this.clusterId, optionalInt.orElse(-1), maxTimestamp.get().longValue(), offsetAndEpoch);
                this.context.localObjects.add(new LocalSnapshotObject(snapshot, topicIdPartition));
            }
            scheduleUploadLocalObjects();
            return null;
        };
        this.executor.submit(createEvent("SnapshotGenerated", onGenerated));
    }

    /**
     * Resolves the partition's directory and reads the snapshot's last contained log timestamp.
     *
     * @return the timestamp, or empty when no directory is known for the partition or when
     *         reading fails (in which case the failure is logged and the error metric is set)
     */
    Optional<Long> maybeReadMaxSnapshotTimestamp(Function<TopicIdPartition, Optional<Path>> function, TopicIdPartition topicIdPartition, OffsetAndEpoch offsetAndEpoch) {
        try {
            Optional<Path> logDir = function.apply(topicIdPartition);
            if (!logDir.isPresent()) {
                return Optional.empty();
            }
            long lastTimestamp = Snapshots.lastContainedLogTimestamp(logDir.get(), offsetAndEpoch);
            return Optional.of(lastTimestamp);
        } catch (Exception readFailure) {
            this.logger.error("Could not read snapshot: " + offsetAndEpoch, readFailure);
            this.metrics.updateError(true);
            return Optional.empty();
        }
    }

    /**
     * Wraps a task as a named event: the name is logged at debug level before the task runs,
     * and any {@link Throwable} the task raises is handed to the fault handler, whose
     * returned exception is thrown in its place.
     */
    private Callable<Void> createEvent(String eventName, Callable<Void> task) {
        return () -> {
            this.logger.debug("handling: {}", eventName);
            try {
                return (Void) task.call();
            } catch (Throwable failure) {
                throw this.faultHandler.handleFault(eventName, failure);
            }
        };
    }

    /**
     * Runs {@code supplier} and records its elapsed milliseconds, measured with {@code time},
     * on {@code sensor}. Nothing is recorded when the supplier throws.
     *
     * @return whatever the supplier returned
     */
    static <T> T recordDuration(Sensor sensor, Supplier<T> supplier, Time time) {
        final long startedAtMs = time.milliseconds();
        final T result = supplier.get();
        final long elapsedMs = time.milliseconds() - startedAtMs;
        sensor.record(elapsedMs);
        return result;
    }

    /**
     * Builds a manager from the given collaborators. Construction immediately queues the
     * first remote listing on {@code eventExecutor}.
     */
    public static KRaftSnapshotManager create(EventExecutor eventExecutor, TierObjectStore tierObjectStore, KRaftSnapshotMetrics kRaftSnapshotMetrics, LogContext logContext, Function<TopicIdPartition, Optional<Path>> function, String str, int i, Supplier<Boolean> supplier, Supplier<Long> supplier2, Time time) {
        KRaftSnapshotManager manager = new KRaftSnapshotManager(
                eventExecutor,
                tierObjectStore,
                kRaftSnapshotMetrics,
                logContext,
                function,
                str,
                i,
                supplier,
                supplier2,
                time);
        return manager;
    }
}
