package org.apache.flink.runtime.checkpoint;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator.class */
/**
 * Coordinates the checkpoints of a single job.
 *
 * <p>A checkpoint is started by sending a {@link TriggerCheckpoint} message to every task in
 * {@code tasksToTrigger}. The coordinator then collects {@link AcknowledgeCheckpoint} messages
 * from the tasks in {@code tasksToWaitFor}. Once a pending checkpoint is fully acknowledged it is
 * converted into a {@link SuccessfulCheckpoint}, and a {@link ConfirmCheckpoint} message is sent
 * to every task in {@code tasksToCommitTo}.
 *
 * <p>Thread-safety: all mutable collections are guarded by {@code lock}. The {@code shutdown}
 * flag is additionally {@code volatile} because several methods read it without the lock as a
 * cheap early exit.
 */
public class CheckpointCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);

    /** Number of IDs of recently removed pending checkpoints kept to recognise late messages. */
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;

    /** Guards every mutable collection and the scheduler/listener references below. */
    private final Object lock = new Object();

    private final JobID job;
    private final ExecutionVertex[] tasksToTrigger;
    private final ExecutionVertex[] tasksToWaitFor;
    private final ExecutionVertex[] tasksToCommitTo;

    /** Checkpoints that were triggered but are not yet fully acknowledged, in trigger order. */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Retained completed checkpoints, oldest first. */
    private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;

    /** IDs of pending checkpoints that were recently removed (completed, expired or subsumed). */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Daemon timer running both the checkpoint expiry tasks and the periodic trigger. */
    private final Timer timer;

    private final long checkpointTimeout;
    private final int numSuccessfulCheckpointsToRetain;
    private final ClassLoader userClassLoader;

    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();

    private TimerTask periodicScheduler;
    private ActorRef jobStatusListener;

    /** Written under {@code lock}; volatile so that unlocked fast-path reads see the update. */
    private volatile boolean shutdown;

    /**
     * Creates a coordinator for the given job.
     *
     * @param job the job whose checkpoints are coordinated
     * @param numSuccessfulCheckpointsToRetain number of completed checkpoints to keep; at least 1
     * @param checkpointTimeout delay, as passed to {@link Timer#schedule(TimerTask, long)}, after
     *     which a still-pending checkpoint is discarded; at least 1
     * @param tasksToTrigger tasks that receive the {@link TriggerCheckpoint} message
     * @param tasksToWaitFor tasks whose acknowledgement is required to complete a checkpoint
     * @param tasksToCommitTo tasks that receive the {@link ConfirmCheckpoint} message
     * @param userClassLoader class loader handed to checkpoints when they are discarded; not
     *     null-checked here
     * @throws NullPointerException if the job or any of the task arrays is {@code null}
     * @throws IllegalArgumentException if the retain count or the timeout is smaller than 1
     */
    public CheckpointCoordinator(
            JobID job,
            int numSuccessfulCheckpointsToRetain,
            long checkpointTimeout,
            ExecutionVertex[] tasksToTrigger,
            ExecutionVertex[] tasksToWaitFor,
            ExecutionVertex[] tasksToCommitTo,
            ClassLoader userClassLoader) {
        if (job == null || tasksToTrigger == null || tasksToWaitFor == null || tasksToCommitTo == null) {
            throw new NullPointerException();
        }
        if (numSuccessfulCheckpointsToRetain < 1) {
            throw new IllegalArgumentException("Must retain at least one successful checkpoint");
        }
        if (checkpointTimeout < 1) {
            throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
        }
        this.job = job;
        this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain;
        this.checkpointTimeout = checkpointTimeout;
        this.tasksToTrigger = tasksToTrigger;
        this.tasksToWaitFor = tasksToWaitFor;
        this.tasksToCommitTo = tasksToCommitTo;
        this.pendingCheckpoints = new LinkedHashMap<>();
        // One extra slot: a new checkpoint is appended before the oldest one is evicted.
        this.completedCheckpoints = new ArrayDeque<>(numSuccessfulCheckpointsToRetain + 1);
        this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
        this.userClassLoader = userClassLoader;
        this.timer = new Timer("Checkpoint Timer", true);
    }

    /**
     * Shuts the coordinator down: cancels the timer and the periodic scheduler, stops the job
     * status listener actor, and discards all pending and all retained completed checkpoints.
     * Calling this method more than once has no further effect.
     */
    public void shutdown() {
        synchronized (lock) {
            if (shutdown) {
                return;
            }
            shutdown = true;
            LOG.info("Stopping checkpoint coordinator for job {}", job);

            timer.cancel();

            if (jobStatusListener != null) {
                jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
                jobStatusListener = null;
            }
            if (periodicScheduler != null) {
                periodicScheduler.cancel();
                periodicScheduler = null;
            }

            for (PendingCheckpoint pending : pendingCheckpoints.values()) {
                pending.discard(userClassLoader, true);
            }
            pendingCheckpoints.clear();

            for (SuccessfulCheckpoint completed : completedCheckpoints) {
                completed.discard(userClassLoader);
            }
            completedCheckpoints.clear();
        }
    }

    /** Returns {@code true} once {@link #shutdown()} has been called. */
    public boolean isShutdown() {
        return shutdown;
    }

    /** Triggers a checkpoint stamped with the current wall-clock time; the result is ignored. */
    public void triggerCheckpoint() {
        triggerCheckpoint(System.currentTimeMillis());
    }

    /**
     * Triggers a new checkpoint with the given timestamp.
     *
     * <p>The checkpoint is aborted (returning {@code false}) if the coordinator is shut down or
     * if any task to trigger or to wait for has no current execution attempt. Any exception while
     * triggering is logged, the half-registered checkpoint is discarded, and {@code false} is
     * returned.
     *
     * @param timestamp timestamp recorded in the pending checkpoint and in the trigger messages
     * @return {@code true} if all trigger messages were handed off, {@code false} otherwise
     */
    public boolean triggerCheckpoint(long timestamp) {
        if (shutdown) {
            LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
            return false;
        }

        final long checkpointId = checkpointIdCounter.getAndIncrement();
        LOG.info("Triggering checkpoint {} @ {}", checkpointId, timestamp);

        try {
            // Resolve the attempt IDs up front so that nothing is registered or sent if any
            // participating task is not running.
            ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToTrigger.length];
            for (int i = 0; i < tasksToTrigger.length; i++) {
                Execution attempt = tasksToTrigger[i].getCurrentExecutionAttempt();
                if (attempt == null) {
                    LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getSimpleName());
                    return false;
                }
                triggerIds[i] = attempt.getAttemptId();
            }

            Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
                    new HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length);
            for (ExecutionVertex vertex : tasksToWaitFor) {
                Execution attempt = vertex.getCurrentExecutionAttempt();
                if (attempt == null) {
                    LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", vertex.getSimpleName());
                    return false;
                }
                ackTasks.put(attempt.getAttemptId(), vertex);
            }

            final PendingCheckpoint pending = new PendingCheckpoint(job, checkpointId, timestamp, ackTasks);
            TimerTask expirationTask = createExpirationTask(checkpointId, pending);

            synchronized (lock) {
                // Re-check under the lock: the timer is cancelled on shutdown and would reject
                // the schedule call.
                if (shutdown) {
                    throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
                }
                pendingCheckpoints.put(checkpointId, pending);
                timer.schedule(expirationTask, checkpointTimeout);
            }

            // Messages are sent outside the lock.
            for (int i = 0; i < tasksToTrigger.length; i++) {
                ExecutionAttemptID attemptId = triggerIds[i];
                tasksToTrigger[i].sendMessageToCurrentExecution(
                        new TriggerCheckpoint(job, attemptId, checkpointId, timestamp), attemptId);
            }

            numUnsuccessfulCheckpointsTriggers.set(0);
            return true;
        } catch (Throwable t) {
            LOG.warn("Failed to trigger checkpoint (" + numUnsuccessfulCheckpointsTriggers.incrementAndGet() + " consecutive failed attempts so far)", t);
            synchronized (lock) {
                PendingCheckpoint failed = pendingCheckpoints.remove(checkpointId);
                if (failed != null && !failed.isDiscarded()) {
                    failed.discard(userClassLoader, true);
                }
            }
            return false;
        }
    }

    /**
     * Builds the timer task that expires a pending checkpoint: if the checkpoint has not been
     * discarded by the time the task runs, it is discarded, removed from the pending map and its
     * ID is remembered as recently removed.
     */
    private TimerTask createExpirationTask(final long checkpointId, final PendingCheckpoint pending) {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    synchronized (lock) {
                        if (!pending.isDiscarded()) {
                            LOG.info("Checkpoint {} expired before completing.", checkpointId);
                            pending.discard(userClassLoader, true);
                            pendingCheckpoints.remove(checkpointId);
                            rememberRecentCheckpointId(checkpointId);
                        }
                    }
                } catch (Throwable t) {
                    // Never let an exception escape into the shared timer thread.
                    LOG.error("Exception while handling checkpoint timeout", t);
                }
            }
        };
    }

    /**
     * Processes a checkpoint acknowledgement from a task.
     *
     * <p>Messages are ignored when the coordinator is shut down or the message is {@code null},
     * and rejected with an error log when they belong to another job. When the acknowledgement
     * completes a pending checkpoint, the checkpoint is moved to the completed queue (evicting
     * and discarding the oldest one if the retain limit is exceeded), older pending checkpoints
     * are dropped, and confirmation messages are sent to the tasks to commit to.
     *
     * @param message the acknowledgement; may be {@code null}
     * @throws IllegalStateException if the referenced checkpoint is still registered as pending
     *     although it has already been discarded
     */
    public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
        if (shutdown || message == null) {
            return;
        }
        if (!job.equals(message.getJob())) {
            LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
            return;
        }

        final long checkpointId = message.getCheckpointId();
        SuccessfulCheckpoint completed = null;

        synchronized (lock) {
            // Re-check under the lock; shutdown may have happened since the unlocked check.
            if (shutdown) {
                return;
            }

            PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);

            if (pending == null) {
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    LOG.warn("Received late message for now expired checkpoint attempt {}", checkpointId);
                } else {
                    LOG.info("Received message for non-existing checkpoint {}", checkpointId);
                }
            } else if (pending.isDiscarded()) {
                throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
            } else if (!pending.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
                LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId + " , task " + message.getTaskExecutionId());
            } else if (pending.isFullyAcknowledged()) {
                LOG.info("Completed checkpoint {}", checkpointId);

                completed = pending.toCompletedCheckpoint();
                completedCheckpoints.addLast(completed);
                if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
                    completedCheckpoints.removeFirst().discard(userClassLoader);
                }

                pendingCheckpoints.remove(checkpointId);
                rememberRecentCheckpointId(checkpointId);
                dropSubsumedCheckpoints(completed.getTimestamp());
            }
        }

        // Send the confirmations only after releasing the lock, so that message delivery can
        // neither block other coordinator operations nor run foreign code under the lock.
        if (completed != null) {
            final long timestamp = completed.getTimestamp();
            for (ExecutionVertex vertex : tasksToCommitTo) {
                Execution attempt = vertex.getCurrentExecutionAttempt();
                if (attempt != null) {
                    ExecutionAttemptID attemptId = attempt.getAttemptId();
                    vertex.sendMessageToCurrentExecution(
                            new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp), attemptId);
                }
            }
        }
    }

    /**
     * Records the ID of a pending checkpoint that has just been removed, evicting the oldest
     * remembered ID once {@code NUM_GHOST_CHECKPOINT_IDS} entries are stored.
     *
     * <p>This method does not synchronize itself; every call site in this class invokes it while
     * holding {@code lock}.
     *
     * @param id the checkpoint ID to remember
     */
    public void rememberRecentCheckpointId(long id) {
        if (recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) {
            recentPendingCheckpoints.removeFirst();
        }
        recentPendingCheckpoints.addLast(id);
    }

    /**
     * Discards and removes every pending checkpoint whose timestamp is strictly smaller than the
     * given one, remembering its ID as recently removed. Called with {@code lock} held.
     */
    private void dropSubsumedCheckpoints(long timestamp) {
        Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
        while (entries.hasNext()) {
            PendingCheckpoint pending = entries.next().getValue();
            if (pending.getCheckpointTimestamp() < timestamp) {
                rememberRecentCheckpointId(pending.getCheckpointId());
                pending.discard(userClassLoader, true);
                entries.remove();
            }
        }
    }

    /**
     * Sets the state of the most recently completed checkpoint as the initial state of the
     * current execution attempts of the corresponding subtasks.
     *
     * @param tasks the job vertices, keyed by the ID the checkpointed state refers to
     * @param errorIfNoCheckpoint if {@code true}, a missing completed checkpoint is an error;
     *     otherwise the method returns without doing anything in that case
     * @param allOrNothingState if {@code true}, verifies that for every vertex that received
     *     state, the number of restored subtask states equals the vertex parallelism
     * @throws IllegalStateException if the coordinator is shut down, if no checkpoint exists and
     *     {@code errorIfNoCheckpoint} is set, if state refers to an unknown vertex or to a subtask
     *     without a current execution attempt, or if the all-or-nothing check fails
     * @throws Exception declared for callers; not thrown directly by the code in this method
     */
    public void restoreLatestCheckpointedState(
            Map<JobVertexID, ExecutionJobVertex> tasks,
            boolean errorIfNoCheckpoint,
            boolean allOrNothingState) throws Exception {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            if (completedCheckpoints.isEmpty()) {
                if (errorIfNoCheckpoint) {
                    throw new IllegalStateException("No completed checkpoint available");
                }
                return;
            }

            SuccessfulCheckpoint latest = completedCheckpoints.getLast();

            // Per-vertex count of restored subtask states; only tracked when it will be verified.
            Map<ExecutionJobVertex, Integer> restoredPerVertex =
                    allOrNothingState ? new HashMap<ExecutionJobVertex, Integer>() : null;

            for (StateForTask state : latest.getStates()) {
                ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
                if (vertex == null) {
                    throw new IllegalStateException(
                            "There is no execution job vertex for the job vertex ID " + state.getOperatorId());
                }
                Execution attempt = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
                if (attempt == null) {
                    throw new IllegalStateException(
                            "Subtask " + state.getSubtask() + " of vertex " + vertex + " has no current execution attempt");
                }
                attempt.setInitialState(state.getState());

                if (restoredPerVertex != null) {
                    Integer seen = restoredPerVertex.get(vertex);
                    restoredPerVertex.put(vertex, seen == null ? 1 : seen + 1);
                }
            }

            if (restoredPerVertex != null) {
                for (Map.Entry<ExecutionJobVertex, Integer> entry : restoredPerVertex.entrySet()) {
                    ExecutionJobVertex vertex = entry.getKey();
                    if (entry.getValue().intValue() != vertex.getParallelism()) {
                        throw new IllegalStateException("The checkpoint contained state only for a subset of tasks for vertex " + vertex);
                    }
                }
            }
        }
    }

    /** Returns the number of checkpoints currently pending. */
    public int getNumberOfPendingCheckpoints() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }

    /** Returns the number of completed checkpoints currently retained. */
    public int getNumberOfRetainedSuccessfulCheckpoints() {
        synchronized (lock) {
            return completedCheckpoints.size();
        }
    }

    /** Returns a snapshot copy of the pending checkpoints, keyed by checkpoint ID. */
    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        synchronized (lock) {
            return new HashMap<Long, PendingCheckpoint>(pendingCheckpoints);
        }
    }

    /** Returns a snapshot copy of the retained completed checkpoints, oldest first. */
    public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
        synchronized (lock) {
            return new ArrayList<SuccessfulCheckpoint>(completedCheckpoints);
        }
    }

    /**
     * Starts (or restarts) periodic checkpoint triggering at a fixed rate. A previously started
     * periodic scheduler is cancelled first.
     *
     * @param interval both the initial delay and the period passed to
     *     {@link Timer#scheduleAtFixedRate(TimerTask, long, long)}
     * @throws IllegalArgumentException if the coordinator is shut down (exception type kept for
     *     compatibility with existing callers)
     */
    public void startPeriodicCheckpointScheduler(long interval) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            stopPeriodicCheckpointScheduler();

            periodicScheduler = new TimerTask() {
                @Override
                public void run() {
                    try {
                        triggerCheckpoint();
                    } catch (Exception e) {
                        // Keep the shared timer thread alive whatever the trigger does.
                        LOG.error("Exception while triggering checkpoint", e);
                    }
                }
            };
            timer.scheduleAtFixedRate(periodicScheduler, interval, interval);
        }
    }

    /** Cancels the periodic checkpoint scheduler, if one is running. */
    public void stopPeriodicCheckpointScheduler() {
        synchronized (lock) {
            if (periodicScheduler != null) {
                periodicScheduler.cancel();
                periodicScheduler = null;
            }
        }
    }

    /**
     * Returns the job status listener actor, creating it on first use as a
     * {@code CheckpointCoordinatorDeActivator} constructed with this coordinator and the given
     * value.
     *
     * @param actorSystem actor system in which the listener is created
     * @param interval value forwarded unchanged to the de-activator's constructor; presumably
     *     the periodic checkpoint interval (TODO confirm against CheckpointCoordinatorDeActivator)
     * @return the existing or newly created listener
     * @throws IllegalArgumentException if the coordinator is shut down (exception type kept for
     *     compatibility with existing callers)
     */
    public ActorRef createJobStatusListener(ActorSystem actorSystem, long interval) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (jobStatusListener == null) {
                jobStatusListener = actorSystem.actorOf(
                        Props.create((Class<?>) CheckpointCoordinatorDeActivator.class, this, Long.valueOf(interval)));
            }
            return jobStatusListener;
        }
    }
}
