package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.class */
public class CheckpointCoordinatorTest extends TestLogger {
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode = new int[OperatorStateHandle.Mode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[OperatorStateHandle.Mode.UNION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[OperatorStateHandle.Mode.BROADCAST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$TestingCheckpointIDCounter.class */
    private static class TestingCheckpointIDCounter extends StandaloneCheckpointIDCounter {
        private CheckpointCoordinator owner;

        private TestingCheckpointIDCounter() {
        }

        public long getAndIncrement() throws Exception {
            Preconditions.checkNotNull(this.owner);
            this.owner.stopCheckpointScheduler();
            return super.getAndIncrement();
        }

        void setOwner(CheckpointCoordinator checkpointCoordinator) {
            this.owner = (CheckpointCoordinator) Preconditions.checkNotNull(checkpointCoordinator);
        }
    }

    @Before
    public void setUp() throws Exception {
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
        try {
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() {
        try {
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
        try {
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobID, CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID), CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2), getCheckpointFailureManager("Exceeded checkpoint failure tolerance number!"));
        try {
            try {
                CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
                this.manuallyTriggeredScheduledExecutor.triggerAll();
                Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
                long longValue = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
                PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
                checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue), TASK_MANAGER_LOCATION_INFO);
                Assert.assertFalse(pendingCheckpoint.isDiscarded());
                Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
                checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID, longValue), TASK_MANAGER_LOCATION_INFO);
                Assert.fail("Test failed.");
                try {
                    checkpointCoordinator.shutdown(JobStatus.FINISHED);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                }
            } catch (Exception e2) {
                Assert.assertTrue(e2 instanceof RuntimeException);
                Assert.assertEquals("Exceeded checkpoint failure tolerance number!", e2.getMessage());
                try {
                    checkpointCoordinator.shutdown(JobStatus.FINISHED);
                } catch (Exception e3) {
                    e3.printStackTrace();
                    Assert.fail(e3.getMessage());
                }
            }
        } catch (Throwable th) {
            try {
                checkpointCoordinator.shutdown(JobStatus.FINISHED);
            } catch (Exception e4) {
                e4.printStackTrace();
                Assert.fail(e4.getMessage());
            }
            throw th;
        }
    }

