package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.class */
/**
 * {@link CompletedCheckpointStore} that registers completed checkpoints in ZooKeeper through a
 * {@link ZooKeeperStateHandleStore} and keeps an in-memory deque of the retained checkpoints.
 *
 * <p>Each checkpoint is stored under a node whose name is produced by
 * {@link #checkpointIdToPath(long)}. At most {@code maxNumberOfCheckpointsToRetain} checkpoints
 * are kept; when a new checkpoint pushes the deque over that limit, the oldest entries are
 * removed from ZooKeeper and discarded on the supplied {@link Executor}.
 *
 * <p>NOTE(review): this class performs no synchronization of its own around the in-memory
 * deque; presumably callers serialize access - confirm against the caller.
 */
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);

    /** Curator client whose namespace is the original namespace extended by the checkpoints path. */
    private final CuratorFramework client;

    /** Handle store through which checkpoints are added to, read from and removed from ZooKeeper. */
    private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;

    /** Upper bound on the number of checkpoints kept in {@link #completedCheckpoints}. */
    private final int maxNumberOfCheckpointsToRetain;

    /** Locally retained checkpoints; oldest at the head, most recently added at the tail. */
    private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;

    /** Executor on which the discard callbacks of removed checkpoints are run. */
    private final Executor executor;

    /**
     * Creates the store and makes sure the checkpoints path exists in ZooKeeper.
     *
     * <p>All arguments are validated before ZooKeeper is touched, so an invalid argument never
     * leaves a freshly created path behind.
     *
     * @param i maximum number of checkpoints to retain; must be at least 1
     * @param curatorFramework Curator client used to talk to ZooKeeper
     * @param str checkpoints path, appended to the client's current namespace
     * @param retrievableStateStorageHelper helper handed to the {@link ZooKeeperStateHandleStore}
     * @param executor executor used to run checkpoint discard callbacks
     * @throws IllegalArgumentException if {@code i} is smaller than 1
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws Exception if ensuring the path in ZooKeeper fails
     */
    public ZooKeeperCompletedCheckpointStore(int i, CuratorFramework curatorFramework, String str, RetrievableStateStorageHelper<CompletedCheckpoint> retrievableStateStorageHelper, Executor executor) throws Exception {
        Preconditions.checkArgument(i >= 1, "Must retain at least one checkpoint.");
        Preconditions.checkNotNull(retrievableStateStorageHelper, "State storage");
        Preconditions.checkNotNull(curatorFramework, "Curator client");
        Preconditions.checkNotNull(str, "Checkpoints path");
        Preconditions.checkNotNull(executor, "Executor");

        this.maxNumberOfCheckpointsToRetain = i;
        this.executor = executor;

        // Create the checkpoints path (if missing) before switching the client to that namespace.
        curatorFramework.newNamespaceAwareEnsurePath(str).ensure(curatorFramework.getZookeeperClient());
        this.client = curatorFramework.usingNamespace(curatorFramework.getNamespace() + str);
        this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, retrievableStateStorageHelper);

        // One extra slot: addCheckpoint() appends first and only then trims the oldest entries.
        this.completedCheckpoints = new ArrayDeque<>(i + 1);

        LOG.info("Initialized in '{}'.", str);
    }

    /**
     * Always returns {@code true} for this store.
     *
     * @return {@code true}
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public boolean requiresExternalizedCheckpoints() {
        return true;
    }

    /**
     * Rebuilds the in-memory list of retained checkpoints from the handles stored in ZooKeeper.
     *
     * <p>The handles are read once. Retrieval of the checkpoints behind them is then attempted
     * repeatedly until either every handle could be resolved or two consecutive attempts are
     * reported as matching by {@code CompletedCheckpoint.checkpointsMatch}. Handles that fail to
     * resolve are logged and skipped.
     *
     * @throws FlinkException if handles exist in ZooKeeper but none of them could be resolved
     * @throws Exception if reading the handles from ZooKeeper fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public void recover() throws Exception {
        LOG.info("Recovering checkpoints from ZooKeeper.");

        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints = getAllHandlesRetryingOnConcurrentModification();
        final int numberOfInitialCheckpoints = initialCheckpoints.size();
        LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);

        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
        final List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);

        do {
            LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);

            // Remember the previous attempt so the loop can detect that retrying no longer helps.
            lastTryRetrievedCheckpoints.clear();
            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
            retrievedCheckpoints.clear();

            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> handleAndPath : initialCheckpoints) {
                try {
                    CompletedCheckpoint checkpoint = retrieveCompletedCheckpoint(handleAndPath);
                    if (checkpoint != null) {
                        retrievedCheckpoints.add(checkpoint);
                    }
                } catch (Exception e) {
                    LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
                }
            }
        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
                && !CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints));

        this.completedCheckpoints.clear();
        this.completedCheckpoints.addAll(retrievedCheckpoints);

        if (this.completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
            throw new FlinkException("Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
        }
        if (this.completedCheckpoints.size() != numberOfInitialCheckpoints) {
            LOG.warn("Could only fetch {} of {} checkpoints from storage.", this.completedCheckpoints.size(), numberOfInitialCheckpoints);
        }
    }

    /**
     * Reads all checkpoint handles from ZooKeeper, retrying for as long as the read fails with a
     * {@link ConcurrentModificationException}. There is no retry limit.
     */
    private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> getAllHandlesRetryingOnConcurrentModification() throws Exception {
        while (true) {
            try {
                return this.checkpointsInZooKeeper.getAllSortedByNameAndLock();
            } catch (ConcurrentModificationException e) {
                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
            }
        }
    }

    /**
     * Registers a completed checkpoint in ZooKeeper, retains it locally and trims the oldest
     * retained checkpoints until the retention limit is respected again.
     *
     * <p>Failures while removing a trimmed checkpoint are logged, not propagated.
     *
     * @param completedCheckpoint the checkpoint to add; must not be {@code null}
     * @throws NullPointerException if {@code completedCheckpoint} is {@code null}
     * @throws Exception if adding the checkpoint to ZooKeeper fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public void addCheckpoint(CompletedCheckpoint completedCheckpoint) throws Exception {
        Preconditions.checkNotNull(completedCheckpoint, "Checkpoint");

        final String path = checkpointIdToPath(completedCheckpoint.getCheckpointID());

        // Register in ZooKeeper first; the checkpoint is only retained locally if that succeeds.
        this.checkpointsInZooKeeper.addAndLock(path, completedCheckpoint);
        this.completedCheckpoints.addLast(completedCheckpoint);

        while (this.completedCheckpoints.size() > this.maxNumberOfCheckpointsToRetain) {
            tryRemoveCompletedCheckpoint(this.completedCheckpoints.removeFirst(), CompletedCheckpoint::discardOnSubsume);
        }

        LOG.debug("Added {} to {}.", completedCheckpoint, path);
    }

    /**
     * Best-effort removal of a checkpoint: asks the handle store to release and remove the node
     * and, only if that reports success, schedules {@code discardCallback} on the executor.
     *
     * <p>Never throws; failures of the removal and of the callback are logged as warnings.
     *
     * @param completedCheckpoint the checkpoint to remove
     * @param discardCallback action that discards the checkpoint once its node has been removed
     */
    private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
        try {
            if (tryRemove(completedCheckpoint.getCheckpointID())) {
                this.executor.execute(() -> {
                    try {
                        discardCallback.accept(completedCheckpoint);
                    } catch (Exception e) {
                        LOG.warn("Could not discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
                    }
                });
            }
        } catch (Exception e) {
            LOG.warn("Failed to subsume the old checkpoint", e);
        }
    }

    /**
     * Returns the most recently added retained checkpoint.
     *
     * @return the checkpoint at the tail of the deque, or {@code null} if none is retained
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public CompletedCheckpoint getLatestCheckpoint() {
        // peekLast() already yields null for an empty deque.
        return this.completedCheckpoints.peekLast();
    }

    /**
     * Returns a snapshot of the retained checkpoints, oldest first.
     *
     * @return a new list; later changes to the store are not reflected in it
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
        return new ArrayList<>(this.completedCheckpoints);
    }

    /**
     * Returns how many checkpoints are currently retained in memory.
     *
     * @return the current size of the retained-checkpoint deque
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public int getNumberOfRetainedCheckpoints() {
        return this.completedCheckpoints.size();
    }

    /**
     * Returns the retention limit this store was created with.
     *
     * @return the maximum number of checkpoints to retain
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public int getMaxNumberOfRetainedCheckpoints() {
        return this.maxNumberOfCheckpointsToRetain;
    }

    /**
     * Shuts the store down or suspends it, depending on the job status.
     *
     * <p>For a globally terminal status, every retained checkpoint is removed (best effort) and
     * discarded via {@code discardOnShutdown(jobStatus)}, and the store's namespace node is
     * deleted from ZooKeeper together with its children. Otherwise only the local state is
     * cleared and {@code releaseAll()} is called on the handle store.
     *
     * @param jobStatus status of the job at the time of the shutdown
     * @throws Exception if releasing the handles or deleting the ZooKeeper nodes fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public void shutdown(JobStatus jobStatus) throws Exception {
        if (jobStatus.isGloballyTerminalState()) {
            LOG.info("Shutting down");

            for (CompletedCheckpoint checkpoint : this.completedCheckpoints) {
                tryRemoveCompletedCheckpoint(checkpoint, completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
            }
            this.completedCheckpoints.clear();

            final String path = "/" + this.client.getNamespace();
            LOG.info("Removing {} from ZooKeeper", path);
            // The final 'true' is Curator's deleteSelf flag: the node at 'path' is removed as well.
            ZKPaths.deleteChildren(this.client.getZookeeperClient().getZooKeeper(), path, true);
        } else {
            LOG.info("Suspending");

            this.completedCheckpoints.clear();
            this.checkpointsInZooKeeper.releaseAll();
        }
    }

    /**
     * Asks the handle store to release and remove the node of the given checkpoint.
     *
     * @param checkpointId id of the checkpoint whose node should be removed
     * @return the result of {@code releaseAndTryRemove} for the checkpoint's path
     */
    private boolean tryRemove(long checkpointId) throws Exception {
        return this.checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
    }

    /**
     * Converts a checkpoint id into its ZooKeeper node path: a leading slash followed by the id
     * zero-padded to 19 characters, so that names of non-negative ids sort lexicographically in
     * numeric order.
     *
     * @param j the checkpoint id
     * @return the node path, e.g. {@code "/0000000000000000042"} for {@code 42}
     */
    public static String checkpointIdToPath(long j) {
        return String.format("/%019d", j);
    }

    /**
     * Converts a node path back into a checkpoint id; the inverse of
     * {@link #checkpointIdToPath(long)}. A single leading slash, if present, is ignored.
     *
     * <p>Any path that does not hold a parsable number, including the empty string, yields
     * {@code -1} and a warning in the log.
     *
     * @param str the node path, with or without leading slash; must not be {@code null}
     * @return the parsed checkpoint id, or {@code -1} if the path cannot be parsed
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public static long pathToCheckpointId(String str) {
        try {
            // startsWith (rather than charAt(0)) keeps the empty string on the "unparsable" path.
            final String numberString = str.startsWith("/") ? str.substring(1) : str;
            return Long.parseLong(numberString);
        } catch (NumberFormatException e) {
            LOG.warn("Could not parse checkpoint id from {}. This indicates that the checkpoint id to path conversion has changed.", str);
            return -1L;
        }
    }

    /**
     * Resolves the checkpoint behind a state handle read from ZooKeeper.
     *
     * @param tuple2 pair of the state handle ({@code f0}) and the node path it was stored under
     *     ({@code f1})
     * @return whatever {@code retrieveState()} returns for the handle
     * @throws FlinkException if retrieving the state fails with an {@link IOException} or a
     *     {@link ClassNotFoundException}; the original exception is attached as the cause
     */
    private static CompletedCheckpoint retrieveCompletedCheckpoint(Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> tuple2) throws FlinkException {
        final long checkpointId = pathToCheckpointId(tuple2.f1);
        LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
        try {
            return tuple2.f0.retrieveState();
        } catch (IOException e) {
            throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " + tuple2.f1 + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", e);
        } catch (ClassNotFoundException e) {
            throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " + tuple2.f1 + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", e);
        }
    }
}
