/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.DiscardRecordedStateObject;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.TriFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorTest
extends TestLogger {
    /** Class-level executor resource; tests pass its executor when building execution graphs and coordinators. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    /** Task manager location string passed along with the acknowledge/decline messages sent by these tests. */
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    /** Timer executor handed to the coordinators under test; tests call {@code triggerAll()} on it after triggering a checkpoint. Re-created in {@link #setUp()}. */
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    /** JUnit {@link TemporaryFolder} rule available to the tests of this class. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Verifies how state handles are disposed around an aborted checkpoint.
     *
     * <p>Checkpoint 1 is acknowledged by the first vertex and declined by the second; the test
     * then expects the handles it names {@code metaState} and {@code privateState} to be disposed
     * and {@code sharedState} to be kept. Once checkpoint 2 has been acknowledged by both vertices
     * and its future has completed, {@code sharedState} is expected to be disposed as well.
     */
    @Test
    public void testSharedStateNotDiscaredOnAbort() throws Exception {
        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(executionGraph);
        checkpointCoordinator.startCheckpointScheduler();

        // First checkpoint: one acknowledgement followed by a decline.
        CompletableFuture<?> checkpointFuture = checkpointCoordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        checkpointFuture.getNow(null);

        final TestingStreamStateHandle metaState = handle();
        final TestingStreamStateHandle privateState = handle();
        final TestingStreamStateHandle sharedState = handle();
        ackCheckpoint(1L, checkpointCoordinator, firstVertexId, executionGraph, metaState, privateState, sharedState);
        declineCheckpoint(1L, checkpointCoordinator, secondVertexId, executionGraph);

        Assert.assertTrue(privateState.isDisposed());
        Assert.assertTrue(metaState.isDisposed());
        Assert.assertFalse(sharedState.isDisposed());

        // Second checkpoint: acknowledged by both vertices.
        checkpointFuture = checkpointCoordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        checkpointFuture.getNow(null);
        ackCheckpoint(2L, checkpointCoordinator, firstVertexId, executionGraph, handle(), handle(), handle());
        ackCheckpoint(2L, checkpointCoordinator, secondVertexId, executionGraph, handle(), handle(), handle());

        checkpointFuture.get();
        Assert.assertTrue(sharedState.isDisposed());
    }

    /**
     * Runs the shared late-report scenario for checkpoint 1, delivering the late metrics through
     * {@code CheckpointCoordinator#reportStats}.
     */
    @Test
    public void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
        final long checkpointId = 1L;
        testReportStatsAfterFailure(
                checkpointId,
                (checkpointCoordinator, attempt, lateMetrics) -> {
                    checkpointCoordinator.reportStats(checkpointId, attempt.getAttemptId(), lateMetrics);
                    // reportStats has no result to hand back to the scenario.
                    return null;
                });
    }

    /**
     * Runs the shared late-report scenario for checkpoint 1, delivering the late metrics inside an
     * {@link AcknowledgeCheckpoint} message that carries an empty {@link TaskStateSnapshot}.
     */
    @Test
    public void testCheckpointStatsUpdatedAfterFailure() throws Exception {
        final long checkpointId = 1L;
        testReportStatsAfterFailure(
                checkpointId,
                (checkpointCoordinator, attempt, lateMetrics) -> {
                    final AcknowledgeCheckpoint lateAck =
                            new AcknowledgeCheckpoint(
                                    attempt.getVertex().getJobId(),
                                    attempt.getAttemptId(),
                                    checkpointId,
                                    lateMetrics,
                                    new TaskStateSnapshot());
                    return checkpointCoordinator.receiveAcknowledgeMessage(lateAck, TASK_MANAGER_LOCATION_INFO);
                });
    }

    /**
     * Shared scenario: a checkpoint is triggered on a two-vertex graph, one vertex declines it, and
     * afterwards metrics for the other vertex are reported through {@code reportFn}. The stats
     * recorded for that checkpoint must then be in FAILED status and reflect the late metrics.
     *
     * @param checkpointId id of the checkpoint the scenario triggers; used for the decline message
     *     and for looking up the recorded stats
     * @param reportFn delivers the late metrics to the coordinator for the given execution
     * @throws Exception if the trigger future already completed exceptionally, or if any
     *     coordinator call fails
     */
    private void testReportStatsAfterFailure(long checkpointId, TriFunctionWithException<CheckpointCoordinator, Execution, CheckpointMetrics, ?, CheckpointException> reportFn) throws Exception {
        JobVertexID decliningVertexID = new JobVertexID();
        JobVertexID lateReportVertexID = new JobVertexID();
        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(decliningVertexID)
                        .addJobVertex(lateReportVertexID)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex decliningVertex = executionGraph.getJobVertex(decliningVertexID).getTaskVertices()[0];
        ExecutionVertex lateReportVertex = executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
        CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(executionGraph);

        CompletableFuture<?> result = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Preconditions.checkState(
                coordinator.getNumberOfPendingCheckpoints() == 1,
                "wrong number of pending checkpoints: %s",
                coordinator.getNumberOfPendingCheckpoints());
        if (result.isDone()) {
            // Surface a failure of the trigger itself instead of continuing the scenario.
            result.get();
        }

        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        executionGraph.getJobID(),
                        decliningVertex.getCurrentExecutionAttempt().getAttemptId(),
                        checkpointId,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                "test");

        CheckpointMetrics lateReportedMetrics =
                new CheckpointMetricsBuilder()
                        .setTotalBytesPersisted(18L)
                        .setBytesPersistedOfThisCheckpoint(18L)
                        .setBytesProcessedDuringAlignment(19L)
                        .setAsyncDurationMillis(20L)
                        .setAlignmentDurationNanos(123000000L)
                        .setCheckpointStartDelayNanos(567000000L)
                        .build();

        // Pass the arguments with their declared types; casting them to Object does not match
        // the typed apply(...) signature of reportFn.
        reportFn.apply(coordinator, lateReportVertex.getCurrentExecutionAttempt(), lateReportedMetrics);

        assertStatsEqual(
                checkpointId,
                lateReportVertex.getJobvertexId(),
                0,
                lateReportedMetrics,
                statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId));
    }

    /** Returns {@code true} when the given operator state reports zero collected states. */
    private boolean hasNoSubState(OperatorState s) {
        final int collectedStates = s.getNumberCollectedStates();
        return collectedStates == 0;
    }

    /**
     * Asserts that {@code actual} describes checkpoint {@code checkpointId} in FAILED status with
     * zero acknowledged subtasks, then delegates the per-subtask metric comparison to
     * {@link #assertStatsMetrics}.
     *
     * @param checkpointId expected checkpoint id
     * @param jobVertexID job vertex whose subtask stats are compared
     * @param subtasIdx index of the subtask within that vertex
     * @param expected metrics the recorded stats must match
     * @param actual recorded checkpoint stats
     */
    private void assertStatsEqual(long checkpointId, JobVertexID jobVertexID, int subtasIdx, CheckpointMetrics expected, AbstractCheckpointStats actual) {
        final long recordedId = actual.getCheckpointId();
        Assert.assertEquals(checkpointId, recordedId);

        final Object recordedStatus = actual.getStatus();
        Assert.assertEquals(CheckpointStatsStatus.FAILED, recordedStatus);

        final long acknowledged = actual.getNumberOfAcknowledgedSubtasks();
        Assert.assertEquals(0L, acknowledged);

        assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
    }

    /**
     * Asserts that the stats recorded for one subtask match the given metrics: total persisted
     * bytes against the checkpoint's state size, plus alignment duration, unaligned flag, async
     * duration and checkpoint start delay of the subtask (nanosecond values are compared in
     * milliseconds).
     *
     * @param jobVertexID job vertex whose task stats are inspected
     * @param subtasIdx index of the subtask within that vertex
     * @param expected metrics the recorded stats must match
     * @param actual recorded checkpoint stats
     * @throws AssertionError if a value differs or no task stats exist for {@code jobVertexID}
     */
    public static void assertStatsMetrics(JobVertexID jobVertexID, int subtasIdx, CheckpointMetrics expected, AbstractCheckpointStats actual) {
        Assert.assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
        SubtaskStateStats taskStats =
                actual.getAllTaskStateStats().stream()
                        .filter(s -> s.getJobVertexId().equals(jobVertexID))
                        .findAny()
                        // Fail with a readable message instead of a bare NoSuchElementException.
                        .orElseThrow(() -> new AssertionError("No task state stats recorded for job vertex " + jobVertexID))
                        .getSubtaskStats()[subtasIdx];
        Assert.assertEquals(expected.getAlignmentDurationNanos() / 1000000L, taskStats.getAlignmentDuration());
        Assert.assertEquals(expected.getUnalignedCheckpoint(), taskStats.getUnalignedCheckpoint());
        Assert.assertEquals(expected.getAsyncDurationMillis(), taskStats.getAsyncCheckpointDuration());
        Assert.assertEquals(expected.getCheckpointStartDelayNanos() / 1000000L, taskStats.getCheckpointStartDelay());
    }

    /** Gives every test a fresh {@link ManuallyTriggeredScheduledExecutor}. */
    @Before
    public void setUp() throws Exception {
        manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Calls {@code scheduleTriggerRequest()} after both the coordinator and its timer executor
     * have been shut down; the test passes when that call does not throw.
     */
    @Test
    public void testScheduleTriggerRequestDuringShutdown() throws Exception {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        final ScheduledExecutor timer = new ScheduledExecutorServiceAdapter(timerService);
        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(timer);

        checkpointCoordinator.shutdown();
        timerService.shutdownNow();

        checkpointCoordinator.scheduleTriggerRequest();
    }

    /**
     * Exercises the minimum pause between checkpoints using a real single-threaded timer.
     *
     * <p>The coordinator is configured with a checkpoint interval and a minimum pause of 1000 ms
     * and at most one concurrent checkpoint. Two checkpoints are triggered; once a pending
     * checkpoint has a storage location, checkpoint 1 is acknowledged. Half a pause later no
     * checkpoint may be pending, and the test then waits until one becomes pending again.
     */
    @Test
    public void testMinCheckpointPause() throws Exception {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        CheckpointCoordinator checkpointCoordinator = null;
        try {
            final long pauseMillis = 1000L;
            final JobVertexID vertexId = new JobVertexID();
            final ExecutionGraph executionGraph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(vertexId)
                            .setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(new DirectScheduledExecutorService()))
                            .build(EXECUTOR_RESOURCE.getExecutor());
            final ExecutionVertex onlyTask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
            final ExecutionAttemptID attemptId = onlyTask.getCurrentExecutionAttempt().getAttemptId();

            final CheckpointCoordinatorConfiguration configuration =
                    CheckpointCoordinatorConfiguration.builder()
                            .setCheckpointInterval(pauseMillis)
                            .setCheckpointTimeout(Long.MAX_VALUE)
                            .setMaxConcurrentCheckpoints(1)
                            .setMinPauseBetweenCheckpoints(pauseMillis)
                            .build();
            checkpointCoordinator =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setTimer(new ScheduledExecutorServiceAdapter(timerService))
                            .setCheckpointCoordinatorConfiguration(configuration)
                            .build(executionGraph);
            checkpointCoordinator.startCheckpointScheduler();

            checkpointCoordinator.triggerCheckpoint(true);
            checkpointCoordinator.triggerCheckpoint(true);

            // Wait until some pending checkpoint has obtained its storage location.
            while (!checkpointCoordinator.getPendingCheckpoints().values().stream().anyMatch(pending -> pending.getCheckpointStorageLocation() != null)) {
                Thread.sleep(10L);
            }

            checkpointCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptId, 1L),
                    TASK_MANAGER_LOCATION_INFO);

            // Halfway through the pause nothing may have been triggered yet.
            Thread.sleep(pauseMillis / 2);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());

            // Eventually the next checkpoint has to show up.
            while (checkpointCoordinator.getNumberOfPendingCheckpoints() == 0) {
                Thread.sleep(1L);
            }
        } finally {
            if (checkpointCoordinator != null) {
                checkpointCoordinator.shutdown();
            }
            timerService.shutdownNow();
        }
    }

    /**
     * Builds a two-vertex graph with {@code setTransitToRunning(false)} and expects a triggered
     * checkpoint to complete exceptionally, leaving no pending and no retained checkpoint.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID(), false)
                        .setTransitToRunning(false)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(executionGraph);

        // Nothing exists before the trigger.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(triggerResult.isCompletedExceptionally());
        // The failed trigger must not leave anything behind.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.shutdown();
    }

    /**
     * Marks every task of the first job vertex as finished and expects a triggered checkpoint to
     * complete exceptionally, leaving no pending and no retained checkpoint.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
        final JobVertexID finishedVertexId = new JobVertexID();
        final JobVertexID otherVertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(finishedVertexId)
                        .addJobVertex(otherVertexId, false)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(executionGraph);

        for (ExecutionVertex task : executionGraph.getJobVertex(finishedVertexId).getTaskVertices()) {
            task.getCurrentExecutionAttempt().markFinished();
        }

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(triggerResult.isCompletedExceptionally());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.shutdown();
    }

    /**
     * With {@code setAllowCheckpointsAfterTasksFinished(true)}, a checkpoint triggered after three
     * tasks were marked finished must stay pending, and its stats must already count three
     * acknowledged subtasks with non-null subtask stats for each of the finished tasks.
     */
    @Test
    public void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId, 3, 256)
                        .addJobVertex(secondVertexId, 3, 256)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionJobVertex firstVertex = executionGraph.getJobVertex(firstVertexId);
        final ExecutionJobVertex secondVertex = executionGraph.getJobVertex(secondVertexId);

        firstVertex.getTaskVertices()[0].getCurrentExecutionAttempt().markFinished();
        firstVertex.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();
        secondVertex.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();

        final CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(executionGraph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertFalse(triggerResult.isDone());
        Assert.assertFalse(triggerResult.isCompletedExceptionally());
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        final PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
        final AbstractCheckpointStats stats = statsTracker.createSnapshot().getHistory().getCheckpointById(pending.getCheckpointID());
        Assert.assertEquals(3L, stats.getNumberOfAcknowledgedSubtasks());

        final List<ExecutionVertex> finishedTasks =
                Arrays.asList(
                        firstVertex.getTaskVertices()[0],
                        firstVertex.getTaskVertices()[1],
                        secondVertex.getTaskVertices()[1]);
        for (ExecutionVertex finished : finishedTasks) {
            final int subtaskIndex = finished.getParallelSubtaskIndex();
            Assert.assertNotNull(stats.getTaskStateStats(finished.getJobvertexId()).getSubtaskStats()[subtaskIndex]);
        }
    }

    /**
     * Simulates a task that finishes while the checkpoint is being triggered.
     *
     * <p>The gateway of the first task marks that task finished when asked to trigger a checkpoint
     * and answers with a failed future; the gateway of the second task records abort
     * notifications. The test expects the trigger future to complete exceptionally and the second
     * task to have been notified of the abort.
     */
    @Test
    public void testTasksFinishDuringTriggering() throws Exception {
        final JobVertexID finishingVertexId = new JobVertexID();
        final JobVertexID runningVertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .setTransitToRunning(false)
                        .addJobVertex(finishingVertexId, 1, 256)
                        .addJobVertex(runningVertexId, 1, 256)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionVertex finishingTask = executionGraph.getJobVertex(finishingVertexId).getTaskVertices()[0];
        final ExecutionVertex runningTask = executionGraph.getJobVertex(runningVertexId).getTaskVertices()[0];
        final AtomicBoolean abortNotified = new AtomicBoolean(false);

        final SimpleAckingTaskManagerGateway finishOnTriggerGateway = new SimpleAckingTaskManagerGateway() {

            @Override
            public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
                finishingTask.getCurrentExecutionAttempt().markFinished();
                return FutureUtils.completedExceptionally(new RpcException(""));
            }
        };
        final TestingLogicalSlot finishingSlot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(finishOnTriggerGateway)
                        .createTestingLogicalSlot();

        final SimpleAckingTaskManagerGateway abortRecordingGateway = new SimpleAckingTaskManagerGateway() {

            @Override
            public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long latestCompletedCheckpointId, long timestamp) {
                abortNotified.set(true);
            }
        };
        final TestingLogicalSlot runningSlot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(abortRecordingGateway)
                        .createTestingLogicalSlot();

        // Attach the slots and move both tasks to RUNNING by hand.
        ExecutionGraphTestUtils.setVertexResource(finishingTask, finishingSlot);
        finishingTask.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(runningTask, runningSlot);
        runningTask.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .build(executionGraph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(triggerResult.isCompletedExceptionally());
        Assert.assertTrue(abortNotified.get());
    }

    /**
     * Triggers a checkpoint on a two-vertex graph, acknowledges it for the second vertex and then
     * declines it for the first one. The coordinator is built with the failure manager obtained
     * from {@code getCheckpointFailureManager(errorMsg)}; the decline is expected to raise an
     * exception carrying that message.
     */
    @Test
    public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();

        // Single source for the message handed to the failure manager and checked below.
        final String errorMsg = "Exceeded checkpoint failure tolerance number!";
        CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph, checkpointFailureManager);
        try {
            CompletableFuture<?> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally(checkPointFuture);

            long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);

            checkpointCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                    TASK_MANAGER_LOCATION_INFO);
            Assert.assertFalse(checkpoint.isDisposed());
            Assert.assertFalse(checkpoint.areTasksFullyAcknowledged());

            checkpointCoordinator.receiveDeclineMessage(
                    new DeclineCheckpoint(
                            graph.getJobID(),
                            attemptID1,
                            checkpointId,
                            new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                    TASK_MANAGER_LOCATION_INFO);
            // Reaching this line means the decline did not raise; fail() throws an Error, so the
            // catch block below does not swallow it.
            Assert.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, errorMsg);
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Triggers a checkpoint and then aborts all pending checkpoints with an
     * {@code IO_EXCEPTION} reason. The coordinator is built with the failure manager obtained from
     * {@code getCheckpointFailureManager(expectedErrorMessage)}; the abort is expected to raise an
     * exception carrying that message.
     */
    @Test
    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID())
                        .build(EXECUTOR_RESOURCE.getExecutor());

        // Single source for the message handed to the failure manager and checked below.
        final String expectedErrorMessage = "Expected Error Message";
        CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(expectedErrorMessage);
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph, checkpointFailureManager);
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.IO_EXCEPTION));
            // Reaching this line means the abort did not raise; fail() throws an Error, so the
            // catch block below does not swallow it.
            Assert.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, expectedErrorMessage);
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Triggers a periodic checkpoint on the coordinator returned by
     * {@code setupCheckpointCoordinatorWithInactiveTasks} for an {@code IOExceptionCheckpointStorage}
     * and expects the resulting future to fail with a {@link CheckpointException} whose reason is
     * {@code IO_EXCEPTION}; any other failure is rethrown.
     */
    @Test
    public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
        final CheckpointStorage failingStorage = new IOExceptionCheckpointStorage();
        final CheckpointCoordinator coordinator = setupCheckpointCoordinatorWithInactiveTasks(failingStorage);

        final CheckpointProperties properties = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(properties, null, true);
        manuallyTriggeredScheduledExecutor.triggerAll();

        try {
            triggerResult.get();
            Assert.fail("should not trigger periodic checkpoint after IOException occurred.");
        } catch (Exception e) {
            final Optional<CheckpointException> checkpointFailure = ExceptionUtils.findThrowable(e, CheckpointException.class);
            final boolean failedWithIoException =
                    checkpointFailure.isPresent()
                            && checkpointFailure.get().getCheckpointFailureReason() == CheckpointFailureReason.IO_EXCEPTION;
            if (!failedWithIoException) {
                throw e;
            }
        }
    }

    /**
     * Runs {@code testTriggerCheckpoint} with an {@code IOExceptionCheckpointStorage} and the
     * {@code IO_EXCEPTION} reason, then expects the fail-job callback to have been invoked exactly
     * once and the stats tracker to hold pending stats for checkpoint 1.
     */
    @Test
    public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
        final TestFailJobCallback failJobCallback = new TestFailJobCallback();
        final CheckpointStatsTracker tracker = new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        final CheckpointFailureManager failureManager = new CheckpointFailureManager(0, failJobCallback);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointStatsTracker(tracker)
                        .setFailureManager(failureManager)
                        .setCheckpointStorage(new IOExceptionCheckpointStorage())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(EXECUTOR_RESOURCE.getExecutor());

        testTriggerCheckpoint(coordinator, CheckpointFailureReason.IO_EXCEPTION);

        Assert.assertEquals(1L, failJobCallback.getInvokeCounter());
        Assert.assertNotNull(tracker.getPendingCheckpointStats(1L));
    }

    /**
     * Combines finished tasks with an {@code IOExceptionCheckpointStorage}: after every task of the
     * first job vertex is marked finished and the scheduler is started, a triggered checkpoint
     * must complete exceptionally and leave no pending and no retained checkpoint.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
        final JobVertexID finishedVertexId = new JobVertexID();
        final JobVertexID otherVertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(finishedVertexId)
                        .addJobVertex(otherVertexId, false)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointStorage(new IOExceptionCheckpointStorage())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(executionGraph);

        for (ExecutionVertex task : executionGraph.getJobVertex(finishedVertexId).getTaskVertices()) {
            task.getCurrentExecutionAttempt().markFinished();
        }

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.startCheckpointScheduler();
        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(triggerResult.isCompletedExceptionally());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers a checkpoint, aborts all pending checkpoints with {@code CHECKPOINT_EXPIRED} and
     * expects the resulting exception chain to carry the failure-tolerance message that was
     * handed to {@code getCheckpointFailureManager}.
     *
     * <p>The message is kept in a single local so that the value given to the failure manager and
     * the value asserted on cannot drift apart.
     *
     * @throws Exception if building the graph or coordinator fails, or shutdown fails
     */
    @Test
    public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).addJobVertex(new JobVertexID()).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        final String errorMsg = "Exceeded checkpoint failure tolerance number!";
        // NOTE(review): presumably the helper builds a failure manager whose fail-job callback
        // throws with this message - confirm against getCheckpointFailureManager.
        CheckpointFailureManager checkpointFailureManager = this.getCheckpointFailureManager(errorMsg);
        CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(graph, checkpointFailureManager);
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
            // Assert.fail raises an AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("Test failed.");
        }
        catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage((Throwable)e, errorMsg);
        }
        finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Runs the simple trigger-and-decline scenario with {@code CHECKPOINT_DECLINED} as the reason
     * carried by the decline messages.
     */
    @Test
    public void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
        CheckpointFailureReason declineReason = CheckpointFailureReason.CHECKPOINT_DECLINED;
        this.testTriggerAndDeclineCheckpointSimple(declineReason);
    }

    /**
     * Runs the simple trigger-and-decline scenario with {@code CHECKPOINT_ASYNC_EXCEPTION} as the
     * reason carried by the decline messages.
     */
    @Test
    public void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
        CheckpointFailureReason declineReason = CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
        this.testTriggerAndDeclineCheckpointSimple(declineReason);
    }

    /**
     * Shared scenario: trigger one checkpoint on a two-vertex graph, acknowledge it from the
     * second task (twice), then decline it from the first task and verify that the pending
     * checkpoint is disposed, nothing stays pending or retained, and the fail-job callback has
     * been invoked exactly once even after further late decline messages.
     *
     * @param checkpointFailureReason reason wrapped in the {@code CheckpointException} that is
     *     attached to every decline message sent by this scenario
     * @throws Exception if the trigger future completed exceptionally or setup fails
     */
    private void testTriggerAndDeclineCheckpointSimple(CheckpointFailureReason checkpointFailureReason) throws Exception {
        CheckpointException declineCause = new CheckpointException(checkpointFailureReason);
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(firstVertexId)
                .addJobVertex(secondVertexId)
                .setTaskManagerGateway(recorder)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = executionGraph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = executionGraph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        TestFailJobCallback jobFailedCallback = new TestFailJobCallback();
        CheckpointFailureManager failureManager = new CheckpointFailureManager(0, (CheckpointFailureManager.FailJobCallback)jobFailedCallback);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build())
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setCheckpointFailureManager(failureManager)
                .build(executionGraph);

        // Before the trigger: nothing pending, nothing retained.
        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        CompletableFuture<?> triggerFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)triggerFuture);

        // After the trigger: one pending checkpoint and one active scheduled task.
        Assert.assertEquals(1L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(1L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        long pendingId = (Long)coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint pending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(pendingId);
        Assert.assertNotNull((Object)pending);
        Assert.assertEquals(pendingId, (long)pending.getCheckpointId());
        Assert.assertEquals((Object)executionGraph.getJobID(), (Object)pending.getJobId());
        Assert.assertEquals(2L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, (long)pending.getOperatorStates().size());
        Assert.assertFalse((boolean)pending.isDisposed());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());

        // Each task must have received exactly one trigger that matches the pending checkpoint.
        for (ExecutionVertex triggeredVertex : Arrays.asList(firstVertex, secondVertex)) {
            CheckpointCoordinatorTestingUtils.TriggeredCheckpoint triggered = recorder.getOnlyTriggeredCheckpoint(triggeredVertex.getCurrentExecutionAttempt().getAttemptId());
            Assert.assertEquals(pendingId, (long)triggered.checkpointId);
            Assert.assertEquals((long)pending.getCheckpointTimestamp(), (long)triggered.timestamp);
            Assert.assertEquals((Object)CheckpointOptions.forCheckpointWithDefaultLocation(), (Object)triggered.checkpointOptions);
        }

        // First acknowledgement, from the second task.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse((boolean)pending.isDisposed());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());

        // Same acknowledgement again: the checkpoint must stay neither disposed nor complete.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse((boolean)pending.isDisposed());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());

        // Decline from the first task disposes the checkpoint and clears all bookkeeping.
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(executionGraph.getJobID(), firstAttempt, pendingId, declineCause), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue((boolean)pending.isDisposed());
        Assert.assertEquals(0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Late declines for the already-disposed checkpoint must not invoke the callback again.
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(executionGraph.getJobID(), firstAttempt, pendingId, declineCause), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId, declineCause), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue((boolean)pending.isDisposed());
        Assert.assertEquals(1L, (long)jobFailedCallback.getInvokeCounter());
        coordinator.shutdown();
    }

    /**
     * Triggers two checkpoints that are pending at the same time, declines the first one and
     * verifies that only the first is disposed (with a single abort notification per task) while
     * the second remains pending and unacknowledged. Late decline messages for the first
     * checkpoint must not produce additional abort notifications.
     *
     * @throws Exception if one of the trigger futures completed exceptionally or setup fails
     */
    @Test
    public void testTriggerAndDeclineCheckpointComplex() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).addJobVertex(jobVertexID2).setTaskManagerGateway(gateway).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(graph);

        // Initial state: nothing pending, retained or scheduled.
        Assert.assertEquals(0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        // Trigger two checkpoints back to back; both must end up pending.
        CompletableFuture<?> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
        CompletableFuture<?> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
        Assert.assertEquals(2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(2L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        // The iterator must be typed: on a raw Iterator, next() is Object and getKey() does not resolve.
        // NOTE(review): assumes the pending map iterates in trigger order; the index-based gateway
        // assertions below (0 -> checkpoint1Id, 1 -> checkpoint2Id) depend on that.
        Iterator<Map.Entry<Long, PendingCheckpoint>> it = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
        long checkpoint1Id = it.next().getKey();
        long checkpoint2Id = it.next().getKey();
        PendingCheckpoint checkpoint1 = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpoint1Id);
        PendingCheckpoint checkpoint2 = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
        Assert.assertNotNull((Object)checkpoint1);
        Assert.assertEquals(checkpoint1Id, (long)checkpoint1.getCheckpointId());
        Assert.assertEquals((Object)graph.getJobID(), (Object)checkpoint1.getJobId());
        Assert.assertEquals(2L, (long)checkpoint1.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, (long)checkpoint1.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, (long)checkpoint1.getOperatorStates().size());
        Assert.assertFalse((boolean)checkpoint1.isDisposed());
        Assert.assertFalse((boolean)checkpoint1.areTasksFullyAcknowledged());
        Assert.assertNotNull((Object)checkpoint2);
        Assert.assertEquals(checkpoint2Id, (long)checkpoint2.getCheckpointId());
        Assert.assertEquals((Object)graph.getJobID(), (Object)checkpoint2.getJobId());
        Assert.assertEquals(2L, (long)checkpoint2.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, (long)checkpoint2.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, (long)checkpoint2.getOperatorStates().size());
        Assert.assertFalse((boolean)checkpoint2.isDisposed());
        Assert.assertFalse((boolean)checkpoint2.areTasksFullyAcknowledged());

        // Both tasks received both triggers, in the order the checkpoints were listed above.
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> triggeredCheckpoints = gateway.getTriggeredCheckpoints(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assert.assertEquals(2L, (long)triggeredCheckpoints.size());
            Assert.assertEquals(checkpoint1Id, (long)triggeredCheckpoints.get(0).checkpointId);
            Assert.assertEquals(checkpoint2Id, (long)triggeredCheckpoints.get(1).checkpointId);
        }

        // Decline the first checkpoint from task 1: every task is told about the abort.
        checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(graph.getJobID(), attemptID1, checkpoint1Id, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(checkpoint1Id, (long)gateway.getOnlyNotifiedAbortedCheckpoint((ExecutionAttemptID)vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }
        Assert.assertTrue((boolean)checkpoint1.isDisposed());
        Assert.assertEquals(1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(1L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        // The one remaining pending checkpoint must be the second one, still untouched.
        long checkpointIdNew = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint checkpointNew = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
        Assert.assertEquals(checkpoint2Id, checkpointIdNew);
        Assert.assertNotNull((Object)checkpointNew);
        Assert.assertEquals(checkpointIdNew, (long)checkpointNew.getCheckpointId());
        Assert.assertEquals((Object)graph.getJobID(), (Object)checkpointNew.getJobId());
        Assert.assertEquals(2L, (long)checkpointNew.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, (long)checkpointNew.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, (long)checkpointNew.getOperatorStates().size());
        Assert.assertFalse((boolean)checkpointNew.isDisposed());
        Assert.assertFalse((boolean)checkpointNew.areTasksFullyAcknowledged());
        Assert.assertNotEquals((long)checkpoint1.getCheckpointId(), (long)checkpointNew.getCheckpointId());

        // Late declines for the already-aborted checkpoint must not trigger further abort notifications.
        checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(graph.getJobID(), attemptID1, checkpoint1Id, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(graph.getJobID(), attemptID2, checkpoint1Id, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue((boolean)checkpoint1.isDisposed());
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(1L, (long)gateway.getNotifiedAbortedCheckpoints(vertex.getCurrentExecutionAttempt().getAttemptId()).size());
        }
        checkpointCoordinator.shutdown();
    }

    /**
     * Triggers a checkpoint on a two-vertex graph and acknowledges it from both tasks with mocked
     * subtask state, checking the pending checkpoint's counters along the way, the
     * {@code registerSharedStates} invocations on the mocks (including the extra one caused by a
     * duplicate acknowledgement), and the resulting completed checkpoint. A second checkpoint is
     * then triggered and acknowledged without state to check that it replaces the first as the
     * single retained checkpoint.
     *
     * @throws Exception if the first trigger future completed exceptionally or setup fails
     */
    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(firstVertexId)
                .addJobVertex(secondVertexId)
                .setTaskManagerGateway(recorder)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = executionGraph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = executionGraph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator coordinator = this.getCheckpointCoordinator(executionGraph);

        // Initial state: nothing pending, retained or scheduled.
        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        CompletableFuture<?> triggerFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)triggerFuture);
        Assert.assertEquals(1L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(1L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        long pendingId = (Long)coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint pending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(pendingId);
        Assert.assertNotNull((Object)pending);
        Assert.assertEquals(pendingId, (long)pending.getCheckpointId());
        Assert.assertEquals((Object)executionGraph.getJobID(), (Object)pending.getJobId());
        Assert.assertEquals(2L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, (long)pending.getOperatorStates().size());
        Assert.assertFalse((boolean)pending.isDisposed());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());

        // Both tasks received exactly one trigger for the pending checkpoint.
        for (ExecutionVertex triggeredVertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID triggeredAttempt = triggeredVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(pendingId, (long)recorder.getOnlyTriggeredCheckpoint((ExecutionAttemptID)triggeredAttempt).checkpointId);
        }

        // Mocked per-operator state so that registerSharedStates calls can be counted.
        OperatorID firstOperatorId = ((OperatorIDPair)firstVertex.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorID secondOperatorId = ((OperatorIDPair)secondVertex.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorSubtaskState firstSubtaskState = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState secondSubtaskState = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        TaskStateSnapshot firstSnapshot = new TaskStateSnapshot(Collections.singletonMap(firstOperatorId, firstSubtaskState));
        TaskStateSnapshot secondSnapshot = new TaskStateSnapshot(Collections.singletonMap(secondOperatorId, secondSubtaskState));

        // Acknowledge from the second task first.
        AcknowledgeCheckpoint secondTaskAck = new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId, new CheckpointMetrics(), secondSnapshot);
        coordinator.receiveAcknowledgeMessage(secondTaskAck, TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse((boolean)pending.isDisposed());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());
        ((OperatorSubtaskState)Mockito.verify((Object)secondSubtaskState, (VerificationMode)Mockito.times(1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));

        // Deliver the very same acknowledgement again; the mock sees a second registration.
        coordinator.receiveAcknowledgeMessage(secondTaskAck, TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse((boolean)pending.isDisposed());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());
        ((OperatorSubtaskState)Mockito.verify((Object)secondSubtaskState, (VerificationMode)Mockito.times(2))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));

        // Acknowledge from the first task: the checkpoint completes.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), firstAttempt, pendingId, new CheckpointMetrics(), firstSnapshot), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue((boolean)pending.isDisposed());
        Assert.assertEquals(1L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        ((OperatorSubtaskState)Mockito.verify((Object)firstSubtaskState, (VerificationMode)Mockito.times(1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));
        ((OperatorSubtaskState)Mockito.verify((Object)secondSubtaskState, (VerificationMode)Mockito.times(2))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));
        for (ExecutionVertex notifiedVertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID notifiedAttempt = notifiedVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(pendingId, (long)recorder.getOnlyNotifiedCompletedCheckpoint((ExecutionAttemptID)notifiedAttempt).checkpointId);
        }
        CompletedCheckpoint completed = (CompletedCheckpoint)coordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals((Object)executionGraph.getJobID(), (Object)completed.getJobId());
        Assert.assertEquals((long)pending.getCheckpointId(), (long)completed.getCheckpointID());
        Assert.assertEquals(2L, (long)completed.getOperatorStates().size());

        // Second round: reset the recorder, trigger again and acknowledge without any state.
        recorder.resetCount();
        coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long nextId = (Long)coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), firstAttempt, nextId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, nextId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        CompletedCheckpoint nextCompleted = (CompletedCheckpoint)coordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals((Object)executionGraph.getJobID(), (Object)nextCompleted.getJobId());
        Assert.assertEquals(nextId, (long)nextCompleted.getCheckpointID());
        Assert.assertEquals(2L, (long)nextCompleted.getOperatorStates().size());
        Assert.assertTrue((boolean)nextCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
        for (ExecutionVertex roundTwoVertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID roundTwoAttempt = roundTwoVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(nextId, (long)recorder.getOnlyTriggeredCheckpoint((ExecutionAttemptID)roundTwoAttempt).checkpointId);
            Assert.assertEquals(nextId, (long)recorder.getOnlyNotifiedCompletedCheckpoint((ExecutionAttemptID)roundTwoAttempt).checkpointId);
        }
        coordinator.shutdown();
    }

    /**
     * Runs two overlapping checkpoints on a three-vertex graph. Acknowledgements for the two
     * checkpoints are interleaved; the first checkpoint completes while the second is still
     * pending, and the second completes after the last acknowledgement. Both must end up in the
     * completed-checkpoint store (capacity 2), each with three operator states and no sub-state.
     *
     * <p>Only the first two vertices are checked for trigger messages, while all three are checked
     * for completion notifications. NOTE(review): the {@code false} flag on the third vertex is
     * presumably what excludes it from being triggered - confirm against the graph builder.
     *
     * @throws Exception if one of the trigger futures completed exceptionally or setup fails
     */
    @Test
    public void testMultipleConcurrentCheckpoints() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        JobVertexID thirdVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(firstVertexId)
                .addJobVertex(secondVertexId)
                .addJobVertex(thirdVertexId, false)
                .setTaskManagerGateway(recorder)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = executionGraph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = executionGraph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionVertex thirdVertex = executionGraph.getJobVertex(thirdVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID thirdAttempt = thirdVertex.getCurrentExecutionAttempt().getAttemptId();
        List<ExecutionVertex> triggeredVertices = Arrays.asList(firstVertex, secondVertex);
        List<ExecutionVertex> allVertices = Arrays.asList(firstVertex, secondVertex, thirdVertex);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build())
                .setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2))
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build(executionGraph);

        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // First checkpoint.
        CompletableFuture<?> firstTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)firstTrigger);
        Assert.assertEquals(1L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        PendingCheckpoint firstPending = (PendingCheckpoint)coordinator.getPendingCheckpoints().values().iterator().next();
        long firstId = firstPending.getCheckpointId();
        for (ExecutionVertex triggeredVertex : triggeredVertices) {
            ExecutionAttemptID triggeredAttempt = triggeredVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(firstId, (long)recorder.getOnlyTriggeredCheckpoint((ExecutionAttemptID)triggeredAttempt).checkpointId);
        }
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, firstId), TASK_MANAGER_LOCATION_INFO);

        // Second checkpoint, started while the first is only partially acknowledged.
        recorder.resetCount();
        CompletableFuture<?> secondTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)secondTrigger);
        Assert.assertEquals(2L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Whichever of the two pending entries is not the first checkpoint is the second one.
        Iterator<PendingCheckpoint> pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint candidateA = pendingIterator.next();
        PendingCheckpoint candidateB = pendingIterator.next();
        PendingCheckpoint secondPending;
        if (firstPending == candidateA) {
            secondPending = candidateB;
        } else {
            secondPending = candidateA;
        }
        long secondId = secondPending.getCheckpointId();
        for (ExecutionVertex triggeredVertex : triggeredVertices) {
            ExecutionAttemptID triggeredAttempt = triggeredVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(secondId, (long)recorder.getOnlyTriggeredCheckpoint((ExecutionAttemptID)triggeredAttempt).checkpointId);
        }

        // Interleaved acknowledgements: the first checkpoint completes, the second still lacks task 3.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), thirdAttempt, firstId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), firstAttempt, secondId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), firstAttempt, firstId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, secondId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue((boolean)firstPending.isDisposed());
        for (ExecutionVertex notifiedVertex : allVertices) {
            ExecutionAttemptID notifiedAttempt = notifiedVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(firstId, (long)recorder.getOnlyNotifiedCompletedCheckpoint((ExecutionAttemptID)notifiedAttempt).checkpointId);
        }

        // Last acknowledgement completes the second checkpoint.
        recorder.resetCount();
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), thirdAttempt, secondId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(0L, (long)coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(2L, (long)coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue((boolean)secondPending.isDisposed());
        for (ExecutionVertex notifiedVertex : allVertices) {
            ExecutionAttemptID notifiedAttempt = notifiedVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(secondId, (long)recorder.getOnlyNotifiedCompletedCheckpoint((ExecutionAttemptID)notifiedAttempt).checkpointId);
        }

        // Both completed checkpoints are retained, in completion order.
        List<CompletedCheckpoint> completed = coordinator.getSuccessfulCheckpoints();
        CompletedCheckpoint firstCompleted = completed.get(0);
        Assert.assertEquals(firstId, (long)firstCompleted.getCheckpointID());
        Assert.assertEquals((Object)executionGraph.getJobID(), (Object)firstCompleted.getJobId());
        Assert.assertEquals(3L, (long)firstCompleted.getOperatorStates().size());
        Assert.assertTrue((boolean)firstCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
        CompletedCheckpoint secondCompleted = completed.get(1);
        Assert.assertEquals(secondId, (long)secondCompleted.getCheckpointID());
        Assert.assertEquals((Object)executionGraph.getJobID(), (Object)secondCompleted.getJobId());
        Assert.assertEquals(3L, (long)secondCompleted.getOperatorStates().size());
        Assert.assertTrue((boolean)secondCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
        coordinator.shutdown();
    }

    /**
     * Runs two checkpoints concurrently and fully acknowledges only the one triggered second.
     *
     * <p>Asserts that both pending checkpoints end up disposed, that exactly one completed
     * checkpoint (the second) is retained, and that the state acknowledged for the first
     * checkpoint is discarded while the state of the second is kept. A late acknowledgement for
     * the first checkpoint must have its state discarded as well. The state of the retained
     * checkpoint is expected to be discarded only after the completed checkpoint store is shut
     * down with {@link JobStatus#FINISHED}.
     */
    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
        // ---- execution graph with three job vertices and a recording gateway ----
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        JobVertexID thirdVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .addJobVertex(thirdVertexId, false)
                        .setTaskManagerGateway(recorder)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionVertex thirdVertex = graph.getJobVertex(thirdVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID thirdAttempt = thirdVertex.getCurrentExecutionAttempt().getAttemptId();

        // Only the first two vertices are checked for trigger messages below.
        List<ExecutionVertex> verticesCheckedForTrigger = Arrays.asList(firstVertex, secondVertex);

        // ---- coordinator without a limit on concurrent checkpoints ----
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(10);
        CheckpointCoordinatorConfiguration configuration =
                CheckpointCoordinatorConfiguration.builder()
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setCompletedCheckpointStore((CompletedCheckpointStore) store)
                        .setTimer((ScheduledExecutor) manuallyTriggeredScheduledExecutor)
                        .build(graph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // ---- first checkpoint: triggered, later acknowledged only partially ----
        CompletableFuture<?> firstTrigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(firstTrigger);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        PendingCheckpoint firstPending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        long firstCheckpointId = firstPending.getCheckpointId();
        for (ExecutionVertex vertex : verticesCheckedForTrigger) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(
                    firstCheckpointId, recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId);
        }

        OperatorID firstOperator =
                ((OperatorIDPair) firstVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        OperatorID secondOperator =
                ((OperatorIDPair) secondVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        OperatorID thirdOperator =
                ((OperatorIDPair) thirdVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();

        // State handles reported for checkpoint one (one snapshot per vertex).
        TaskStateSnapshot snapshot11 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot snapshot12 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot snapshot13 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState state11 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState state12 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState state13 = Mockito.mock(OperatorSubtaskState.class);
        snapshot11.putSubtaskStateByOperatorID(firstOperator, state11);
        snapshot12.putSubtaskStateByOperatorID(secondOperator, state12);
        snapshot13.putSubtaskStateByOperatorID(thirdOperator, state13);

        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        firstCheckpointId,
                        new CheckpointMetrics(),
                        snapshot12),
                TASK_MANAGER_LOCATION_INFO);

        // ---- second checkpoint: triggered while the first one is still pending ----
        recorder.resetCount();
        CompletableFuture<?> secondTrigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(secondTrigger);
        Assert.assertEquals(2L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Whichever of the two pending checkpoints is not the first one must be the second.
        Iterator<?> pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint candidateA = (PendingCheckpoint) pendingIterator.next();
        PendingCheckpoint candidateB = (PendingCheckpoint) pendingIterator.next();
        PendingCheckpoint secondPending;
        if (firstPending == candidateA) {
            secondPending = candidateB;
        } else {
            secondPending = candidateA;
        }
        long secondCheckpointId = secondPending.getCheckpointId();

        // State handles reported for checkpoint two.
        TaskStateSnapshot snapshot21 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot snapshot22 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot snapshot23 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState state21 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState state22 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState state23 = Mockito.mock(OperatorSubtaskState.class);
        snapshot21.putSubtaskStateByOperatorID(firstOperator, state21);
        snapshot22.putSubtaskStateByOperatorID(secondOperator, state22);
        snapshot23.putSubtaskStateByOperatorID(thirdOperator, state23);

        for (ExecutionVertex vertex : verticesCheckedForTrigger) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(
                    secondCheckpointId, recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId);
        }

        // ---- interleaved acknowledgements; the last one completes checkpoint two ----
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        thirdAttempt,
                        secondCheckpointId,
                        new CheckpointMetrics(),
                        snapshot23),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        secondCheckpointId,
                        new CheckpointMetrics(),
                        snapshot21),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        firstCheckpointId,
                        new CheckpointMetrics(),
                        snapshot11),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        secondCheckpointId,
                        new CheckpointMetrics(),
                        snapshot22),
                TASK_MANAGER_LOCATION_INFO);

        Assert.assertTrue(firstPending.isDisposed());
        Assert.assertTrue(secondPending.isDisposed());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // State of the first checkpoint is dropped, state of the second one is kept.
        Mockito.verify(state11, Mockito.times(1)).discardState();
        Mockito.verify(state12, Mockito.times(1)).discardState();
        Mockito.verify(state21, Mockito.never()).discardState();
        Mockito.verify(state22, Mockito.never()).discardState();
        Mockito.verify(state23, Mockito.never()).discardState();

        List<?> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
        CompletedCheckpoint retained = (CompletedCheckpoint) completedCheckpoints.get(0);
        Assert.assertEquals(secondCheckpointId, retained.getCheckpointID());
        Assert.assertEquals(graph.getJobID(), retained.getJobId());
        Assert.assertEquals(3L, retained.getOperatorStates().size());
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex, thirdVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(
                    secondCheckpointId,
                    recorder.getOnlyNotifiedCompletedCheckpoint(attempt).checkpointId);
        }

        // ---- late acknowledgement for the first checkpoint: its state must be discarded ----
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        thirdAttempt,
                        firstCheckpointId,
                        new CheckpointMetrics(),
                        snapshot13),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(state13, Mockito.times(1)).discardState();

        // ---- shutting down the store with FINISHED releases the retained checkpoint's state ----
        coordinator.shutdown();
        store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
        Mockito.verify(state21, Mockito.times(1)).discardState();
        Mockito.verify(state22, Mockito.times(1)).discardState();
        Mockito.verify(state23, Mockito.times(1)).discardState();
    }

    /**
     * Triggers one checkpoint, acknowledges it from a single task only, and then runs the
     * scheduled tasks of the manually triggered executor.
     *
     * <p>Afterwards the pending checkpoint must be disposed, nothing may be pending or retained,
     * the acknowledged state must have been discarded once, and no task may have received a
     * checkpoint-complete notification.
     */
    @Test
    public void testCheckpointTimeoutIsolated() throws Exception {
        JobVertexID ackingVertexId = new JobVertexID();
        JobVertexID silentVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(ackingVertexId)
                        .addJobVertex(silentVertexId, false)
                        .setTaskManagerGateway(recorder)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex ackingVertex = graph.getJobVertex(ackingVertexId).getTaskVertices()[0];
        ExecutionVertex silentVertex = graph.getJobVertex(silentVertexId).getTaskVertices()[0];
        ExecutionAttemptID ackingAttempt = ackingVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(
                                (CompletedCheckpointStore) new StandaloneCompletedCheckpointStore(2))
                        .setTimer((ScheduledExecutor) manuallyTriggeredScheduledExecutor)
                        .build(graph);

        // Trigger a checkpoint and make sure it is pending and alive.
        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerResult);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        Assert.assertFalse(pending.isDisposed());

        // Acknowledge from the first task only, carrying a mocked state handle.
        OperatorID ackingOperator =
                ((OperatorIDPair) ackingVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        TaskStateSnapshot ackedSnapshot = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState ackedState = Mockito.mock(OperatorSubtaskState.class);
        ackedSnapshot.putSubtaskStateByOperatorID(ackingOperator, ackedState);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ackingAttempt,
                        pending.getCheckpointId(),
                        new CheckpointMetrics(),
                        ackedSnapshot),
                TASK_MANAGER_LOCATION_INFO);

        // Running the scheduled tasks is what is expected to cancel the checkpoint.
        manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
        Assert.assertTrue("Checkpoint was not canceled by the timeout", pending.isDisposed());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Mockito.verify(ackedState, Mockito.times(1)).discardState();

        // No task may have been told that a checkpoint completed.
        for (ExecutionVertex vertex : Arrays.asList(ackingVertex, silentVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(0L, recorder.getNotifiedCompletedCheckpoints(attempt).size());
        }
        coordinator.shutdown();
    }

    /**
     * Sends three acknowledgements that do not line up with the pending checkpoint and expects
     * the coordinator to accept each call without throwing.
     *
     * <p>The three messages use, in order: a freshly created {@link JobID}, the hard-coded
     * checkpoint id {@code 1L}, and a freshly created execution attempt id. The test contains no
     * explicit assertions; it passes when none of the calls raises an exception.
     */
    @Test
    public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId, false)
                        .setTaskManagerGateway(recorder)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionAttemptID knownAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(
                                (CompletedCheckpointStore) new StandaloneCompletedCheckpointStore(2))
                        .setTimer((ScheduledExecutor) manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerResult);
        long pendingId = (Long) coordinator.getPendingCheckpoints().keySet().iterator().next();

        // 1) acknowledgement carrying a different job id
        AcknowledgeCheckpoint otherJobAck = new AcknowledgeCheckpoint(new JobID(), knownAttempt, pendingId);
        coordinator.receiveAcknowledgeMessage(otherJobAck, TASK_MANAGER_LOCATION_INFO);

        // 2) acknowledgement carrying the fixed checkpoint id 1L
        AcknowledgeCheckpoint fixedIdAck = new AcknowledgeCheckpoint(graph.getJobID(), knownAttempt, 1L);
        coordinator.receiveAcknowledgeMessage(fixedIdAck, TASK_MANAGER_LOCATION_INFO);

        // 3) acknowledgement carrying a newly created execution attempt id
        AcknowledgeCheckpoint otherAttemptAck =
                new AcknowledgeCheckpoint(
                        graph.getJobID(), ExecutionGraphTestUtils.createExecutionAttemptId(), pendingId);
        coordinator.receiveAcknowledgeMessage(otherAttemptAck, TASK_MANAGER_LOCATION_INFO);

        coordinator.shutdown();
    }

    /**
     * Checks when the coordinator discards the state attached to acknowledge messages.
     *
     * <p>While the checkpoint is pending:
     *
     * <ul>
     *   <li>state acknowledged by a known task is not discarded;
     *   <li>state from a newly created (unknown) execution attempt of this job is discarded once;
     *   <li>state from a message with a different job id is never discarded;
     *   <li>state from a repeated acknowledgement of the same task is not discarded.
     * </ul>
     *
     * <p>After a decline message has disposed the checkpoint, the previously acknowledged state
     * is discarded once, state from a late acknowledgement of the other task and from another
     * unknown attempt is discarded once, and state from a different job is still left alone.
     *
     * <p>The coordinator is shut down at the end, as the sibling tests in this class do.
     */
    @Test
    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinatorConfiguration chkConfig =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(chkConfig)
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture<?> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint =
                (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pendingCheckpoint.getCheckpointId();

        // Regular acknowledgement from the first task: its state must be kept.
        OperatorID opIDtrigger =
                ((OperatorIDPair) vertex1.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        TaskStateSnapshot taskOperatorSubtaskStatesTrigger = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState subtaskStateTrigger = Mockito.mock(OperatorSubtaskState.class);
        taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStatesTrigger),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(subtaskStateTrigger, Mockito.never()).discardState();

        // Acknowledgement from an unknown execution attempt of this job: state is discarded.
        TaskStateSnapshot unknownSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        unknownSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(unknownSubtaskState, Mockito.times(1)).discardState();

        // Acknowledgement carrying a different job id: state is left untouched.
        TaskStateSnapshot differentJobSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        differentJobSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(differentJobSubtaskState, Mockito.never()).discardState();

        // Repeated acknowledgement from the first task: the new state is not discarded.
        TaskStateSnapshot triggerSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointMetrics(),
                        triggerSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(triggerSubtaskState, Mockito.never()).discardState();

        // Clear recorded interactions so the next verification counts only the decline's effect.
        Mockito.reset(subtaskStateTrigger);
        checkpointCoordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pendingCheckpoint.isDisposed());
        Mockito.verify(subtaskStateTrigger, Mockito.times(1)).discardState();

        // Late acknowledgement from the second task after disposal: state is discarded.
        TaskStateSnapshot ackSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID2,
                        checkpointId,
                        new CheckpointMetrics(),
                        ackSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(ackSubtaskState, Mockito.times(1)).discardState();

        // Different job id after disposal: still left untouched.
        Mockito.reset(differentJobSubtaskState);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        differentJobSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(differentJobSubtaskState, Mockito.never()).discardState();

        // Unknown execution attempt after disposal: state is discarded.
        TaskStateSnapshot unknownSubtaskState2 = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        unknownSubtaskState2),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(unknownSubtaskState2, Mockito.times(1)).discardState();

        // Release the coordinator like every other test in this class does.
        checkpointCoordinator.shutdown();
    }

    /** Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with an argument of 1. */
    @Test
    public void testMaxConcurrentAttempts1() {
        testMaxConcurrentAttempts(1);
    }

    /** Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with an argument of 2. */
    @Test
    public void testMaxConcurrentAttempts2() {
        testMaxConcurrentAttempts(2);
    }

    /** Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with an argument of 5. */
    @Test
    public void testMaxConcurrentAttempts5() {
        testMaxConcurrentAttempts(5);
    }

    /**
     * Triggers a canonical-format savepoint on a two-vertex job and acknowledges it task by task.
     *
     * <p>Checks the pending checkpoint's bookkeeping before any acknowledgement, after the first
     * acknowledgement, and after that same acknowledgement is delivered a second time. Once the
     * remaining task acknowledges, the pending checkpoint must be disposed, the savepoint future
     * must yield a result, nothing may be pending or retained, no task may have been notified of
     * a completed checkpoint, and the stats tracker must report the checkpoint as
     * {@code COMPLETED}.
     */
    @Test
    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        // ---- job graph and coordinator ----
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(recorder)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointStatsTracker tracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, (MetricGroup) new UnregisteredMetricsGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer((ScheduledExecutor) manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(tracker)
                        .build(graph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // ---- trigger the savepoint ----
        String targetDirectory = tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture<?> savepointResult =
                coordinator.triggerSavepoint(targetDirectory, SavepointFormatType.CANONICAL);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(savepointResult.isDone());
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        long checkpointId = (Long) coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint pending = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(checkpointId);
        Assert.assertNotNull(pending);
        Assert.assertEquals(checkpointId, pending.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), pending.getJobId());
        Assert.assertEquals(2L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getOperatorStates().size());
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Assert.assertFalse(pending.canBeSubsumed());

        // ---- state reported by the two tasks ----
        OperatorID firstOperator = OperatorID.fromJobVertexID((JobVertexID) firstVertex.getJobvertexId());
        OperatorID secondOperator = OperatorID.fromJobVertexID((JobVertexID) secondVertex.getJobvertexId());
        OperatorSubtaskState firstState = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState secondState = Mockito.mock(OperatorSubtaskState.class);
        TaskStateSnapshot firstSnapshot =
                new TaskStateSnapshot(Collections.singletonMap(firstOperator, firstState));
        TaskStateSnapshot secondSnapshot =
                new TaskStateSnapshot(Collections.singletonMap(secondOperator, secondState));

        // ---- first acknowledgement (from the second task) ----
        AcknowledgeCheckpoint ackFromSecondTask =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        checkpointId,
                        new CheckpointMetrics(),
                        secondSnapshot);
        coordinator.receiveAcknowledgeMessage(ackFromSecondTask, TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Assert.assertFalse(savepointResult.isDone());

        // ---- the very same message again: nothing may change ----
        coordinator.receiveAcknowledgeMessage(ackFromSecondTask, TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Assert.assertFalse(savepointResult.isDone());

        // ---- remaining acknowledgement (from the first task) completes the savepoint ----
        AcknowledgeCheckpoint ackFromFirstTask =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        checkpointId,
                        new CheckpointMetrics(),
                        firstSnapshot);
        coordinator.receiveAcknowledgeMessage(ackFromFirstTask, TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pending.isDisposed());
        Assert.assertNotNull(savepointResult.get());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());

        // Each task saw exactly this trigger and received no completion notification.
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(checkpointId, recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId);
            Assertions.assertThat(recorder.getNotifiedCompletedCheckpoints(attempt)).isEmpty();
        }

        // ---- completed savepoint and recorded statistics ----
        CompletedCheckpoint completed = (CompletedCheckpoint) savepointResult.get();
        Assert.assertEquals(graph.getJobID(), completed.getJobId());
        Assert.assertEquals(pending.getCheckpointId(), completed.getCheckpointID());
        Assert.assertEquals(2L, completed.getOperatorStates().size());

        AbstractCheckpointStats recordedStats =
                tracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
        Assert.assertEquals(checkpointId, recordedStats.getCheckpointId());
        Assert.assertEquals(CheckpointStatsStatus.COMPLETED, recordedStats.getStatus());
        coordinator.shutdown();
    }

    @Test
    public void testSavepointsAreNotSubsumed() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).addJobVertex(jobVertexID2).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator checkpointCoordinator = (CheckpointCoordinator)Mockito.spy((Object)new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setCheckpointIDCounter((CheckpointIDCounter)counter).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1)).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph));
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture savepointFuture1 = checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long savepointId1 = counter.getLast();
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        CompletableFuture checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
        CompletableFuture checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
        long checkpointId2 = counter.getLast();
        Assert.assertEquals((long)3L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)1))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)checkpointId2), ArgumentMatchers.anyLong(), Mockito.eq((long)-1L));
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse((boolean)((PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(savepointId1)).isDisposed());
        Assert.assertFalse((boolean)savepointFuture1.isDone());
        CompletableFuture checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture3);
        Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        CompletableFuture savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long savepointId2 = counter.getLast();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)savepointFuture2);
        Assert.assertEquals((long)3L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)0))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)savepointId2), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse((boolean)((PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(savepointId1)).isDisposed());
        Assert.assertFalse((boolean)savepointFuture1.isDone());
        Assert.assertNotNull(savepointFuture2.get());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)0))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)savepointId1), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertNotNull(savepointFuture1.get());
        CompletableFuture checkpointFuture4 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture4);
        long checkpointId4 = counter.getLast();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointId4), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId4), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)1))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)checkpointId4), ArgumentMatchers.anyLong(), Mockito.eq((long)checkpointId2));
    }

    /**
     * Drives the periodic checkpoint scheduler by hand and checks that it stops triggering new
     * checkpoints once {@code maxConcurrentAttempts} of them are in flight.
     *
     * <p>The scenario is: fire the periodic task {@code maxConcurrentAttempts} times, expect exactly
     * that many trigger messages and no completion notification; acknowledge checkpoint {@code 1};
     * expect exactly one additional trigger, and none after that.
     *
     * @param maxConcurrentAttempts the limit passed to {@code setMaxConcurrentCheckpoints}
     * @throws AssertionError if an expectation is violated, or if any checked exception escapes the
     *     scenario (the exception is attached as the cause)
     */
    private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
        try {
            JobVertexID jobVertexID1 = new JobVertexID();
            CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                    new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
            ExecutionGraph graph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(jobVertexID1)
                            .setTaskManagerGateway(gateway)
                            .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
            ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
            ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();

            CheckpointCoordinatorConfiguration chkConfig =
                    new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                            .setCheckpointInterval(10L)
                            .setCheckpointTimeout(200000L)
                            .setMinPauseBetweenCheckpoints(0L)
                            .setMaxConcurrentCheckpoints(maxConcurrentAttempts)
                            .build();
            CheckpointCoordinator checkpointCoordinator =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setCheckpointCoordinatorConfiguration(chkConfig)
                            .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                            .setTimer(this.manuallyTriggeredScheduledExecutor)
                            .build(graph);
            checkpointCoordinator.startCheckpointScheduler();

            // Fill every available slot with one periodic trigger each.
            for (int i = 0; i < maxConcurrentAttempts; ++i) {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Assert.assertEquals(
                    maxConcurrentAttempts, gateway.getTriggeredCheckpoints(attemptID1).size());
            Assert.assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptID1).size());

            // Acknowledge checkpoint id 1 (presumably the first one triggered) to free a slot.
            checkpointCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, 1L),
                    TASK_MANAGER_LOCATION_INFO);

            Collection<?> periodicScheduledTasks =
                    this.manuallyTriggeredScheduledExecutor.getActivePeriodicScheduledTask();
            Assert.assertEquals(1, periodicScheduledTasks.size());

            // The freed slot allows exactly one more checkpoint ...
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(
                    maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());

            // ... and a further periodic tick must not trigger anything.
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(
                    maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());

            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            // Keep the signature free of checked exceptions for existing callers, but preserve the
            // full cause chain instead of printing it and failing with a (possibly null) message.
            throw new AssertionError(
                    "testMaxConcurrentAttempts(" + maxConcurrentAttempts + ") failed: " + e, e);
        }
    }

    /**
     * Checks that the periodic scheduler refills the pending set up to the configured limit after
     * the newest pending checkpoint is acknowledged.
     *
     * <p>With a limit of two, checkpoints 1 and 2 become pending. Acknowledging checkpoint 2 is
     * followed by further periodic ticks until two checkpoints are pending again, which are then
     * expected to be checkpoints 3 and 4.
     */
    @Test
    public void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
        final int maxConcurrentAttempts = 2;

        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId =
                graph.getJobVertex(vertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();

        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(200000L)
                        .setMinPauseBetweenCheckpoints(0L)
                        .setMaxConcurrentCheckpoints(maxConcurrentAttempts)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(config)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);
        coordinator.startCheckpointScheduler();

        // Tick the scheduler until the limit is reached for the first time.
        do {
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
        } while (coordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);

        Assert.assertEquals(maxConcurrentAttempts, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(1L));
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(2L));

        // Acknowledge the newer of the two pending checkpoints.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 2L),
                TASK_MANAGER_LOCATION_INFO);

        // Tick again until the pending set is back at the limit.
        do {
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
        } while (coordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);

        Assert.assertEquals(maxConcurrentAttempts, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(3L));
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(4L));

        coordinator.shutdown();
    }

    /**
     * After the shared setup has moved the task to RUNNING, one more periodic tick must leave at
     * least one checkpoint pending.
     */
    @Test
    public void testPeriodicSchedulingWithInactiveTasks() throws Exception {
        CheckpointCoordinator coordinator =
                this.setupCheckpointCoordinatorWithInactiveTasks(new MemoryStateBackend());

        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        int pending = coordinator.getNumberOfPendingCheckpoints();
        Assert.assertTrue(pending > 0);
    }

    /**
     * Builds a coordinator over a single-vertex graph whose task is initially not RUNNING, starts
     * the periodic scheduler, asserts that a tick produces no pending checkpoint in that state,
     * then transitions the task to RUNNING and ticks once more.
     *
     * @param checkpointStorage the checkpoint storage handed to the coordinator builder
     * @return the started coordinator, after the task has been switched to RUNNING
     */
    private CheckpointCoordinator setupCheckpointCoordinatorWithInactiveTasks(CheckpointStorage checkpointStorage) throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTransitToRunning(false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex inactiveVertex = graph.getJobVertex(vertexId).getTaskVertices()[0];

        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(200000L)
                        .setMinPauseBetweenCheckpoints(0L)
                        .setMaxConcurrentCheckpoints(2)
                        .build();

        CheckpointIDCounterWithOwner idCounter = new CheckpointIDCounterWithOwner();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(config)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setCheckpointStorage(checkpointStorage)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointIDCounter(idCounter)
                        .build(graph);
        idCounter.setOwner(coordinator);

        coordinator.startCheckpointScheduler();

        // While the task is not running, a periodic tick must not leave anything pending.
        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(0, coordinator.getNumberOfPendingCheckpoints());

        // Activate the task and give the scheduler another tick.
        inactiveVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        return coordinator;
    }

    /**
     * Triggers several savepoints back to back on a coordinator configured with a maximum of one
     * concurrent checkpoint, and checks that all of them stay incomplete until acknowledged and
     * all of them complete with a non-null result afterwards.
     */
    @Test
    public void testConcurrentSavepoints() throws Exception {
        final int numSavepoints = 5;

        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId =
                graph.getJobVertex(vertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();

        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(config)
                        .setCheckpointIDCounter(idCounter)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        List<CompletableFuture> triggered = new ArrayList<CompletableFuture>();
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        for (int n = 0; n < numSavepoints; ++n) {
            triggered.add(coordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL));
        }

        // Nothing has been acknowledged yet, so no savepoint may be finished.
        for (CompletableFuture pendingSavepoint : triggered) {
            Assert.assertFalse(pendingSavepoint.isDone());
        }

        this.manuallyTriggeredScheduledExecutor.triggerAll();

        // Acknowledge the numSavepoints most recent ids, newest first.
        long newestId = idCounter.getLast();
        for (long id = newestId; id > newestId - numSavepoints; --id) {
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptId, id),
                    TASK_MANAGER_LOCATION_INFO);
        }

        for (CompletableFuture finishedSavepoint : triggered) {
            Assert.assertNotNull(finishedSavepoint.get());
        }
    }

    /**
     * Triggers two savepoints on a coordinator configured with a very large minimum pause and a
     * single concurrent checkpoint, and asserts that neither returned future is already done
     * right after the trigger call.
     */
    @Test
    public void testMinDelayBetweenSavepoints() throws Exception {
        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMinPauseBetweenCheckpoints(100000000L)
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(config)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();

        CompletableFuture first = coordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        Assert.assertFalse("Did not trigger savepoint", first.isDone());

        CompletableFuture second = coordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        Assert.assertFalse("Did not trigger savepoint", second.isDone());
    }

    /**
     * Configures the coordinator with the {@code RETAIN_ON_FAILURE} retention policy, triggers a
     * checkpoint, and asserts that every pending checkpoint carries the properties produced by
     * {@code CheckpointProperties.forCheckpoint} for that policy.
     */
    @Test
    public void testExternalizedCheckpoints() throws Exception {
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(config)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture triggerResult = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerResult);

        for (PendingCheckpoint pending : coordinator.getPendingCheckpoints().values()) {
            CheckpointProperties actualProps = pending.getProps();
            CheckpointProperties expectedProps =
                    CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
            Assert.assertEquals(expectedProps, actualProps);
        }

        coordinator.shutdown();
    }

    /**
     * Exercises the key-group partition check on a handful of fixed boundary combinations and
     * then on 1000 pseudo-random (maxParallelism, parallelism) pairs from a fixed seed.
     */
    @Test
    public void testCreateKeyGroupPartitions() {
        // {maxParallelism, parallelism}
        int[][] fixedCases = {
            {1, 1},
            {13, 1},
            {13, 2},
            {Short.MAX_VALUE, 1},
            {Short.MAX_VALUE, 13},
            {Short.MAX_VALUE, Short.MAX_VALUE},
        };
        for (int[] fixedCase : fixedCases) {
            this.testCreateKeyGroupPartitions(fixedCase[0], fixedCase[1]);
        }

        Random random = new Random(1234L);
        int remaining = 1000;
        while (remaining-- > 0) {
            // maxParallelism in [1, Short.MAX_VALUE - 1], parallelism in [1, maxParallelism]
            int maxParallelism = 1 + random.nextInt(Short.MAX_VALUE - 1);
            int parallelism = 1 + random.nextInt(maxParallelism);
            this.testCreateKeyGroupPartitions(maxParallelism, parallelism);
        }
    }

    /**
     * Asserts that every key group in {@code [0, maxParallelism)} is contained in the range that
     * {@code StateAssignmentOperation.createKeyGroupPartitions} produced for the operator index
     * {@code KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup} assigns to that key group.
     *
     * @param maxParallelism number of key groups
     * @param parallelism number of partitions to create
     */
    private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
        List partitions = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, parallelism);
        int keyGroup = 0;
        while (keyGroup < maxParallelism) {
            int operatorIndex =
                    KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                            maxParallelism, parallelism, keyGroup);
            KeyGroupRange owner = (KeyGroupRange) partitions.get(operatorIndex);
            if (!owner.contains(keyGroup)) {
                Assert.fail("Could not find expected key-group " + keyGroup + " in range " + owner);
            }
            ++keyGroup;
        }
    }

    /**
     * Runs the repartitioning check 10000 times with parameters drawn from a fixed-seed random
     * generator; each of the four parameters lies in {@code [1, 9]}.
     */
    @Test
    public void testPartitionableStateRepartitioning() {
        final int runs = 10000;
        final int bound = 9;
        Random random = new Random(42L);

        int run = 0;
        while (run < runs) {
            // Arguments are evaluated left to right, so the draw order is:
            // oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState.
            this.doTestPartitionableStateRepartitioning(
                    random,
                    1 + random.nextInt(bound),
                    1 + random.nextInt(bound),
                    1 + random.nextInt(bound),
                    1 + random.nextInt(bound));
            ++run;
        }
    }

    /*
     * WARNING - void declaration
     */
    private void doTestPartitionableStateRepartitioning(Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
        ArrayList<List<OperatorStreamStateHandle>> previousParallelOpInstanceStates = new ArrayList<List<OperatorStreamStateHandle>>(oldParallelism);
        for (int i = 0; i < oldParallelism; ++i) {
            void var11_16;
            Path fakePath = new Path("/fake-" + i);
            HashMap<String, OperatorStateHandle.StateMetaInfo> namedStatesToOffsets = new HashMap<String, OperatorStateHandle.StateMetaInfo>();
            int off = 0;
            boolean bl = false;
            while (var11_16 < numNamedStates - 1) {
                Object offs = new long[1 + r.nextInt(maxPartitionsPerState)];
                for (int o = 0; o < ((Object)offs).length; ++o) {
                    offs[o] = (long)off;
                    ++off;
                }
                OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ? OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
                namedStatesToOffsets.put("State-" + (int)var11_16, new OperatorStateHandle.StateMetaInfo((long[])offs, mode));
                ++var11_16;
            }
            if (numNamedStates % 2 == 0) {
                long[] lArray = new long[]{off + 1, off + 2, off + 3, off + 4};
                namedStatesToOffsets.put("State-" + (numNamedStates - 1), new OperatorStateHandle.StateMetaInfo(lArray, OperatorStateHandle.Mode.BROADCAST));
            }
            previousParallelOpInstanceStates.add(Collections.singletonList(new OperatorStreamStateHandle(namedStatesToOffsets, (StreamStateHandle)new FileStateHandle(fakePath, -1L))));
        }
        HashMap expected = new HashMap();
        int taskIndex = 0;
        int expectedTotalPartitions = 0;
        for (List list : previousParallelOpInstanceStates) {
            Assert.assertEquals((long)1L, (long)list.size());
            for (OperatorStateHandle psh : list) {
                Map offsMap = psh.getStateNameToPartitionOffsets();
                HashMap offsMapWithList = new HashMap(offsMap.size());
                for (Map.Entry e : offsMap.entrySet()) {
                    int replication;
                    long[] offs = ((OperatorStateHandle.StateMetaInfo)e.getValue()).getOffsets();
                    switch (((OperatorStateHandle.StateMetaInfo)e.getValue()).getDistributionMode()) {
                        case UNION: {
                            replication = newParallelism;
                            break;
                        }
                        case BROADCAST: {
                            int extra = taskIndex < newParallelism % oldParallelism ? 1 : 0;
                            replication = newParallelism / oldParallelism + extra;
                            break;
                        }
                        case SPLIT_DISTRIBUTE: {
                            replication = 1;
                            break;
                        }
                        default: {
                            throw new RuntimeException("Unknown distribution mode " + ((OperatorStateHandle.StateMetaInfo)e.getValue()).getDistributionMode());
                        }
                    }
                    if (replication <= 0) continue;
                    expectedTotalPartitions += replication * offs.length;
                    ArrayList<Long> offsList = new ArrayList<Long>(offs.length);
                    for (Object off : (Object)offs) {
                        for (int p = 0; p < replication; ++p) {
                            offsList.add((long)off);
                        }
                    }
                    offsMapWithList.put(e.getKey(), offsList);
                }
                if (!offsMapWithList.isEmpty()) {
                    expected.put(psh.getDelegateStateHandle(), offsMapWithList);
                }
                ++taskIndex;
            }
        }
        OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        List list = repartitioner.repartitionState(previousParallelOpInstanceStates, oldParallelism, newParallelism);
        HashMap actual = new HashMap();
        int minCount = Integer.MAX_VALUE;
        int maxCount = 0;
        int actualTotalPartitions = 0;
        for (int p = 0; p < newParallelism; ++p) {
            int partitionCount = 0;
            Collection pshc = (Collection)list.get(p);
            for (OperatorStateHandle sh : pshc) {
                for (Map.Entry namedState : sh.getStateNameToPartitionOffsets().entrySet()) {
                    long[] add;
                    ArrayList<Long> actualOffs;
                    HashMap stateToOffsets = (HashMap)actual.get(sh.getDelegateStateHandle());
                    if (stateToOffsets == null) {
                        stateToOffsets = new HashMap();
                        actual.put(sh.getDelegateStateHandle(), stateToOffsets);
                    }
                    if ((actualOffs = (ArrayList<Long>)stateToOffsets.get(namedState.getKey())) == null) {
                        actualOffs = new ArrayList<Long>();
                        stateToOffsets.put(namedState.getKey(), actualOffs);
                    }
                    for (long l : add = ((OperatorStateHandle.StateMetaInfo)namedState.getValue()).getOffsets()) {
                        actualOffs.add(l);
                    }
                    partitionCount += ((OperatorStateHandle.StateMetaInfo)namedState.getValue()).getOffsets().length;
                }
            }
            minCount = Math.min(minCount, partitionCount);
            maxCount = Math.max(maxCount, partitionCount);
            actualTotalPartitions += partitionCount;
        }
        for (Map v : actual.values()) {
            for (List l : v.values()) {
                Collections.sort(l);
            }
        }
        if (oldParallelism != newParallelism) {
            int maxLoadDiff = maxCount - minCount;
            Assert.assertTrue((String)("Difference in partition load is > 1 : " + maxLoadDiff), (maxLoadDiff <= 1 ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)expectedTotalPartitions, (long)actualTotalPartitions);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Verifies that triggering a checkpoint reports exactly one pending checkpoint with id
     * {@code 1} and {@code NEVER_RETAIN_AFTER_TERMINATION} checkpoint properties to the
     * {@code CheckpointStatsTracker}.
     */
    @Test
    public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
        CheckpointStatsTracker tracker = Mockito.mock(CheckpointStatsTracker.class);
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(tracker)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // The coordinator needs a stats object back from the tracker when it reports the trigger.
        Mockito.when(
                        tracker.reportPendingCheckpoint(
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.any(CheckpointProperties.class),
                                ArgumentMatchers.any(Map.class)))
                .thenReturn(Mockito.mock(PendingCheckpointStats.class));

        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture);

        // anyLong() is the primitive-safe matcher for the trigger timestamp; it replaces the
        // deprecated Matchers.any(Long.class) that had to be unboxed at the call site.
        Mockito.verify(tracker, Mockito.times(1))
                .reportPendingCheckpoint(
                        Mockito.eq(1L),
                        ArgumentMatchers.anyLong(),
                        Mockito.eq(
                                CheckpointProperties.forCheckpoint(
                                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)),
                        ArgumentMatchers.any());
    }

    /**
     * Verifies that restoring the latest checkpoint reports exactly one restored checkpoint to
     * the {@code CheckpointStatsTracker}.
     */
    @Test
    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        CheckpointStatsTracker tracker = Mockito.mock(CheckpointStatsTracker.class);
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(store)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(tracker)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // Seed the store with a single empty completed checkpoint (id 0) to restore from.
        store.addCheckpointAndSubsumeOldestOne(
                new CompletedCheckpoint(
                        new JobID(),
                        0L,
                        0L,
                        0L,
                        Collections.emptyMap(),
                        Collections.emptyList(),
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation(),
                        null),
                new CheckpointsCleaner(),
                () -> {});

        Assert.assertTrue(
                checkpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));

        // ArgumentMatchers replaces the deprecated org.mockito.Matchers alias.
        Mockito.verify(tracker, Mockito.times(1))
                .reportRestoredCheckpoint(ArgumentMatchers.any(RestoredCheckpointStats.class));
    }

    /**
     * For every {@code RestoreMode}: performs three incremental checkpoints, verifies that shared
     * state was registered once with the first {@code SharedStateRegistry} and that nothing was
     * discarded, then shuts the store down as SUSPENDED, restores a second coordinator from the
     * retained checkpoints with a fresh registry, and verifies re-registration and the discard
     * behaviour as old checkpoints are removed from the second store.
     */
    @Test
    public void testSharedStateRegistrationOnRestore() throws Exception {
        for (RestoreMode restoreMode : RestoreMode.values()) {
            JobVertexID jobVertexID1 = new JobVertexID();
            int parallelism1 = 2;
            int maxParallelism1 = 4;
            ExecutionGraph graph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(jobVertexID1, parallelism1, maxParallelism1)
                            .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
            ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);

            List<CompletedCheckpoint> checkpoints = Collections.emptyList();
            SharedStateRegistry firstInstance =
                    SharedStateRegistry.DEFAULT_FACTORY.create(
                            org.apache.flink.util.concurrent.Executors.directExecutor(),
                            checkpoints,
                            restoreMode);
            EmbeddedCompletedCheckpointStore store =
                    new EmbeddedCompletedCheckpointStore(10, checkpoints, firstInstance);

            CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder coordinatorBuilder =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setTimer(this.manuallyTriggeredScheduledExecutor);
            CheckpointCoordinator coordinator =
                    coordinatorBuilder.setCompletedCheckpointStore(store).build(graph);

            final int numCheckpoints = 3;
            List<KeyGroupRange> keyGroupPartitions1 =
                    StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
            for (int i = 0; i < numCheckpoints; ++i) {
                this.performIncrementalCheckpoint(
                        graph.getJobID(), coordinator, jobVertex1, keyGroupPartitions1, i);
            }

            List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
            Assert.assertEquals(numCheckpoints, completedCheckpoints.size());

            int sharedHandleCount = 0;
            List<List<IncrementalKeyedStateHandle.HandleAndLocalPath>> sharedHandlesByCheckpoint =
                    new ArrayList<>(numCheckpoints);
            for (int i = 0; i < numCheckpoints; ++i) {
                sharedHandlesByCheckpoint.add(new ArrayList<>(2));
            }

            int cp = 0;
            for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
                for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                    for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                        for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                            // Each keyed handle was registered exactly once with the first registry.
                            Mockito.verify(keyedStateHandle, Mockito.times(1))
                                    .registerSharedStates(firstInstance, completedCheckpoint.getCheckpointID());

                            IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
                                    (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
                            sharedHandlesByCheckpoint.get(cp).addAll(incrementalKeyedStateHandle.getSharedState());

                            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath :
                                    incrementalKeyedStateHandle.getSharedState()) {
                                StreamStateHandle streamStateHandle = handleAndLocalPath.getHandle();
                                Assertions.assertThat(streamStateHandle instanceof PlaceholderStreamStateHandle).isFalse();
                                DiscardRecordedStateObject.verifyDiscard((StateObject) streamStateHandle, TernaryBoolean.FALSE);
                                ++sharedHandleCount;
                            }
                            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath :
                                    incrementalKeyedStateHandle.getPrivateState()) {
                                DiscardRecordedStateObject.verifyDiscard((StateObject) handleAndLocalPath.getHandle(), TernaryBoolean.FALSE);
                            }
                            Mockito.verify(incrementalKeyedStateHandle.getMetaStateHandle(), Mockito.never())
                                    .discardState();
                        }
                        Mockito.verify(subtaskState, Mockito.never()).discardState();
                    }
                }
                ++cp;
            }

            // NOTE(review): the expected total of 10 depends on how many shared handles
            // performIncrementalCheckpoint creates per checkpoint - confirm against that helper.
            Assert.assertEquals(10, sharedHandleCount);

            // Dropping the oldest checkpoint must not discard any shared handle.
            store.removeOldestCheckpoint();
            for (List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedHandles : sharedHandlesByCheckpoint) {
                for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : sharedHandles) {
                    DiscardRecordedStateObject.verifyDiscard((StateObject) handleAndLocalPath.getHandle(), TernaryBoolean.FALSE);
                }
            }

            store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());

            Set<ExecutionJobVertex> tasks = new HashSet<>();
            tasks.add(jobVertex1);
            Assert.assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null));

            // Restore into a second store/coordinator backed by a fresh registry.
            SharedStateRegistry secondInstance =
                    SharedStateRegistry.DEFAULT_FACTORY.create(
                            org.apache.flink.util.concurrent.Executors.directExecutor(),
                            store.getAllCheckpoints(),
                            restoreMode);
            EmbeddedCompletedCheckpointStore secondStore =
                    new EmbeddedCompletedCheckpointStore(10, store.getAllCheckpoints(), secondInstance);
            CheckpointCoordinator secondCoordinator =
                    coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
            Assert.assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));

            // The first checkpoint was removed before the restore, so only the later ones are
            // expected to be registered with the second registry.
            cp = 0;
            for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
                for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                    for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                        for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                            VerificationMode verificationMode = cp > 0 ? Mockito.times(1) : Mockito.never();
                            Mockito.verify(keyedStateHandle, verificationMode)
                                    .registerSharedStates(secondInstance, completedCheckpoint.getCheckpointID());
                        }
                    }
                }
                ++cp;
            }

            // After the first removal, only index 0 under CLAIM is expected to be discarded.
            secondStore.removeOldestCheckpoint();
            CheckpointCoordinatorTest.verifyDiscard(
                    sharedHandlesByCheckpoint,
                    cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? TernaryBoolean.TRUE : TernaryBoolean.FALSE);

            // After the second removal, index 1 must still be undiscarded; others are not checked.
            secondStore.removeOldestCheckpoint();
            CheckpointCoordinatorTest.verifyDiscard(
                    sharedHandlesByCheckpoint,
                    cpId -> cpId == 1 ? TernaryBoolean.FALSE : TernaryBoolean.UNDEFINED);
        }
    }

    /**
     * Declines an in-flight synchronous savepoint and checks the resulting failure handling: the
     * pending savepoint is disposed, the savepoint future fails with a {@code CheckpointException}
     * whose nested cause message names the declining exception, and the fail-job callback is
     * invoked exactly once with an equivalent failure.
     */
    @Test
    public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
        // f0 counts failJob invocations, f1 keeps the most recently reported cause.
        final Tuple2<Integer, Throwable> failJobRecord = Tuple2.of(0, null);
        IOException declineCause = new IOException("Custom-Exception");

        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttemptId = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        // Resolved for symmetry with the first vertex; no assertion below uses it.
        ExecutionAttemptID secondAttemptId = secondVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointFailureManager.FailJobCallback recordingCallback = new CheckpointFailureManager.FailJobCallback() {

            public void failJob(Throwable cause) {
                failJobRecord.f0 = failJobRecord.f0 + 1;
                failJobRecord.f1 = cause;
            }

            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                throw new AssertionError("This method should not be called for the test.");
            }
        };
        CheckpointCoordinator coordinator =
                getCheckpointCoordinator(graph, new CheckpointFailureManager(0, recordingCallback));

        CompletableFuture<?> savepointFuture =
                coordinator.triggerSynchronousSavepoint(false, "test-dir", SavepointFormatType.CANONICAL);
        manuallyTriggeredScheduledExecutor.triggerAll();

        PendingCheckpoint declinedSavepoint =
                declineSynchronousSavepoint(graph.getJobID(), coordinator, firstAttemptId, declineCause);
        Assert.assertTrue(declinedSavepoint.isDisposed());

        // Expected message format: "<exception class name>: <exception message>".
        String expectedRootCauseMessage =
                String.format("%s: %s", declineCause.getClass().getName(), declineCause.getMessage());
        try {
            savepointFuture.get();
            Assert.fail("Expected Exception not found.");
        } catch (ExecutionException e) {
            Throwable stripped = ExceptionUtils.stripExecutionException(e);
            Assert.assertTrue(stripped instanceof CheckpointException);
            Assert.assertEquals(expectedRootCauseMessage, stripped.getCause().getCause().getMessage());
        }

        Assert.assertEquals(1L, failJobRecord.f0.intValue());
        Throwable reportedFailure = failJobRecord.f1;
        Assert.assertTrue(
                reportedFailure instanceof CheckpointException
                        && reportedFailure.getCause().getCause().getMessage().equals(expectedRootCauseMessage));
        coordinator.shutdown();
    }

    /**
     * Builds a coordinator around a {@code StoppingCheckpointIDCounter} and expects a triggered
     * checkpoint to fail with {@code PERIODIC_SCHEDULER_SHUTDOWN}.
     */
    @Test
    public void testTriggerCheckpointAfterStopping() throws Exception {
        StoppingCheckpointIDCounter stoppingCounter = new StoppingCheckpointIDCounter();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(stoppingCounter)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        // The counter needs a back-reference to the coordinator that owns it.
        stoppingCounter.setOwner(coordinator);

        testTriggerCheckpoint(coordinator, CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
    }

    /**
     * Builds a coordinator around an {@code IOExceptionCheckpointIDCounter}, expects a triggered
     * checkpoint to fail with {@code IO_EXCEPTION}, and then checks that the fail-job callback ran
     * once and that the stats tracker recorded exactly one failed checkpoint and nothing else.
     */
    @Test
    public void testTriggerCheckpointWithCounterIOException() throws Exception {
        IOExceptionCheckpointIDCounter failingCounter = new IOExceptionCheckpointIDCounter();
        TestFailJobCallback failJobCallback = new TestFailJobCallback();
        CheckpointStatsTracker tracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(failingCounter)
                        .setFailureManager(new CheckpointFailureManager(0, failJobCallback))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(tracker)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        failingCounter.setOwner(coordinator);

        testTriggerCheckpoint(coordinator, CheckpointFailureReason.IO_EXCEPTION);

        Assert.assertEquals(1L, failJobCallback.getInvokeCounter());

        CheckpointStatsCounts statsCounts = tracker.createSnapshot().getCounts();
        Assert.assertEquals(0L, statsCounts.getNumberOfRestoredCheckpoints());
        Assert.assertEquals(1L, statsCounts.getTotalNumberOfCheckpoints());
        Assert.assertEquals(0L, statsCounts.getNumberOfInProgressCheckpoints());
        Assert.assertEquals(0L, statsCounts.getNumberOfCompletedCheckpoints());
        Assert.assertEquals(1L, statsCounts.getNumberOfFailedCheckpoints());
        // No pending stats may remain registered under checkpoint id 1.
        Assert.assertNull(tracker.getPendingCheckpointStats(1L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the checkpoint scheduler, triggers one checkpoint and expects its future to fail
     * with a {@code CheckpointException} carrying the given failure reason. The coordinator is
     * shut down in every case.
     *
     * @param checkpointCoordinator coordinator under test; shut down before this method returns
     * @param expectedFailureReason failure reason the triggered checkpoint must fail with
     * @throws Exception the original {@link ExecutionException} if the failure does not contain
     *     a {@code CheckpointException} with the expected reason
     */
    private void testTriggerCheckpoint(CheckpointCoordinator checkpointCoordinator, CheckpointFailureReason expectedFailureReason) throws Exception {
        try {
            checkpointCoordinator.startCheckpointScheduler();
            CheckpointProperties properties =
                    CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
            CompletableFuture<?> completion = checkpointCoordinator.triggerCheckpoint(properties, null, true);
            manuallyTriggeredScheduledExecutor.triggerAll();
            try {
                completion.get();
                Assert.fail("should not trigger periodic checkpoint");
            } catch (ExecutionException e) {
                Optional<CheckpointException> found = ExceptionUtils.findThrowable(e, CheckpointException.class);
                boolean failedAsExpected =
                        found.isPresent() && found.get().getCheckpointFailureReason() == expectedFailureReason;
                if (!failedAsExpected) {
                    // Any other failure is unexpected; surface it unchanged.
                    throw e;
                }
            }
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With unaligned checkpoints enabled and at most one concurrent checkpoint, queues ten
     * checkpoint requests plus one savepoint, declines checkpoint 1 and checks that exactly one
     * checkpoint future is done, the savepoint future is still open, and the single pending
     * checkpoint is a savepoint that is not forced.
     */
    @Test
    public void testSavepointScheduledInUnalignedMode() throws Exception {
        final int maxConcurrentCheckpoints = 1;
        final int checkpointRequestsToSend = 10;
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinatorConfiguration configuration =
                CheckpointCoordinatorConfiguration.builder()
                        .setUnalignedCheckpointsEnabled(true)
                        .setMaxConcurrentCheckpoints(maxConcurrentCheckpoints)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);
        try {
            List<CompletableFuture<?>> checkpointFutures = new ArrayList<>(checkpointRequestsToSend);
            coordinator.startCheckpointScheduler();
            for (int i = 0; i < checkpointRequestsToSend; i++) {
                checkpointFutures.add(coordinator.triggerCheckpoint(true));
            }
            // Tracks how many requests have been submitted and not yet finished.
            int activeRequests = checkpointRequestsToSend;
            manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());

            CompletableFuture<?> savepointFuture = coordinator.triggerSavepoint("/tmp", SavepointFormatType.CANONICAL);
            manuallyTriggeredScheduledExecutor.triggerAll();
            activeRequests++;
            Assert.assertEquals(activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());

            DeclineCheckpoint decline =
                    new DeclineCheckpoint(
                            graph.getJobID(),
                            ExecutionGraphTestUtils.createExecutionAttemptId(),
                            1L,
                            new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
            coordinator.receiveDeclineMessage(decline, "none");
            manuallyTriggeredScheduledExecutor.triggerAll();
            activeRequests--;
            Assert.assertEquals(activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());

            long finishedCheckpoints = checkpointFutures.stream().filter(Future::isDone).count();
            Assert.assertEquals(1L, finishedCheckpoints);
            Assert.assertFalse(savepointFuture.isDone());
            Assert.assertEquals(maxConcurrentCheckpoints, coordinator.getNumberOfPendingCheckpoints());

            PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
            CheckpointProperties pendingProps = pending.getProps();
            Assert.assertTrue(pendingProps.isSavepoint());
            Assert.assertFalse(pendingProps.forceCheckpoint());
        } finally {
            coordinator.shutdown();
        }
    }

    /**
     * Triggers a checkpoint on a two-vertex graph with one operator coordinator and a master hook
     * that acknowledges both tasks from inside {@code triggerCheckpoint}. Verifies that the
     * coordinator callback ran before the hook, that the checkpoint completes and is retained,
     * that both tasks were triggered with the id seen by the hook, and that the completed
     * checkpoint holds two operator states.
     */
    @Test
    public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).addJobVertex(jobVertexID2).setTaskManagerGateway(gateway).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        final ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        OperatorID opID1 = ((OperatorIDPair)vertex1.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorID opID2 = ((OperatorIDPair)vertex2.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        final TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
        final TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
        OperatorSubtaskState subtaskState1 = OperatorSubtaskState.builder().build();
        OperatorSubtaskState subtaskState2 = OperatorSubtaskState.builder().build();
        taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
        // Each task reports its own snapshot: the second operator's state belongs in the second
        // snapshot (the one sent with the ack for attemptID2), not in the first one.
        taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
        // Flipped by the operator coordinator callback; the master hook asserts it is already set.
        final AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext coordinatorCheckpointContext = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder().setOnCallingCheckpointCoordinator((checkpointId, result) -> {
            coordCheckpointDone.set(true);
            result.complete(new byte[0]);
        }).setOperatorID(opID1).build();
        final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext)).build(graph);
        // Captures the checkpoint id handed to the master hook for later comparison.
        final AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new MasterTriggerRestoreHook<Integer>(){

            public String getIdentifier() {
                return "anything";
            }

            @Nullable
            public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
                Assert.assertTrue((String)"The coordinator checkpoint should have finished.", (boolean)coordCheckpointDone.get());
                checkpointIdRef.set(checkpointId);
                // Acknowledge both tasks from within the hook.
                AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
                AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                return null;
            }

            public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
            }

            public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                return new SimpleVersionedSerializer<Integer>(){

                    public int getVersion() {
                        return 0;
                    }

                    public byte[] serialize(Integer obj) throws IOException {
                        return new byte[0];
                    }

                    public Integer deserialize(int version, byte[] serialized) throws IOException {
                        return 1;
                    }
                };
            }
        });
        // Nothing pending, retained or scheduled before the trigger.
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
        // The checkpoint completed inside the hook: one retained, none pending, nothing scheduled.
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        long checkpointId2 = checkpointIdRef.get();
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals((long)checkpointId2, (long)gateway.getOnlyTriggeredCheckpoint((ExecutionAttemptID)attemptId).checkpointId);
        }
        CompletedCheckpoint success = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals((Object)graph.getJobID(), (Object)success.getJobId());
        Assert.assertEquals((long)2L, (long)success.getOperatorStates().size());
        checkpointCoordinator.shutdown();
    }

    /**
     * Same setup as the externally-induced-source test with an operator coordinator, but the
     * checkpoint storage throws an {@link IOException} when the metadata output stream is
     * created. Verifies that the checkpoint future completes exceptionally and that no successful
     * checkpoint is recorded.
     */
    @Test
    public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttemptId = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        OperatorID firstOperatorId =
                ((OperatorIDPair) firstVertex.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorID secondOperatorId =
                ((OperatorIDPair) secondVertex.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();

        // One snapshot per task, each holding an empty subtask state for that task's operator.
        final TaskStateSnapshot firstTaskSnapshot = new TaskStateSnapshot();
        final TaskStateSnapshot secondTaskSnapshot = new TaskStateSnapshot();
        OperatorSubtaskState firstSubtaskState = OperatorSubtaskState.builder().build();
        OperatorSubtaskState secondSubtaskState = OperatorSubtaskState.builder().build();
        firstTaskSnapshot.putSubtaskStateByOperatorID(firstOperatorId, firstSubtaskState);
        secondTaskSnapshot.putSubtaskStateByOperatorID(secondOperatorId, secondSubtaskState);

        // Flipped by the operator coordinator callback; the master hook asserts it is already set.
        final AtomicBoolean coordinatorCheckpointed = new AtomicBoolean(false);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext coordinatorContext =
                new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                        .setOnCallingCheckpointCoordinator((checkpointId, result) -> {
                            coordinatorCheckpointed.set(true);
                            result.complete(new byte[0]);
                        })
                        .setOperatorID(firstOperatorId)
                        .build();

        final CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(Collections.singleton(coordinatorContext))
                        .setCheckpointStorage(new JobManagerCheckpointStorage() {
                            private static final long serialVersionUID = 8134582566514272546L;

                            public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
                                return new MemoryBackendCheckpointStorageAccess(jobId, null, null, 100) {

                                    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
                                        return new NonPersistentMetadataCheckpointStorageLocation(1000) {

                                            public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
                                                // Failure injected into checkpoint finalization.
                                                throw new IOException("Artificial Exception");
                                            }
                                        };
                                    }
                                };
                            }
                        })
                        .build(graph);

        final AtomicReference<Long> observedCheckpointId = new AtomicReference<>();
        checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {

            public String getIdentifier() {
                return "anything";
            }

            @Nullable
            public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
                Assert.assertTrue("The coordinator checkpoint should have finished.", coordinatorCheckpointed.get());
                observedCheckpointId.set(checkpointId);
                // Acknowledge both tasks from within the hook.
                AcknowledgeCheckpoint firstAck =
                        new AcknowledgeCheckpoint(graph.getJobID(), firstAttemptId, checkpointId, new CheckpointMetrics(), firstTaskSnapshot);
                AcknowledgeCheckpoint secondAck =
                        new AcknowledgeCheckpoint(graph.getJobID(), secondAttemptId, checkpointId, new CheckpointMetrics(), secondTaskSnapshot);
                checkpointCoordinator.receiveAcknowledgeMessage(firstAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                checkpointCoordinator.receiveAcknowledgeMessage(secondAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                return null;
            }

            public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
            }

            public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                return new SimpleVersionedSerializer<Integer>() {

                    public int getVersion() {
                        return 0;
                    }

                    public byte[] serialize(Integer obj) throws IOException {
                        return new byte[0];
                    }

                    public Integer deserialize(int version, byte[] serialized) throws IOException {
                        return 1;
                    }
                };
            }
        });

        CompletableFuture<?> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(checkpointFuture.isCompletedExceptionally());
        Assert.assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
    }

    /**
     * Verifies that restoring the latest checkpointed state to an empty set of subtasks flips the
     * {@code resetCalled} flag of a registered {@code TestResetHook}.
     */
    @Test
    public void testResetCalledInRegionRecovery() throws Exception {
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        TestResetHook resetHook = new TestResetHook("id");
        coordinator.addMasterHook(resetHook);

        // Registering the hook alone must not mark it as reset.
        Assert.assertFalse(resetHook.resetCalled);

        coordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());

        Assert.assertTrue(resetHook.resetCalled);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers two checkpoints, acknowledges only the second one, and verifies that the operator
     * coordinator context reports checkpoint 1 as aborted and checkpoint 2 as completed.
     */
    @Test
    public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(vertexId).getTaskVertices()[0];
        ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();

        // The coordinator snapshot completes immediately with an empty payload.
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
                new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                        .setOperatorID(new OperatorID())
                        .setOnCallingCheckpointCoordinator((ignored, future) -> future.complete(new byte[0]))
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
                        .build(graph);
        try {
            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            // Looked up but not used in any assertion below.
            long firstCheckpointId = Collections.max(coordinator.getPendingCheckpoints().keySet());

            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            long secondCheckpointId = Collections.max(coordinator.getPendingCheckpoints().keySet());

            // Only the newer checkpoint is acknowledged.
            AcknowledgeCheckpoint ackForSecond =
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptId, secondCheckpointId, new CheckpointMetrics(), null);
            coordinator.receiveAcknowledgeMessage(ackForSecond, "");

            Assert.assertEquals(Collections.singletonList(1L), context.getAbortedCheckpoints());
            Assert.assertEquals(Collections.singletonList(2L), context.getCompletedCheckpoints());
        } finally {
            coordinator.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers a checkpoint whose operator-coordinator callback never completes its future, so
     * the coordinator stays in the triggering phase. After the non-periodic scheduled tasks are
     * run (presumably including the task for the 10 ms checkpoint timeout configured here), the
     * coordinator must no longer be triggering.
     */
    @Test
    public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        // The callback deliberately leaves the result future incomplete.
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
                new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                        .setOperatorID(new OperatorID())
                        .setOnCallingCheckpointCoordinator((ignored, future) -> {})
                        .build();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder().setCheckpointTimeout(10L).build())
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
                        .build(graph);
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(checkpointCoordinator.isTriggering());

            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(checkpointCoordinator.isTriggering());
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Declines checkpoint 1 after running only the first three queued tasks, then triggers another
     * checkpoint. Using the recorded notification order, verifies that the operator coordinator
     * either was never triggered for checkpoint 1 or was triggered for it before the abort
     * notification.
     */
    @Test
    public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        String triggerPrefix = "Trigger";
        String abortMarker = "Abort";
        // Ordered log of coordinator notifications: "Trigger<id>" and "Abort" entries.
        List<String> notifications = new ArrayList<>();
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
                new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                        .setOperatorID(new OperatorID())
                        .setOnCallingCheckpointCoordinator((id, future) -> {
                            notifications.add(triggerPrefix + id);
                            future.complete(new byte[0]);
                        })
                        .setOnCallingAbortCurrentTriggering(() -> notifications.add(abortMarker))
                        .build();

        // The manually triggered executor serves as both the I/O executor and the timer.
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder().setCheckpointTimeout(10L).build())
                        .setIoExecutor(manuallyTriggeredScheduledExecutor)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
                        .build(graph);
        try {
            coordinator.triggerCheckpoint(false);
            // Run exactly three queued tasks before declining the checkpoint.
            for (int step = 0; step < 3; step++) {
                manuallyTriggeredScheduledExecutor.trigger();
            }
            declineCheckpoint(1L, coordinator, vertexId, graph);
            manuallyTriggeredScheduledExecutor.triggerAll();
            Preconditions.checkState(!coordinator.isTriggering());

            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();

            int firstTriggerIndex = notifications.indexOf(triggerPrefix + "1");
            // A negative index means checkpoint 1 never reached the coordinator.
            Assert.assertTrue(firstTriggerIndex < 0 || firstTriggerIndex < notifications.indexOf(abortMarker));
        } finally {
            coordinator.shutdown();
        }
    }

    /**
     * Completes one checkpoint, then declines a second one, and verifies that the abort
     * notification sent to the task manager gateway carries the id of the previously completed
     * checkpoint as {@code latestCompletedCheckpointId}.
     */
    @Test
    public void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTransitToRunning(false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex task = graph.getJobVertex(vertexId).getTaskVertices()[0];

        // Records the latest-completed id delivered with the abort notification; -1 means "none yet".
        final AtomicLong reportedLatestCompletedId = new AtomicLong(-1L);
        SimpleAckingTaskManagerGateway recordingGateway = new SimpleAckingTaskManagerGateway() {

            @Override
            public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long latestCompletedCheckpointId, long timestamp) {
                reportedLatestCompletedId.set(latestCompletedCheckpointId);
            }
        };
        TestingLogicalSlot slot =
                new TestingLogicalSlotBuilder().setTaskManagerGateway(recordingGateway).createTestingLogicalSlot();
        ExecutionGraphTestUtils.setVertexResource(task, slot);
        task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .build(graph);

        // First checkpoint: acknowledged, must complete normally.
        CompletableFuture<?> completedResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        long completedCheckpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        AcknowledgeCheckpoint ack =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        task.getCurrentExecutionAttempt().getAttemptId(),
                        completedCheckpointId,
                        new CheckpointMetrics(),
                        new TaskStateSnapshot());
        coordinator.receiveAcknowledgeMessage(ack, "localhost");
        Assert.assertTrue(completedResult.isDone());
        Assert.assertFalse(completedResult.isCompletedExceptionally());

        // Second checkpoint: declined, must fail.
        CompletableFuture<?> abortedResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        long abortedCheckpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        DeclineCheckpoint decline =
                new DeclineCheckpoint(
                        graph.getJobID(),
                        task.getCurrentExecutionAttempt().getAttemptId(),
                        abortedCheckpointId,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
        coordinator.receiveDeclineMessage(decline, "localhost");
        Assert.assertTrue(abortedResult.isCompletedExceptionally());

        Assert.assertEquals(completedCheckpointId, reportedLatestCompletedId.get());
    }

    /**
     * Builds a coordinator whose checkpoint storage is an {@code FsStateBackend} rooted at a
     * fresh temporary folder, then asserts that no entry named after the job id exists inside
     * that folder.
     */
    @Test
    public void testBaseLocationsNotInitialized() throws Exception {
        File checkpointRoot = this.tmpFolder.newFolder();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId)
                .setTransitToRunning(false)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());

        CheckpointCoordinatorConfiguration coordinatorConfig = CheckpointCoordinatorConfiguration.builder()
                .setCheckpointInterval(Long.MAX_VALUE)
                .build();
        // The coordinator is only constructed; nothing is triggered on it in this test.
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(coordinatorConfig)
                .setCheckpointStorage((CheckpointStorage)new FsStateBackend(checkpointRoot.toURI()))
                .build(graph);

        String jobDirectoryName = graph.getJobID().toString();
        Path jobCheckpointPath = new Path(checkpointRoot.getAbsolutePath(), jobDirectoryName);
        FileSystem fs = FileSystem.get((URI)checkpointRoot.toURI());
        Assert.assertFalse(fs.exists(jobCheckpointPath));
    }

    /**
     * Builds a coordinator for {@code graph} that uses the manually triggered executor as its
     * timer, with the aligned checkpoint timeout set to {@code Long.MAX_VALUE} and the maximum
     * number of concurrent checkpoints set to {@code Integer.MAX_VALUE}.
     *
     * @param graph execution graph the coordinator is built for
     * @return the newly built coordinator
     */
    private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph) throws Exception {
        CheckpointCoordinatorConfiguration unboundedConfig = CheckpointCoordinatorConfiguration.builder()
                .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                .build();
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(unboundedConfig)
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build(graph);
    }

    /**
     * Builds a coordinator for {@code graph} that uses the manually triggered executor as its
     * timer and the supplied failure manager.
     *
     * @param graph execution graph the coordinator is built for
     * @param failureManager failure manager handed to the coordinator builder
     * @return the newly built coordinator
     */
    private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph, CheckpointFailureManager failureManager) throws Exception {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setFailureManager(failureManager)
                .build(graph);
    }

    /**
     * Builds a coordinator with the given timer on top of a freshly created execution graph
     * that contains two job vertices.
     *
     * @param timer timer handed to the coordinator builder
     * @return the newly built coordinator
     */
    private CheckpointCoordinator getCheckpointCoordinator(ScheduledExecutor timer) throws Exception {
        ExecutionGraph twoVertexGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(new JobVertexID())
                .addJobVertex(new JobVertexID())
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer(timer)
                .build(twoVertexGraph);
    }

    /**
     * Creates a failure manager, constructed with {@code 0} as its first argument, whose
     * fail-job callback throws a {@link RuntimeException} carrying {@code errorMsg} from both of
     * its methods.
     *
     * @param errorMsg message of the exception thrown by the callback
     * @return the failure manager wrapping the throwing callback
     */
    private CheckpointFailureManager getCheckpointFailureManager(final String errorMsg) {
        CheckpointFailureManager.FailJobCallback throwingCallback = new CheckpointFailureManager.FailJobCallback(){

            public void failJob(Throwable cause) {
                throw new RuntimeException(errorMsg);
            }

            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                throw new RuntimeException(errorMsg);
            }
        };
        return new CheckpointFailureManager(0, throwingCallback);
    }

    /**
     * Declines the first pending checkpoint returned by the coordinator on behalf of
     * {@code attemptID}, using {@code CHECKPOINT_DECLINED} with {@code reason} as the cause.
     *
     * @param jobId job the decline message is sent for
     * @param coordinator coordinator that receives the decline message
     * @param attemptID execution attempt named in the decline message
     * @param reason cause wrapped into the {@link CheckpointException}
     * @return the pending checkpoint that was looked up before the decline message was sent
     */
    private PendingCheckpoint declineSynchronousSavepoint(JobID jobId, CheckpointCoordinator coordinator, ExecutionAttemptID attemptID, Throwable reason) {
        long checkpointId = (Long)coordinator.getPendingCheckpoints().keySet().iterator().next();
        // Fetch the pending checkpoint before declining so it can still be handed to the caller.
        PendingCheckpoint pending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(checkpointId);

        CheckpointException declineCause = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED, reason);
        DeclineCheckpoint declineMessage = new DeclineCheckpoint(jobId, attemptID, checkpointId, declineCause);
        coordinator.receiveDeclineMessage(declineMessage, TASK_MANAGER_LOCATION_INFO);
        return pending;
    }

    /**
     * Triggers one checkpoint and acknowledges it for every subtask of {@code jobVertex1} with
     * an incremental keyed state handle.
     *
     * <p>Each acknowledged handle carries one private entry ({@code "private-1"}) and a shared
     * entry for {@code cpSequenceNumber}; when {@code cpSequenceNumber} is greater than zero it
     * also carries a shared entry for {@code cpSequenceNumber - 1}.
     *
     * @param jobId job id placed into the acknowledge messages
     * @param checkpointCoordinator coordinator that triggers the checkpoint and receives the acks
     * @param jobVertex1 vertex whose subtasks acknowledge the checkpoint
     * @param keyGroupPartitions1 key-group range per subtask, indexed by subtask index
     * @param cpSequenceNumber sequence number used to name the shared state entries
     */
    private void performIncrementalCheckpoint(JobID jobId, CheckpointCoordinator checkpointCoordinator, ExecutionJobVertex jobVertex1, List<KeyGroupRange> keyGroupPartitions1, int cpSequenceNumber) throws Exception {
        checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertEquals(1L, (long)checkpointCoordinator.getPendingCheckpoints().size());
        long checkpointId = (Long)Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());

        for (int subtask = 0; subtask < jobVertex1.getParallelism(); subtask++) {
            KeyGroupRange range = keyGroupPartitions1.get(subtask);

            List<IncrementalKeyedStateHandle.HandleAndLocalPath> privateEntries = new ArrayList<>();
            privateEntries.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                    new TestingStreamStateHandle("private-1", new byte[] {'p'}), "private-1"));

            List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedEntries = new ArrayList<>();
            if (cpSequenceNumber > 0) {
                // Also reference the shared entry named after the previous sequence number.
                sharedEntries.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                        new TestingStreamStateHandle("shared-" + (cpSequenceNumber - 1) + "-" + range, new byte[] {'s'}),
                        "shared-" + (cpSequenceNumber - 1)));
            }
            sharedEntries.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                    new TestingStreamStateHandle("shared-" + cpSequenceNumber + "-" + range, new byte[] {'s'}),
                    "shared-" + cpSequenceNumber));

            StreamStateHandle metaHandle = Mockito.spy(new ByteStreamStateHandle("meta", new byte[] {'m'}));
            IncrementalRemoteKeyedStateHandle keyedState = Mockito.spy(
                    new IncrementalRemoteKeyedStateHandle(new UUID(42L, 42L), range, checkpointId, sharedEntries, privateEntries, metaHandle));
            OperatorSubtaskState subtaskState = Mockito.spy(
                    OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyedState).build());

            OperatorID operatorId = ((OperatorIDPair)jobVertex1.getOperatorIDs().get(0)).getGeneratedOperatorID();
            Map<OperatorID, OperatorSubtaskState> statesByOperator = new HashMap<>();
            statesByOperator.put(operatorId, subtaskState);

            AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(
                    jobId,
                    jobVertex1.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    new TaskStateSnapshot(statesByOperator));
            checkpointCoordinator.receiveAcknowledgeMessage(ack, TASK_MANAGER_LOCATION_INFO);
        }
    }

    /**
     * Checks every handle in {@code sharedHandles} by delegating to
     * {@code DiscardRecordedStateObject.verifyDiscard} with the expectation supplied by
     * {@code checkpointVerify}.
     *
     * <p>The argument passed to {@code checkpointVerify} is the number formed by the trailing
     * decimal digits of the handle's local path, e.g. {@code "shared-12"} yields {@code 12}.
     * The whole run of trailing digits is used, so numbers of 10 and above are not cut down to
     * their last digit.
     *
     * @param sharedHandles lists of handle/local-path pairs to check
     * @param checkpointVerify maps the number parsed from a local path to the expected value
     *     handed to {@code DiscardRecordedStateObject.verifyDiscard}
     * @throws NumberFormatException if a local path does not end in a decimal number that fits
     *     into an {@code int}
     */
    private static void verifyDiscard(List<List<IncrementalKeyedStateHandle.HandleAndLocalPath>> sharedHandles, Function<Integer, TernaryBoolean> checkpointVerify) {
        for (List<IncrementalKeyedStateHandle.HandleAndLocalPath> cpList : sharedHandles) {
            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : cpList) {
                String key = handleAndLocalPath.getLocalPath();
                // Walk backwards over the trailing digits so multi-digit numbers parse in full.
                int digitsStart = key.length();
                while (digitsStart > 0 && Character.isDigit(key.charAt(digitsStart - 1))) {
                    --digitsStart;
                }
                int checkpointID = Integer.parseInt(key.substring(digitsStart));
                DiscardRecordedStateObject.verifyDiscard((StateObject)handleAndLocalPath.getHandle(), checkpointVerify.apply(checkpointID));
            }
        }
    }

    /** Returns a new {@link TestingStreamStateHandle} created with its no-argument constructor. */
    private TestingStreamStateHandle handle() {
        TestingStreamStateHandle freshHandle = new TestingStreamStateHandle();
        return freshHandle;
    }

    /**
     * Sends a {@code CHECKPOINT_DECLINED} message for {@code checkpointId} to the coordinator on
     * behalf of the first subtask of the given vertex.
     *
     * @param checkpointId id of the checkpoint being declined
     * @param coordinator coordinator that receives the decline message
     * @param nackVertexID vertex whose first subtask declines the checkpoint
     * @param graph execution graph that supplies the job id and the vertex
     */
    private void declineCheckpoint(long checkpointId, CheckpointCoordinator coordinator, JobVertexID nackVertexID, ExecutionGraph graph) {
        ExecutionAttemptID decliningAttempt = graph.getJobVertex(nackVertexID)
                .getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .getAttemptId();
        CheckpointException declineCause = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
        DeclineCheckpoint declineMessage = new DeclineCheckpoint(graph.getJobID(), decliningAttempt, checkpointId, declineCause);
        coordinator.receiveDeclineMessage(declineMessage, "test");
    }

    /**
     * Acknowledges {@code checkpointId} on behalf of the first subtask of the given vertex with
     * an incremental keyed state handle assembled from the supplied handles.
     *
     * <p>The handle uses a random backend id and the key-group range {@code [0, 9]}; the shared
     * and private handles are registered under the local paths {@code "shared-state-key"} and
     * {@code "private-state-key"}.
     *
     * @param checkpointId id of the checkpoint being acknowledged
     * @param coordinator coordinator that receives the acknowledge message
     * @param ackVertexID vertex whose first subtask acknowledges the checkpoint
     * @param graph execution graph that supplies the job id and the vertex
     * @param metaState handle used as the meta state handle
     * @param privateState handle placed into the private state list
     * @param sharedState handle placed into the shared state list
     */
    private void ackCheckpoint(long checkpointId, CheckpointCoordinator coordinator, JobVertexID ackVertexID, ExecutionGraph graph, TestingStreamStateHandle metaState, TestingStreamStateHandle privateState, TestingStreamStateHandle sharedState) throws CheckpointException {
        List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedEntries = new ArrayList<>();
        sharedEntries.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(sharedState, "shared-state-key"));
        List<IncrementalKeyedStateHandle.HandleAndLocalPath> privateEntries = new ArrayList<>();
        privateEntries.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(privateState, "private-state-key"));

        ExecutionJobVertex ackVertex = graph.getJobVertex(ackVertexID);
        OperatorID operatorId = ((OperatorIDPair)ackVertex.getOperatorIDs().get(0)).getGeneratedOperatorID();

        IncrementalRemoteKeyedStateHandle keyedState = new IncrementalRemoteKeyedStateHandle(
                UUID.randomUUID(), KeyGroupRange.of(0, 9), checkpointId, sharedEntries, privateEntries, metaState);
        OperatorSubtaskState subtaskState = OperatorSubtaskState.builder()
                .setManagedKeyedState((KeyedStateHandle)keyedState)
                .build();
        TaskStateSnapshot snapshot = new TaskStateSnapshot(Collections.singletonMap(operatorId, subtaskState));

        AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(
                graph.getJobID(),
                ackVertex.getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(),
                checkpointId,
                new CheckpointMetrics(),
                snapshot);
        coordinator.receiveAcknowledgeMessage(ack, "test");
    }

    /**
     * Master hook that only records whether {@link #reset()} was invoked. The trigger, restore
     * and serializer methods all throw {@link UnsupportedOperationException}.
     */
    private static class TestResetHook implements MasterTriggerRestoreHook<String> {
        /** Identifier returned by {@link #getIdentifier()}. */
        private final String id;

        /** Becomes {@code true} once {@link #reset()} has been called. */
        boolean resetCalled = false;

        TestResetHook(String id) {
            this.id = id;
        }

        public String getIdentifier() {
            return id;
        }

        public void reset() throws Exception {
            resetCalled = true;
        }

        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            throw new UnsupportedOperationException();
        }

        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
            throw new UnsupportedOperationException();
        }

        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            throw new UnsupportedOperationException();
        }
    }

    /** Fail-job callback that counts how many times either of its two methods was invoked. */
    private static class TestFailJobCallback implements CheckpointFailureManager.FailJobCallback {
        /** Combined number of calls to {@code failJob} and {@code failJobDueToTaskFailure}. */
        private int invokeCounter = 0;

        private TestFailJobCallback() {
        }

        public void failJob(Throwable cause) {
            invokeCounter += 1;
        }

        public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID executionAttemptID) {
            invokeCounter += 1;
        }

        /** Returns the number of callback invocations recorded so far. */
        public int getInvokeCounter() {
            return invokeCounter;
        }
    }

    /**
     * Standalone checkpoint id counter that additionally keeps a reference to a coordinator,
     * which subclasses can read through the {@code owner} field.
     */
    private static class CheckpointIDCounterWithOwner extends StandaloneCheckpointIDCounter {
        /** Coordinator registered through {@link #setOwner}; unset until that call. */
        protected CheckpointCoordinator owner;

        private CheckpointIDCounterWithOwner() {
        }

        /**
         * Stores the given coordinator as the owner.
         *
         * @param coordinator coordinator to remember; passed through {@code checkNotNull}
         */
        void setOwner(CheckpointCoordinator coordinator) {
            owner = Preconditions.checkNotNull(coordinator);
        }
    }

    /**
     * Counter that stops its owner's checkpoint scheduler each time an id is requested, before
     * delegating to the parent counter.
     */
    private static class StoppingCheckpointIDCounter extends CheckpointIDCounterWithOwner {
        private StoppingCheckpointIDCounter() {
        }

        public long getAndIncrement() throws Exception {
            // The owner must have been registered before the first id is requested.
            Preconditions.checkNotNull(owner);
            owner.stopCheckpointScheduler();
            long nextId = super.getAndIncrement();
            return nextId;
        }
    }

    /**
     * Checkpoint storage whose storage access throws an {@link IOException} whenever a location
     * for a checkpoint is initialized.
     */
    private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage {
        private IOExceptionCheckpointStorage() {
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            CheckpointStorageAccess failingAccess = new MemoryBackendCheckpointStorageAccess(jobId, null, null, 100){

                public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
                    throw new IOException("disk is error!");
                }
            };
            return failingAccess;
        }
    }

    /**
     * Counter that never hands out an id: once the owner null-check passes, every request fails
     * with an {@link IOException}.
     */
    private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner {
        private IOExceptionCheckpointIDCounter() {
        }

        public long getAndIncrement() throws Exception {
            // The owner must have been registered before the first id is requested.
            Preconditions.checkNotNull(owner);
            throw new IOException("disk is error!");
        }
    }
}