    @Test
    public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(new JobID(), CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID()), CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID()), getCheckpointFailureManager("Exceeded checkpoint failure tolerance number!"));
        try {
            try {
                checkpointCoordinator.triggerCheckpoint(false);
                this.manuallyTriggeredScheduledExecutor.triggerAll();
                checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
                Assert.fail("Test failed.");
                checkpointCoordinator.shutdown(JobStatus.FINISHED);
            } catch (Exception e) {
                Assert.assertTrue(e instanceof RuntimeException);
                Assert.assertEquals("Exceeded checkpoint failure tolerance number!", e.getMessage());
                checkpointCoordinator.shutdown(JobStatus.FINISHED);
            }
        } catch (Throwable th) {
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
            throw th;
        }
    }

    @Test
    public void testTriggerAndDeclineCheckpointSimple() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobID, mockExecutionVertex, mockExecutionVertex2);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            long longValue = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
            Assert.assertNotNull(pendingCheckpoint);
            Assert.assertEquals(longValue, pendingCheckpoint.getCheckpointId());
            Assert.assertEquals(jobID, pendingCheckpoint.getJobId());
            Assert.assertEquals(2L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getOperatorStates().size());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt())).triggerCheckpoint(longValue, pendingCheckpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt())).triggerCheckpoint(longValue, pendingCheckpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals(1L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue), TASK_MANAGER_LOCATION_INFO);
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID, longValue), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID, longValue), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID2, longValue), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTriggerAndDeclineCheckpointComplex() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobID, mockExecutionVertex, mockExecutionVertex2);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            CompletableFuture triggerCheckpoint2 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint2.isCompletedExceptionally());
            Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(2L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            Iterator it = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
            long longValue = ((Long) ((Map.Entry) it.next()).getKey()).longValue();
            long longValue2 = ((Long) ((Map.Entry) it.next()).getKey()).longValue();
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
            PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue2));
            Assert.assertNotNull(pendingCheckpoint);
            Assert.assertEquals(longValue, pendingCheckpoint.getCheckpointId());
            Assert.assertEquals(jobID, pendingCheckpoint.getJobId());
            Assert.assertEquals(2L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getOperatorStates().size());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            Assert.assertNotNull(pendingCheckpoint2);
            Assert.assertEquals(longValue2, pendingCheckpoint2.getCheckpointId());
            Assert.assertEquals(jobID, pendingCheckpoint2.getJobId());
            Assert.assertEquals(2L, pendingCheckpoint2.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint2.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint2.getOperatorStates().size());
            Assert.assertFalse(pendingCheckpoint2.isDiscarded());
            Assert.assertFalse(pendingCheckpoint2.areTasksFullyAcknowledged());
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID, longValue), TASK_MANAGER_LOCATION_INFO);
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointAborted(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue());
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointAborted(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue());
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            long longValue3 = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint pendingCheckpoint3 = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue3));
            Assert.assertEquals(longValue2, longValue3);
            Assert.assertNotNull(pendingCheckpoint3);
            Assert.assertEquals(longValue3, pendingCheckpoint3.getCheckpointId());
            Assert.assertEquals(jobID, pendingCheckpoint3.getJobId());
            Assert.assertEquals(2L, pendingCheckpoint3.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint3.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint3.getOperatorStates().size());
            Assert.assertFalse(pendingCheckpoint3.isDiscarded());
            Assert.assertFalse(pendingCheckpoint3.areTasksFullyAcknowledged());
            Assert.assertNotEquals(pendingCheckpoint.getCheckpointId(), pendingCheckpoint3.getCheckpointId());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID, longValue), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID2, longValue), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointAborted(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue());
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointAborted(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobID, mockExecutionVertex, mockExecutionVertex2);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            long longValue = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
            Assert.assertNotNull(pendingCheckpoint);
            Assert.assertEquals(longValue, pendingCheckpoint.getCheckpointId());
            Assert.assertEquals(jobID, pendingCheckpoint.getJobId());
            Assert.assertEquals(2L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getOperatorStates().size());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            OperatorID fromJobVertexID = OperatorID.fromJobVertexID(mockExecutionVertex.getJobvertexId());
            OperatorID fromJobVertexID2 = OperatorID.fromJobVertexID(mockExecutionVertex2.getJobvertexId());
            TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
            TaskStateSnapshot taskStateSnapshot2 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
            OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState operatorSubtaskState2 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            Mockito.when(taskStateSnapshot.getSubtaskStateByOperatorID(fromJobVertexID)).thenReturn(operatorSubtaskState);
            Mockito.when(taskStateSnapshot2.getSubtaskStateByOperatorID(fromJobVertexID2)).thenReturn(operatorSubtaskState2);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue, new CheckpointMetrics(), taskStateSnapshot2);
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals(1L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot2, Mockito.never())).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState2, Mockito.never())).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue, new CheckpointMetrics(), taskStateSnapshot), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.times(1))).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState2, Mockito.times(1))).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jobID, completedCheckpoint.getJobId());
            Assert.assertEquals(pendingCheckpoint.getCheckpointId(), completedCheckpoint.getCheckpointID());
            Assert.assertEquals(2L, completedCheckpoint.getOperatorStates().size());
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            long longValue2 = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue2), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue2), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            CompletedCheckpoint completedCheckpoint2 = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jobID, completedCheckpoint2.getJobId());
            Assert.assertEquals(longValue2, completedCheckpoint2.getCheckpointID());
            Assert.assertTrue(completedCheckpoint2.getOperatorStates().isEmpty());
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue());
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultipleConcurrentCheckpoints() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID5 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID6 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            ExecutionVertex mockExecutionVertex4 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID4);
            ExecutionVertex mockExecutionVertex5 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID5);
            ExecutionVertex mockExecutionVertex6 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID6);
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex3, mockExecutionVertex4, mockExecutionVertex5}).setTasksToCommitTo(new ExecutionVertex[]{mockExecutionVertex6}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) build.getPendingCheckpoints().values().iterator().next();
            long checkpointId = pendingCheckpoint.getCheckpointId();
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId), TASK_MANAGER_LOCATION_INFO);
            CompletableFuture triggerCheckpoint2 = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint2.isCompletedExceptionally());
            Assert.assertEquals(2L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            Iterator it = build.getPendingCheckpoints().values().iterator();
            PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) it.next();
            PendingCheckpoint pendingCheckpoint3 = pendingCheckpoint == pendingCheckpoint2 ? (PendingCheckpoint) it.next() : pendingCheckpoint2;
            long checkpointId2 = pendingCheckpoint3.getCheckpointId();
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId2), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, build.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            ((Execution) Mockito.verify(mockExecutionVertex6.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(checkpointId), ((Long) Matchers.any(Long.class)).longValue());
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId2), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(2L, build.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(pendingCheckpoint3.isDiscarded());
            ((Execution) Mockito.verify(mockExecutionVertex6.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(checkpointId2), ((Long) Matchers.any(Long.class)).longValue());
            List successfulCheckpoints = build.getSuccessfulCheckpoints();
            CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) successfulCheckpoints.get(0);
            Assert.assertEquals(checkpointId, completedCheckpoint.getCheckpointID());
            Assert.assertEquals(jobID, completedCheckpoint.getJobId());
            Assert.assertTrue(completedCheckpoint.getOperatorStates().isEmpty());
            CompletedCheckpoint completedCheckpoint2 = (CompletedCheckpoint) successfulCheckpoints.get(1);
            Assert.assertEquals(checkpointId2, completedCheckpoint2.getCheckpointID());
            Assert.assertEquals(jobID, completedCheckpoint2.getJobId());
            Assert.assertTrue(completedCheckpoint2.getOperatorStates().isEmpty());
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID5 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID6 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            ExecutionVertex mockExecutionVertex4 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID4);
            ExecutionVertex mockExecutionVertex5 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID5);
            ExecutionVertex mockExecutionVertex6 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID6);
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex3, mockExecutionVertex4, mockExecutionVertex5}).setTasksToCommitTo(new ExecutionVertex[]{mockExecutionVertex6}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(10)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) build.getPendingCheckpoints().values().iterator().next();
            long checkpointId = pendingCheckpoint.getCheckpointId();
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            OperatorID fromJobVertexID = OperatorID.fromJobVertexID(mockExecutionVertex3.getJobvertexId());
            OperatorID fromJobVertexID2 = OperatorID.fromJobVertexID(mockExecutionVertex4.getJobvertexId());
            OperatorID fromJobVertexID3 = OperatorID.fromJobVertexID(mockExecutionVertex5.getJobvertexId());
            TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            TaskStateSnapshot taskStateSnapshot2 = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            TaskStateSnapshot taskStateSnapshot3 = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState operatorSubtaskState2 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState operatorSubtaskState3 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            taskStateSnapshot.putSubtaskStateByOperatorID(fromJobVertexID, operatorSubtaskState);
            taskStateSnapshot2.putSubtaskStateByOperatorID(fromJobVertexID2, operatorSubtaskState2);
            taskStateSnapshot3.putSubtaskStateByOperatorID(fromJobVertexID3, operatorSubtaskState3);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId, new CheckpointMetrics(), taskStateSnapshot2), TASK_MANAGER_LOCATION_INFO);
            CompletableFuture triggerCheckpoint2 = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint2.isCompletedExceptionally());
            Assert.assertEquals(2L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            Iterator it = build.getPendingCheckpoints().values().iterator();
            PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) it.next();
            PendingCheckpoint pendingCheckpoint3 = pendingCheckpoint == pendingCheckpoint2 ? (PendingCheckpoint) it.next() : pendingCheckpoint2;
            long checkpointId2 = pendingCheckpoint3.getCheckpointId();
            TaskStateSnapshot taskStateSnapshot4 = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            TaskStateSnapshot taskStateSnapshot5 = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            TaskStateSnapshot taskStateSnapshot6 = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            OperatorSubtaskState operatorSubtaskState4 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState operatorSubtaskState5 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState operatorSubtaskState6 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            taskStateSnapshot4.putSubtaskStateByOperatorID(fromJobVertexID, operatorSubtaskState4);
            taskStateSnapshot5.putSubtaskStateByOperatorID(fromJobVertexID2, operatorSubtaskState5);
            taskStateSnapshot6.putSubtaskStateByOperatorID(fromJobVertexID3, operatorSubtaskState6);
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId2, new CheckpointMetrics(), taskStateSnapshot6), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId2, new CheckpointMetrics(), taskStateSnapshot4), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId, new CheckpointMetrics(), taskStateSnapshot), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId2, new CheckpointMetrics(), taskStateSnapshot5), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertTrue(pendingCheckpoint3.isDiscarded());
            Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, build.getNumberOfRetainedSuccessfulCheckpoints());
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.times(1))).discardState();
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState2, Mockito.times(1))).discardState();
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState4, Mockito.never())).discardState();
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState5, Mockito.never())).discardState();
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState6, Mockito.never())).discardState();
            CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) build.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(checkpointId2, completedCheckpoint.getCheckpointID());
            Assert.assertEquals(jobID, completedCheckpoint.getJobId());
            Assert.assertEquals(3L, completedCheckpoint.getOperatorStates().size());
            ((Execution) Mockito.verify(mockExecutionVertex6.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(checkpointId2), ((Long) Matchers.any(Long.class)).longValue());
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId, new CheckpointMetrics(), taskStateSnapshot3), TASK_MANAGER_LOCATION_INFO);
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState3, Mockito.times(1))).discardState();
            build.shutdown(JobStatus.FINISHED);
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState4, Mockito.times(1))).discardState();
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState5, Mockito.times(1))).discardState();
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState6, Mockito.times(1))).discardState();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCheckpointTimeoutIsolated() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            ExecutionVertex mockExecutionVertex4 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID4);
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex2, mockExecutionVertex3}).setTasksToCommitTo(new ExecutionVertex[]{mockExecutionVertex4}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) build.getPendingCheckpoints().values().iterator().next();
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            OperatorID fromJobVertexID = OperatorID.fromJobVertexID(mockExecutionVertex2.getJobvertexId());
            TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
            OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            taskStateSnapshot.putSubtaskStateByOperatorID(fromJobVertexID, operatorSubtaskState);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, pendingCheckpoint.getCheckpointId(), new CheckpointMetrics(), taskStateSnapshot), TASK_MANAGER_LOCATION_INFO);
            this.manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
            Assert.assertTrue("Checkpoint was not canceled by the timeout", pendingCheckpoint.isDiscarded());
            Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, build.getNumberOfRetainedSuccessfulCheckpoints());
            ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.times(1))).discardState();
            ((Execution) Mockito.verify(mockExecutionVertex4.getCurrentExecutionAttempt(), Mockito.times(0))).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testHandleMessagesForNonExistingCheckpoints() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setTasksToTrigger(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID)}).setTasksToWaitFor(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2), CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3)}).setTasksToCommitTo(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID4)}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            long longValue = ((Long) build.getPendingCheckpoints().keySet().iterator().next()).longValue();
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), executionAttemptID2, longValue), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, 1L), TASK_MANAGER_LOCATION_INFO);
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, new ExecutionAttemptID(), longValue), TASK_MANAGER_LOCATION_INFO);
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
        ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMaxConcurrentCheckpoints(1).build()).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2, CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3)}).setTasksToCommitTo(new ExecutionVertex[0]).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
        Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) build.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pendingCheckpoint.getCheckpointId();
        OperatorID fromJobVertexID = OperatorID.fromJobVertexID(mockExecutionVertex.getJobvertexId());
        TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
        taskStateSnapshot.putSubtaskStateByOperatorID(fromJobVertexID, operatorSubtaskState);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, new CheckpointMetrics(), taskStateSnapshot), TASK_MANAGER_LOCATION_INFO);
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.never())).discardState();
        TaskStateSnapshot taskStateSnapshot2 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), taskStateSnapshot2), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot2, Mockito.times(1))).discardState();
        TaskStateSnapshot taskStateSnapshot3 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), taskStateSnapshot3), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot3, Mockito.never())).discardState();
        TaskStateSnapshot taskStateSnapshot4 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, new CheckpointMetrics(), taskStateSnapshot4), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot4, Mockito.never())).discardState();
        Mockito.reset(new OperatorSubtaskState[]{operatorSubtaskState});
        build.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pendingCheckpoint.isDiscarded());
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.times(1))).discardState();
        TaskStateSnapshot taskStateSnapshot5 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId, new CheckpointMetrics(), taskStateSnapshot5), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot5, Mockito.times(1))).discardState();
        Mockito.reset(new TaskStateSnapshot[]{taskStateSnapshot3});
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), taskStateSnapshot3), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot3, Mockito.never())).discardState();
        TaskStateSnapshot taskStateSnapshot6 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), taskStateSnapshot6), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot) Mockito.verify(taskStateSnapshot6, Mockito.times(1))).discardState();
    }

    @Test
    public void testMaxConcurrentAttempts1() {
        testMaxConcurrentAttempts(1);
    }

    @Test
    public void testMaxConcurrentAttempts2() {
        testMaxConcurrentAttempts(2);
    }

    @Test
    public void testMaxConcurrentAttempts5() {
        testMaxConcurrentAttempts(5);
    }

    @Test
    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobID, mockExecutionVertex, mockExecutionVertex2);
        Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        String absolutePath = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture triggerSavepoint = checkpointCoordinator.triggerSavepoint(absolutePath);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerSavepoint.isDone());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        long longValue = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
        Assert.assertNotNull(pendingCheckpoint);
        Assert.assertEquals(longValue, pendingCheckpoint.getCheckpointId());
        Assert.assertEquals(jobID, pendingCheckpoint.getJobId());
        Assert.assertEquals(2L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, pendingCheckpoint.getOperatorStates().size());
        Assert.assertFalse(pendingCheckpoint.isDiscarded());
        Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
        Assert.assertFalse(pendingCheckpoint.canBeSubsumed());
        OperatorID fromJobVertexID = OperatorID.fromJobVertexID(mockExecutionVertex.getJobvertexId());
        OperatorID fromJobVertexID2 = OperatorID.fromJobVertexID(mockExecutionVertex2.getJobvertexId());
        TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        TaskStateSnapshot taskStateSnapshot2 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
        OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState operatorSubtaskState2 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
        Mockito.when(taskStateSnapshot.getSubtaskStateByOperatorID(fromJobVertexID)).thenReturn(operatorSubtaskState);
        Mockito.when(taskStateSnapshot2.getSubtaskStateByOperatorID(fromJobVertexID2)).thenReturn(operatorSubtaskState2);
        AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue, new CheckpointMetrics(), taskStateSnapshot2);
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse(pendingCheckpoint.isDiscarded());
        Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
        Assert.assertFalse(triggerSavepoint.isDone());
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse(pendingCheckpoint.isDiscarded());
        Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());
        Assert.assertFalse(triggerSavepoint.isDone());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue, new CheckpointMetrics(), taskStateSnapshot), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pendingCheckpoint.isDiscarded());
        Assert.assertNotNull(triggerSavepoint.get());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue());
        ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(longValue), ((Long) Matchers.any(Long.class)).longValue());
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.times(1))).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState2, Mockito.times(1))).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
        CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(jobID, completedCheckpoint.getJobId());
        Assert.assertEquals(pendingCheckpoint.getCheckpointId(), completedCheckpoint.getCheckpointID());
        Assert.assertEquals(2L, completedCheckpoint.getOperatorStates().size());
        CompletableFuture triggerSavepoint2 = checkpointCoordinator.triggerSavepoint(absolutePath);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerSavepoint2.isDone());
        long longValue2 = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue2), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        CompletedCheckpoint completedCheckpoint2 = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(jobID, completedCheckpoint2.getJobId());
        Assert.assertEquals(longValue2, completedCheckpoint2.getCheckpointID());
        Assert.assertTrue(completedCheckpoint2.getOperatorStates().isEmpty());
        Assert.assertNotNull(triggerSavepoint2.get());
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.never())).discardState();
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState2, Mockito.never())).discardState();
        ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
        ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
        ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue());
        ((Execution) Mockito.verify(mockExecutionVertex2.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(longValue2), ((Long) Matchers.any(Long.class)).longValue());
        checkpointCoordinator.shutdown(JobStatus.FINISHED);
    }

    @Test
    public void testSavepointsAreNotSubsumed() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
        CheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasks(new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}).setCheckpointIDCounter(standaloneCheckpointIDCounter).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(10)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        String absolutePath = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture triggerSavepoint = build.triggerSavepoint(absolutePath);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long last = standaloneCheckpointIDCounter.getLast();
        Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
        CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(2L, build.getNumberOfPendingCheckpoints());
        Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
        CompletableFuture triggerCheckpoint2 = build.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerCheckpoint2.isCompletedExceptionally());
        long last2 = standaloneCheckpointIDCounter.getLast();
        Assert.assertEquals(3L, build.getNumberOfPendingCheckpoints());
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, last2), TASK_MANAGER_LOCATION_INFO);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, last2), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, build.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse(((PendingCheckpoint) build.getPendingCheckpoints().get(Long.valueOf(last))).isDiscarded());
        Assert.assertFalse(triggerSavepoint.isDone());
        CompletableFuture triggerCheckpoint3 = build.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerCheckpoint3.isCompletedExceptionally());
        Assert.assertEquals(2L, build.getNumberOfPendingCheckpoints());
        CompletableFuture triggerSavepoint2 = build.triggerSavepoint(absolutePath);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long last3 = standaloneCheckpointIDCounter.getLast();
        Assert.assertFalse(triggerSavepoint2.isCompletedExceptionally());
        Assert.assertEquals(3L, build.getNumberOfPendingCheckpoints());
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, last3), TASK_MANAGER_LOCATION_INFO);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, last3), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, build.getNumberOfPendingCheckpoints());
        Assert.assertEquals(2L, build.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse(((PendingCheckpoint) build.getPendingCheckpoints().get(Long.valueOf(last))).isDiscarded());
        Assert.assertFalse(triggerSavepoint.isDone());
        Assert.assertNotNull(triggerSavepoint2.get());
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, last), TASK_MANAGER_LOCATION_INFO);
        build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, last), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
        Assert.assertEquals(3L, build.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertNotNull(triggerSavepoint.get());
    }

    private void testMaxConcurrentAttempts(int i) {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            AtomicInteger atomicInteger = new AtomicInteger();
            Execution currentExecutionAttempt = mockExecutionVertex.getCurrentExecutionAttempt();
            ((Execution) Mockito.doAnswer(invocationOnMock -> {
                atomicInteger.incrementAndGet();
                return null;
            }).when(currentExecutionAttempt)).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.doAnswer(invocationOnMock2 -> {
                atomicInteger.incrementAndGet();
                return null;
            }).when(currentExecutionAttempt)).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(i).build()).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex2}).setTasksToCommitTo(new ExecutionVertex[]{mockExecutionVertex3}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            build.startCheckpointScheduler();
            for (int i2 = 0; i2 < i; i2++) {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Assert.assertEquals(i, atomicInteger.get());
            ((Execution) Mockito.verify(mockExecutionVertex.getCurrentExecutionAttempt(), Mockito.times(i))).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, 1L), TASK_MANAGER_LOCATION_INFO);
            Collection<ScheduledFuture<?>> periodicScheduledTask = this.manuallyTriggeredScheduledExecutor.getPeriodicScheduledTask();
            Assert.assertEquals(1L, periodicScheduledTask.size());
            periodicScheduledTask.iterator().next();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(i + 1, atomicInteger.get());
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(i + 1, atomicInteger.get());
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMaxConcurrentAttempsWithSubsumption() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(2).build()).setTasksToTrigger(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID)}).setTasksToWaitFor(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2)}).setTasksToCommitTo(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID())}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            build.startCheckpointScheduler();
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            } while (build.getNumberOfPendingCheckpoints() < 2);
            Assert.assertEquals(2L, build.getNumberOfPendingCheckpoints());
            Assert.assertNotNull(build.getPendingCheckpoints().get(1L));
            Assert.assertNotNull(build.getPendingCheckpoints().get(2L));
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, 2L), TASK_MANAGER_LOCATION_INFO);
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            } while (build.getNumberOfPendingCheckpoints() < 2);
            Assert.assertEquals(2L, build.getNumberOfPendingCheckpoints());
            Assert.assertNotNull(build.getPendingCheckpoints().get(3L));
            Assert.assertNotNull(build.getPendingCheckpoints().get(4L));
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testPeriodicSchedulingWithInactiveTasks() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            AtomicReference atomicReference = new AtomicReference(ExecutionState.CREATED);
            Mockito.when(mockExecutionVertex.getCurrentExecutionAttempt().getState()).thenAnswer(invocationOnMock -> {
                return (ExecutionState) atomicReference.get();
            });
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(2).build()).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex2}).setTasksToCommitTo(new ExecutionVertex[]{mockExecutionVertex3}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            build.startCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(0L, build.getNumberOfPendingCheckpoints());
            atomicReference.set(ExecutionState.RUNNING);
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(build.getNumberOfPendingCheckpoints() > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testConcurrentSavepoints() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        CheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMaxConcurrentCheckpoints(1).build()).setTasks(new ExecutionVertex[]{mockExecutionVertex}).setCheckpointIDCounter(standaloneCheckpointIDCounter).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        ArrayList arrayList = new ArrayList();
        String absolutePath = this.tmpFolder.newFolder().getAbsolutePath();
        for (int i = 0; i < 5; i++) {
            arrayList.add(build.triggerSavepoint(absolutePath));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertFalse(((CompletableFuture) it.next()).isDone());
        }
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long last = standaloneCheckpointIDCounter.getLast();
        int i2 = 0;
        while (i2 < 5) {
            build.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, last), TASK_MANAGER_LOCATION_INFO);
            i2++;
            last--;
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Assert.assertNotNull(((CompletableFuture) it2.next()).get());
        }
    }

    @Test
    public void testMinDelayBetweenSavepoints() throws Exception {
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMinPauseBetweenCheckpoints(100000000L).setMaxConcurrentCheckpoints(1).build()).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        String absolutePath = this.tmpFolder.newFolder().getAbsolutePath();
        Assert.assertFalse("Did not trigger savepoint", build.triggerSavepoint(absolutePath).isDone());
        Assert.assertFalse("Did not trigger savepoint", build.triggerSavepoint(absolutePath).isDone());
    }

    @Test
    public void testExternalizedCheckpoints() throws Exception {
        try {
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
            Iterator it = build.getPendingCheckpoints().values().iterator();
            while (it.hasNext()) {
                Assert.assertEquals(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), ((PendingCheckpoint) it.next()).getProps());
            }
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCreateKeyGroupPartitions() {
        testCreateKeyGroupPartitions(1, 1);
        testCreateKeyGroupPartitions(13, 1);
        testCreateKeyGroupPartitions(13, 2);
        testCreateKeyGroupPartitions(32767, 1);
        testCreateKeyGroupPartitions(32767, 13);
        testCreateKeyGroupPartitions(32767, 32767);
        Random random = new Random(1234L);
        for (int i = 0; i < 1000; i++) {
            int nextInt = 1 + random.nextInt(32766);
            testCreateKeyGroupPartitions(nextInt, 1 + random.nextInt(nextInt));
        }
    }

    private void testCreateKeyGroupPartitions(int i, int i2) {
        List createKeyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(i, i2);
        for (int i3 = 0; i3 < i; i3++) {
            KeyGroupRange keyGroupRange = (KeyGroupRange) createKeyGroupPartitions.get(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(i, i2, i3));
            if (!keyGroupRange.contains(i3)) {
                Assert.fail("Could not find expected key-group " + i3 + " in range " + keyGroupRange);
            }
        }
    }

    @Test
    public void testPartitionableStateRepartitioning() {
        Random random = new Random(42L);
        for (int i = 0; i < 10000; i++) {
            doTestPartitionableStateRepartitioning(random, 1 + random.nextInt(9), 1 + random.nextInt(9), 1 + random.nextInt(9), 1 + random.nextInt(9));
        }
    }

    private void doTestPartitionableStateRepartitioning(Random random, int i, int i2, int i3, int i4) {
        int i5;
        ArrayList<List> arrayList = new ArrayList(i);
        for (int i6 = 0; i6 < i; i6++) {
            Path path = new Path("/fake-" + i6);
            HashMap hashMap = new HashMap();
            int i7 = 0;
            for (int i8 = 0; i8 < i3 - 1; i8++) {
                long[] jArr = new long[1 + random.nextInt(i4)];
                for (int i9 = 0; i9 < jArr.length; i9++) {
                    jArr[i9] = i7;
                    i7++;
                }
                hashMap.put("State-" + i8, new OperatorStateHandle.StateMetaInfo(jArr, random.nextInt(10) == 0 ? OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            }
            if (i3 % 2 == 0) {
                hashMap.put("State-" + (i3 - 1), new OperatorStateHandle.StateMetaInfo(new long[]{i7 + 1, i7 + 2, i7 + 3, i7 + 4}, OperatorStateHandle.Mode.BROADCAST));
            }
            arrayList.add(Collections.singletonList(new OperatorStreamStateHandle(hashMap, new FileStateHandle(path, -1L))));
        }
        HashMap hashMap2 = new HashMap();
        int i10 = 0;
        int i11 = 0;
        for (List<OperatorStateHandle> list : arrayList) {
            Assert.assertEquals(1L, list.size());
            for (OperatorStateHandle operatorStateHandle : list) {
                Map stateNameToPartitionOffsets = operatorStateHandle.getStateNameToPartitionOffsets();
                HashMap hashMap3 = new HashMap(stateNameToPartitionOffsets.size());
                for (Map.Entry entry : stateNameToPartitionOffsets.entrySet()) {
                    long[] offsets = ((OperatorStateHandle.StateMetaInfo) entry.getValue()).getOffsets();
                    switch (AnonymousClass3.$SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[((OperatorStateHandle.StateMetaInfo) entry.getValue()).getDistributionMode().ordinal()]) {
                        case 1:
                            i5 = i2;
                            break;
                        case 2:
                            i5 = (i2 / i) + (i10 < i2 % i ? 1 : 0);
                            break;
                        case 3:
                            i5 = 1;
                            break;
                        default:
                            throw new RuntimeException("Unknown distribution mode " + ((OperatorStateHandle.StateMetaInfo) entry.getValue()).getDistributionMode());
                    }
                    if (i5 > 0) {
                        i11 += i5 * offsets.length;
                        ArrayList arrayList2 = new ArrayList(offsets.length);
                        for (long j : offsets) {
                            for (int i12 = 0; i12 < i5; i12++) {
                                arrayList2.add(Long.valueOf(j));
                            }
                        }
                        hashMap3.put(entry.getKey(), arrayList2);
                    }
                }
                if (!hashMap3.isEmpty()) {
                    hashMap2.put(operatorStateHandle.getDelegateStateHandle(), hashMap3);
                }
                i10++;
            }
        }
        List repartitionState = RoundRobinOperatorStateRepartitioner.INSTANCE.repartitionState(arrayList, i, i2);
        HashMap hashMap4 = new HashMap();
        int i13 = Integer.MAX_VALUE;
        int i14 = 0;
        int i15 = 0;
        for (int i16 = 0; i16 < i2; i16++) {
            int i17 = 0;
            for (OperatorStateHandle operatorStateHandle2 : (Collection) repartitionState.get(i16)) {
                for (Map.Entry entry2 : operatorStateHandle2.getStateNameToPartitionOffsets().entrySet()) {
                    Map map = (Map) hashMap4.get(operatorStateHandle2.getDelegateStateHandle());
                    if (map == null) {
                        map = new HashMap();
                        hashMap4.put(operatorStateHandle2.getDelegateStateHandle(), map);
                    }
                    List list2 = (List) map.get(entry2.getKey());
                    if (list2 == null) {
                        list2 = new ArrayList();
                        map.put(entry2.getKey(), list2);
                    }
                    for (long j2 : ((OperatorStateHandle.StateMetaInfo) entry2.getValue()).getOffsets()) {
                        list2.add(Long.valueOf(j2));
                    }
                    i17 += ((OperatorStateHandle.StateMetaInfo) entry2.getValue()).getOffsets().length;
                }
            }
            i13 = Math.min(i13, i17);
            i14 = Math.max(i14, i17);
            i15 += i17;
        }
        Iterator it = hashMap4.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Map) it.next()).values().iterator();
            while (it2.hasNext()) {
                Collections.sort((List) it2.next());
            }
        }
        if (i != i2) {
            int i18 = i14 - i13;
            Assert.assertTrue("Difference in partition load is > 1 : " + i18, i18 <= 1);
        }
        Assert.assertEquals(i11, i15);
        Assert.assertEquals(hashMap2, hashMap4);
    }

    @Test
    public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(this.manuallyTriggeredScheduledExecutor).build();
        CheckpointStatsTracker checkpointStatsTracker = (CheckpointStatsTracker) Mockito.mock(CheckpointStatsTracker.class);
        build.setCheckpointStatsTracker(checkpointStatsTracker);
        Mockito.when(checkpointStatsTracker.reportPendingCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointProperties) Matchers.any(CheckpointProperties.class))).thenReturn(Mockito.mock(PendingCheckpointStats.class));
        CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
        ((CheckpointStatsTracker) Mockito.verify(checkpointStatsTracker, Mockito.times(1))).reportPendingCheckpoint(Mockito.eq(1L), ((Long) Matchers.any(Long.class)).longValue(), (CheckpointProperties) Mockito.eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
    }

    @Test
    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        CompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore(standaloneCompletedCheckpointStore).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        standaloneCompletedCheckpointStore.addCheckpoint(new CompletedCheckpoint(new JobID(), 0L, 0L, 0L, Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation()));
        CheckpointStatsTracker checkpointStatsTracker = (CheckpointStatsTracker) Mockito.mock(CheckpointStatsTracker.class);
        build.setCheckpointStatsTracker(checkpointStatsTracker);
        Assert.assertTrue(build.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
        ((CheckpointStatsTracker) Mockito.verify(checkpointStatsTracker, Mockito.times(1))).reportRestoredCheckpoint((RestoredCheckpointStats) Matchers.any(RestoredCheckpointStats.class));
    }

    @Test
    public void testSharedStateRegistrationOnRestore() throws Exception {
        JobID jobID = new JobID();
        ExecutionJobVertex mockExecutionJobVertex = CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(new JobVertexID(), 2, 4);
        ArrayList arrayList = new ArrayList(2);
        arrayList.addAll(Arrays.asList(mockExecutionJobVertex.getTaskVertices()));
        ExecutionVertex[] executionVertexArr = (ExecutionVertex[]) arrayList.toArray(new ExecutionVertex[arrayList.size()]);
        RecoverableCompletedCheckpointStore recoverableCompletedCheckpointStore = new RecoverableCompletedCheckpointStore(10);
        ArrayList arrayList2 = new ArrayList(2);
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setTasks(executionVertexArr).setCompletedCheckpointStore(recoverableCompletedCheckpointStore).setTimer(this.manuallyTriggeredScheduledExecutor).setSharedStateRegistryFactory(executor -> {
            SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(executor);
            arrayList2.add(sharedStateRegistry);
            return sharedStateRegistry;
        }).build();
        List<KeyGroupRange> createKeyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(4, 2);
        for (int i = 0; i < 3; i++) {
            performIncrementalCheckpoint(jobID, build, mockExecutionJobVertex, createKeyGroupPartitions, i);
        }
        List successfulCheckpoints = build.getSuccessfulCheckpoints();
        Assert.assertEquals(3L, successfulCheckpoints.size());
        int i2 = 0;
        ArrayList arrayList3 = new ArrayList(3);
        for (int i3 = 0; i3 < 3; i3++) {
            arrayList3.add(new HashMap(2));
        }
        int i4 = 0;
        Iterator it = successfulCheckpoints.iterator();
        while (it.hasNext()) {
            Iterator it2 = ((CompletedCheckpoint) it.next()).getOperatorStates().values().iterator();
            while (it2.hasNext()) {
                for (OperatorSubtaskState operatorSubtaskState : ((OperatorState) it2.next()).getStates()) {
                    Iterator it3 = operatorSubtaskState.getManagedKeyedState().iterator();
                    while (it3.hasNext()) {
                        IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = (KeyedStateHandle) it3.next();
                        ((KeyedStateHandle) Mockito.verify(incrementalRemoteKeyedStateHandle, Mockito.times(1))).registerSharedStates((SharedStateRegistry) arrayList2.get(0));
                        IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle2 = incrementalRemoteKeyedStateHandle;
                        ((Map) arrayList3.get(i4)).putAll(incrementalRemoteKeyedStateHandle2.getSharedState());
                        for (StreamStateHandle streamStateHandle : incrementalRemoteKeyedStateHandle2.getSharedState().values()) {
                            Assert.assertTrue(!(streamStateHandle instanceof PlaceholderStreamStateHandle));
                            ((StreamStateHandle) Mockito.verify(streamStateHandle, Mockito.never())).discardState();
                            i2++;
                        }
                        Iterator it4 = incrementalRemoteKeyedStateHandle2.getPrivateState().values().iterator();
                        while (it4.hasNext()) {
                            ((StreamStateHandle) Mockito.verify((StreamStateHandle) it4.next(), Mockito.never())).discardState();
                        }
                        ((StreamStateHandle) Mockito.verify(incrementalRemoteKeyedStateHandle2.getMetaStateHandle(), Mockito.never())).discardState();
                    }
                    ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState, Mockito.never())).discardState();
                }
            }
            i4++;
        }
        Assert.assertEquals(10L, i2);
        recoverableCompletedCheckpointStore.removeOldestCheckpoint();
        Iterator it5 = arrayList3.iterator();
        while (it5.hasNext()) {
            Iterator it6 = ((Map) it5.next()).values().iterator();
            while (it6.hasNext()) {
                ((StreamStateHandle) Mockito.verify((StreamStateHandle) it6.next(), Mockito.never())).discardState();
            }
        }
        recoverableCompletedCheckpointStore.shutdown(JobStatus.SUSPENDED);
        HashSet hashSet = new HashSet();
        hashSet.add(mockExecutionJobVertex);
        Assert.assertTrue(build.restoreLatestCheckpointedStateToAll(hashSet, false));
        int i5 = 0;
        Iterator it7 = successfulCheckpoints.iterator();
        while (it7.hasNext()) {
            Iterator it8 = ((CompletedCheckpoint) it7.next()).getOperatorStates().values().iterator();
            while (it8.hasNext()) {
                Iterator it9 = ((OperatorState) it8.next()).getStates().iterator();
                while (it9.hasNext()) {
                    Iterator it10 = ((OperatorSubtaskState) it9.next()).getManagedKeyedState().iterator();
                    while (it10.hasNext()) {
                        ((KeyedStateHandle) Mockito.verify((KeyedStateHandle) it10.next(), i5 > 0 ? Mockito.times(1) : Mockito.never())).registerSharedStates((SharedStateRegistry) arrayList2.get(1));
                    }
                }
            }
            i5++;
        }
        recoverableCompletedCheckpointStore.removeOldestCheckpoint();
        Iterator it11 = arrayList3.iterator();
        while (it11.hasNext()) {
            for (Map.Entry entry : ((Map) it11.next()).entrySet()) {
                String keyString = ((StateHandleID) entry.getKey()).getKeyString();
                if (Integer.parseInt(String.valueOf(keyString.charAt(keyString.length() - 1))) == 0) {
                    ((StreamStateHandle) Mockito.verify(entry.getValue(), Mockito.times(1))).discardState();
                } else {
                    ((StreamStateHandle) Mockito.verify(entry.getValue(), Mockito.never())).discardState();
                }
            }
        }
        recoverableCompletedCheckpointStore.removeOldestCheckpoint();
        Iterator it12 = arrayList3.iterator();
        while (it12.hasNext()) {
            Iterator it13 = ((Map) it12.next()).values().iterator();
            while (it13.hasNext()) {
                ((StreamStateHandle) Mockito.verify((StreamStateHandle) it13.next(), Mockito.times(1))).discardState();
            }
        }
    }

    @Test
    public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
        final Tuple2 of = Tuple2.of(0, (Object) null);
        IOException iOException = new IOException("Custom-Exception");
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobID, CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID), CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID()), new CheckpointFailureManager(0, new CheckpointFailureManager.FailJobCallback() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.1
            public void failJob(Throwable th) {
                of.f0 = Integer.valueOf(((Integer) of.f0).intValue() + 1);
                of.f1 = th;
            }

            public void failJobDueToTaskFailure(Throwable th, ExecutionAttemptID executionAttemptID2) {
                throw new AssertionError("This method should not be called for the test.");
            }
        }));
        CompletableFuture triggerSynchronousSavepoint = checkpointCoordinator.triggerSynchronousSavepoint(false, "test-dir");
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(declineSynchronousSavepoint(jobID, checkpointCoordinator, executionAttemptID, iOException).isDiscarded());
        try {
            triggerSynchronousSavepoint.get();
            Assert.fail("Expected Exception not found.");
        } catch (ExecutionException e) {
            Throwable stripExecutionException = ExceptionUtils.stripExecutionException(e);
            Assert.assertTrue(stripExecutionException instanceof CheckpointException);
            Assert.assertEquals(iOException.getMessage(), stripExecutionException.getCause().getMessage());
        }
        Assert.assertEquals(1L, ((Integer) of.f0).intValue());
        Assert.assertTrue((of.f1 instanceof CheckpointException) && ((Throwable) of.f1).getCause().getMessage().equals(iOException.getMessage()));
        checkpointCoordinator.shutdown(JobStatus.FAILING);
    }

    @Test
    public void testTriggerCheckpointAfterCancel() throws Exception {
        CheckpointIDCounter testingCheckpointIDCounter = new TestingCheckpointIDCounter();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter(testingCheckpointIDCounter).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        testingCheckpointIDCounter.setOwner(build);
        try {
            build.startCheckpointScheduler();
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, true, false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            try {
                triggerCheckpoint.get();
                Assert.fail("should not trigger periodic checkpoint after stop the coordinator.");
            } catch (ExecutionException e) {
                Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
                Assert.assertTrue(findThrowable.isPresent());
                Assert.assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, ((CheckpointException) findThrowable.get()).getCheckpointFailureReason());
            }
        } finally {
            build.shutdown(JobStatus.FINISHED);
        }
    }

    @Test
    public void testSavepointScheduledInUnalignedMode() throws Exception {
        int i = 0;
        JobID jobID = new JobID();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setUnalignedCheckpointsEnabled(true).setMaxConcurrentCheckpoints(1).build()).setJobId(jobID).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        try {
            ArrayList arrayList = new ArrayList(10);
            build.startCheckpointScheduler();
            while (i < 10) {
                arrayList.add(build.triggerCheckpoint(true));
                i++;
            }
            Assert.assertEquals(i - 1, build.getNumQueuedRequests());
            CompletableFuture triggerSavepoint = build.triggerSavepoint("/tmp");
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            int i2 = i + 1;
            Assert.assertEquals(i2 - 1, build.getNumQueuedRequests());
            build.receiveDeclineMessage(new DeclineCheckpoint(jobID, new ExecutionAttemptID(), 1L), "none");
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals((i2 - 1) - 1, build.getNumQueuedRequests());
            Assert.assertEquals(1L, arrayList.stream().filter((v0) -> {
                return v0.isDone();
            }).count());
            Assert.assertFalse(triggerSavepoint.isDone());
            Assert.assertEquals(1, build.getNumberOfPendingCheckpoints());
            CheckpointProperties props = ((PendingCheckpoint) build.getPendingCheckpoints().values().iterator().next()).getProps();
            Assert.assertTrue(props.isSavepoint());
            Assert.assertFalse(props.forceCheckpoint());
            build.shutdown(JobStatus.FINISHED);
        } catch (Throwable th) {
            build.shutdown(JobStatus.FINISHED);
            throw th;
        }
    }

    private CheckpointCoordinator getCheckpointCoordinator(JobID jobID, ExecutionVertex executionVertex, ExecutionVertex executionVertex2) {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setTasks(new ExecutionVertex[]{executionVertex, executionVertex2}).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).build();
    }

    private CheckpointCoordinator getCheckpointCoordinator(JobID jobID, ExecutionVertex executionVertex, ExecutionVertex executionVertex2, CheckpointFailureManager checkpointFailureManager) {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setTasks(new ExecutionVertex[]{executionVertex, executionVertex2}).setTimer(this.manuallyTriggeredScheduledExecutor).setFailureManager(checkpointFailureManager).build();
    }

    private CheckpointCoordinator getCheckpointCoordinator() {
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2, jobVertexID, Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID)), 1, 1, ExecutionState.FINISHED, new ExecutionState[0]);
        ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
        ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}).setTasksToWaitFor(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3), CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID4)}).setTasksToCommitTo(new ExecutionVertex[0]).setTimer(this.manuallyTriggeredScheduledExecutor).build();
    }

    private CheckpointFailureManager getCheckpointFailureManager(final String str) {
        return new CheckpointFailureManager(0, new CheckpointFailureManager.FailJobCallback() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.2
            public void failJob(Throwable th) {
                throw new RuntimeException(str);
            }

            public void failJobDueToTaskFailure(Throwable th, ExecutionAttemptID executionAttemptID) {
                throw new RuntimeException(str);
            }
        });
    }

    private PendingCheckpoint declineSynchronousSavepoint(JobID jobID, CheckpointCoordinator checkpointCoordinator, ExecutionAttemptID executionAttemptID, Throwable th) {
        long longValue = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
        checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionAttemptID, longValue, th), TASK_MANAGER_LOCATION_INFO);
        return pendingCheckpoint;
    }

    private void performIncrementalCheckpoint(JobID jobID, CheckpointCoordinator checkpointCoordinator, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> list, int i) throws Exception {
        checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(1L, checkpointCoordinator.getPendingCheckpoints().size());
        long longValue = ((Long) Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet())).longValue();
        for (int i2 = 0; i2 < executionJobVertex.getParallelism(); i2++) {
            KeyGroupRange keyGroupRange = list.get(i2);
            HashMap hashMap = new HashMap();
            hashMap.put(new StateHandleID("private-1"), Mockito.spy(new ByteStreamStateHandle("private-1", new byte[]{112})));
            HashMap hashMap2 = new HashMap();
            if (i > 0) {
                hashMap2.put(new StateHandleID("shared-" + (i - 1)), Mockito.spy(new PlaceholderStreamStateHandle()));
            }
            hashMap2.put(new StateHandleID("shared-" + i), Mockito.spy(new ByteStreamStateHandle("shared-" + i + "-" + keyGroupRange, new byte[]{115})));
            OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.spy(new OperatorSubtaskState(StateObjectCollection.empty(), StateObjectCollection.empty(), StateObjectCollection.singleton((IncrementalRemoteKeyedStateHandle) Mockito.spy(new IncrementalRemoteKeyedStateHandle(new UUID(42L, 42L), keyGroupRange, longValue, hashMap2, hashMap, (StreamStateHandle) Mockito.spy(new ByteStreamStateHandle("meta", new byte[]{109}))))), StateObjectCollection.empty()));
            HashMap hashMap3 = new HashMap();
            hashMap3.put(((OperatorIDPair) executionJobVertex.getOperatorIDs().get(0)).getGeneratedOperatorID(), operatorSubtaskState);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionJobVertex.getTaskVertices()[i2].getCurrentExecutionAttempt().getAttemptId(), longValue, new CheckpointMetrics(), new TaskStateSnapshot(hashMap3)), TASK_MANAGER_LOCATION_INFO);
        }
    }
}
