package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.class */
public class CheckpointCoordinatorTest extends TestLogger {

    /** JUnit rule that supplies a {@link TemporaryFolder} to the tests of this class. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$2.class */
    /**
     * Decompiler-emitted holder for a lookup table indexed by {@code OperatorStateHandle.Mode}
     * ordinals. The static initializer stores 1 for {@code UNION}, 2 for {@code BROADCAST} and
     * 3 for {@code SPLIT_DISTRIBUTE}; every other slot keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode = new int[OperatorStateHandle.Mode.values().length];

        static {
            // Local alias for the table; it refers to the same array instance as the field above.
            final int[] caseIndexByOrdinal = $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode;

            try {
                caseIndexByOrdinal[OperatorStateHandle.Mode.UNION.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not resolvable at runtime: its slot simply stays 0.
            }

            try {
                caseIndexByOrdinal[OperatorStateHandle.Mode.BROADCAST.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not resolvable at runtime: its slot simply stays 0.
            }

            try {
                caseIndexByOrdinal[OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant not resolvable at runtime: its slot simply stays 0.
            }
        }
    }

    /**
     * Checks that {@code triggerCheckpoint} returns {@code false} and leaves no pending or retained
     * checkpoint behind when the first vertex array handed to the coordinator (the "trigger tasks"
     * of this test's name) consists of bare Mockito mocks.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            // NOTE(review): the meaning of the numeric arguments is defined by the
            // CheckpointCoordinator constructor, which is not visible here.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    // Bare mocks, deliberately NOT built through mockExecutionVertex(...).
                    new ExecutionVertex[]{(ExecutionVertex) Mockito.mock(ExecutionVertex.class), (ExecutionVertex) Mockito.mock(ExecutionVertex.class)},
                    new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID())},
                    new ExecutionVertex[0],
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Nothing should be in progress or retained before triggering.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The trigger attempt must be refused.
            Assert.assertFalse(coord.triggerCheckpoint(timestamp, false));

            // ...and must not have created any checkpoint.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Attach the original exception as the cause so the test report carries its full
            // stack trace; a null exception message can no longer produce an empty failure.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Checks that {@code triggerCheckpoint} returns {@code false} and leaves no pending or retained
     * checkpoint behind when one of the vertices in the first vertex array (the "trigger tasks" of
     * this test's name) is mocked with {@link ExecutionState#FINISHED}.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
            JobVertexID finishedJobVertexID = new JobVertexID();

            // NOTE(review): the meaning of the numeric arguments is defined by the
            // CheckpointCoordinator constructor, which is not visible here.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    // The second vertex of this array is mocked in state FINISHED.
                    new ExecutionVertex[]{triggerVertex1, mockExecutionVertex(triggerAttemptID2, finishedJobVertexID, Collections.singletonList(OperatorID.fromJobVertexID(finishedJobVertexID)), 1, 1, ExecutionState.FINISHED, new ExecutionState[0])},
                    new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID())},
                    new ExecutionVertex[0],
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Nothing should be in progress or retained before triggering.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The trigger attempt must be refused.
            Assert.assertFalse(coord.triggerCheckpoint(timestamp, false));

            // ...and must not have created any checkpoint.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Attach the original exception as the cause so the test report carries its full
            // stack trace; a null exception message can no longer produce an empty failure.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Checks that {@code triggerCheckpoint} returns {@code false} and leaves no pending or retained
     * checkpoint behind when the second vertex array handed to the coordinator (the "ack tasks" of
     * this test's name) consists of bare Mockito mocks.
     */
    @Test
    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            // NOTE(review): the meaning of the numeric arguments is defined by the
            // CheckpointCoordinator constructor, which is not visible here.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID())},
                    // Bare mocks, deliberately NOT built through mockExecutionVertex(...).
                    new ExecutionVertex[]{(ExecutionVertex) Mockito.mock(ExecutionVertex.class), (ExecutionVertex) Mockito.mock(ExecutionVertex.class)},
                    new ExecutionVertex[0],
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Nothing should be in progress or retained before triggering.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The trigger attempt must be refused.
            Assert.assertFalse(coord.triggerCheckpoint(timestamp, false));

            // ...and must not have created any checkpoint.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Attach the original exception as the cause so the test report carries its full
            // stack trace; a null exception message can no longer produce an empty failure.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers a single checkpoint over two mocked vertices, acknowledges it (twice) from one
     * vertex and then declines it from the other. Asserts that the decline discards the pending
     * checkpoint, that no checkpoint is retained, and that further decline messages for the
     * already discarded checkpoint leave it discarded.
     */
    @Test
    public void testTriggerAndDeclineCheckpointSimple() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            // Both vertices are used in all three vertex arrays of the coordinator.
            // NOTE(review): the meaning of the numeric arguments is defined by the
            // CheckpointCoordinator constructor, which is not visible here.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // Trigger the checkpoint; this time it must be accepted.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));

            Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            // Exactly one scheduled task exists while the checkpoint is pending
            // (presumably its timeout canceller - confirm against CheckpointCoordinator).
            Assert.assertEquals(1L, coord.getNumScheduledTasks());

            long checkpointId = ((Long) ((Map.Entry) coord.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint checkpoint = (PendingCheckpoint) coord.getPendingCheckpoints().get(Long.valueOf(checkpointId));

            // Freshly triggered: correct identity, nothing acknowledged yet.
            Assert.assertNotNull(checkpoint);
            Assert.assertEquals(checkpointId, checkpoint.getCheckpointId());
            Assert.assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint.getJobId());
            Assert.assertEquals(2L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getOperatorStates().size());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // Both vertices must have received the trigger call.
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt())).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt())).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());

            // Acknowledge from the second task only.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
            Assert.assertEquals(1L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // A duplicate acknowledgement from the same task must not complete or discard it.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // A decline from the first task must discard the checkpoint.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
            Assert.assertTrue(checkpoint.isDiscarded());

            // Everything is cleaned up and nothing was retained.
            Assert.assertEquals(0L, coord.getNumScheduledTasks());
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // Late decline messages for the discarded checkpoint must be tolerated.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
            Assert.assertTrue(checkpoint.isDiscarded());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Attach the original exception as the cause so the test report carries its full
            // stack trace; a null exception message can no longer produce an empty failure.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers two checkpoints over two mocked vertices and declines only the first one. Asserts
     * that the first checkpoint is discarded while the second stays pending and untouched, and
     * that further decline messages for the discarded checkpoint leave it discarded.
     */
    @Test
    public void testTriggerAndDeclineCheckpointComplex() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();
            long timestamp2 = timestamp + 2;

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            // Both vertices are used in all three vertex arrays of the coordinator.
            // NOTE(review): the meaning of the numeric arguments is defined by the
            // CheckpointCoordinator constructor, which is not visible here.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // Trigger two checkpoints back to back; both must be accepted.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));
            Assert.assertTrue(coord.triggerCheckpoint(timestamp2, false));

            Assert.assertEquals(2L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(2L, coord.getNumScheduledTasks());

            // The assertions below rely on the pending map iterating in trigger order.
            Iterator pendingEntries = coord.getPendingCheckpoints().entrySet().iterator();
            long checkpoint1Id = ((Long) ((Map.Entry) pendingEntries.next()).getKey()).longValue();
            long checkpoint2Id = ((Long) ((Map.Entry) pendingEntries.next()).getKey()).longValue();
            PendingCheckpoint checkpoint1 = (PendingCheckpoint) coord.getPendingCheckpoints().get(Long.valueOf(checkpoint1Id));
            PendingCheckpoint checkpoint2 = (PendingCheckpoint) coord.getPendingCheckpoints().get(Long.valueOf(checkpoint2Id));

            // First checkpoint: correct identity, nothing acknowledged yet.
            Assert.assertNotNull(checkpoint1);
            Assert.assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
            Assert.assertEquals(timestamp, checkpoint1.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint1.getJobId());
            Assert.assertEquals(2L, checkpoint1.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint1.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint1.getOperatorStates().size());
            Assert.assertFalse(checkpoint1.isDiscarded());
            Assert.assertFalse(checkpoint1.isFullyAcknowledged());

            // Second checkpoint: correct identity, nothing acknowledged yet.
            Assert.assertNotNull(checkpoint2);
            Assert.assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
            Assert.assertEquals(timestamp2, checkpoint2.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint2.getJobId());
            Assert.assertEquals(2L, checkpoint2.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint2.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint2.getOperatorStates().size());
            Assert.assertFalse(checkpoint2.isDiscarded());
            Assert.assertFalse(checkpoint2.isFullyAcknowledged());

            // Each vertex must have been triggered exactly once per checkpoint.
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpoint1Id), Mockito.eq(timestamp), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpoint1Id), Mockito.eq(timestamp), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpoint2Id), Mockito.eq(timestamp2), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpoint2Id), Mockito.eq(timestamp2), (CheckpointOptions) Matchers.any(CheckpointOptions.class));

            // Decline the first checkpoint from the first task: only that checkpoint is discarded.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
            Assert.assertTrue(checkpoint1.isDiscarded());

            Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, coord.getNumScheduledTasks());

            // The remaining pending checkpoint must be the second one, still untouched.
            long checkpointIdNew = ((Long) ((Map.Entry) coord.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint checkpointNew = (PendingCheckpoint) coord.getPendingCheckpoints().get(Long.valueOf(checkpointIdNew));
            Assert.assertEquals(checkpoint2Id, checkpointIdNew);
            Assert.assertNotNull(checkpointNew);
            Assert.assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
            Assert.assertEquals(jid, checkpointNew.getJobId());
            Assert.assertEquals(2L, checkpointNew.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpointNew.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpointNew.getOperatorStates().size());
            Assert.assertFalse(checkpointNew.isDiscarded());
            Assert.assertFalse(checkpointNew.isFullyAcknowledged());
            Assert.assertNotEquals(checkpoint1.getCheckpointId(), checkpointNew.getCheckpointId());

            // Late decline messages for the discarded checkpoint must be tolerated.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id));
            Assert.assertTrue(checkpoint1.isDiscarded());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Attach the original exception as the cause so the test report carries its full
            // stack trace; a null exception message can no longer produce an empty failure.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers a checkpoint over two mocked vertices and acknowledges it from both, asserting that
     * it becomes the single retained completed checkpoint and that the mocked subtask states had
     * {@code registerSharedStates} invoked only once the checkpoint completed. Then triggers and
     * acknowledges a second checkpoint and asserts that it replaces the first one in the store
     * (capacity 1) and that both vertices received {@code notifyCheckpointComplete} for it.
     */
    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            // Both vertices are used in all three vertex arrays of the coordinator.
            // NOTE(review): the meaning of the numeric arguments is defined by the
            // CheckpointCoordinator constructor, which is not visible here.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // ---- first checkpoint: trigger ----
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));

            Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, coord.getNumScheduledTasks());

            long checkpointId = ((Long) ((Map.Entry) coord.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint checkpoint = (PendingCheckpoint) coord.getPendingCheckpoints().get(Long.valueOf(checkpointId));

            // Freshly triggered: correct identity, nothing acknowledged yet.
            Assert.assertNotNull(checkpoint);
            Assert.assertEquals(checkpointId, checkpoint.getCheckpointId());
            Assert.assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint.getJobId());
            Assert.assertEquals(2L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getOperatorStates().size());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // Each vertex must have been triggered exactly once.
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), Mockito.eq(timestamp), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), Mockito.eq(timestamp), (CheckpointOptions) Matchers.any(CheckpointOptions.class));

            // Mocked task snapshots, each returning a mocked subtask state for its vertex's operator.
            OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
            OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
            TaskStateSnapshot taskStates1 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
            TaskStateSnapshot taskStates2 = (TaskStateSnapshot) Mockito.mock(TaskStateSnapshot.class);
            OperatorSubtaskState subtaskState1 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState2 = (OperatorSubtaskState) Mockito.mock(OperatorSubtaskState.class);
            Mockito.when(taskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
            Mockito.when(taskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);

            // ---- first checkpoint: acknowledge from the second task ----
            AcknowledgeCheckpoint ackFromTask2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskStates2);
            coord.receiveAcknowledgeMessage(ackFromTask2);
            Assert.assertEquals(1L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());
            // Shared state must not be registered while the checkpoint is still pending.
            ((TaskStateSnapshot) Mockito.verify(taskStates2, Mockito.never())).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));

            // A duplicate acknowledgement from the same task must not change anything.
            coord.receiveAcknowledgeMessage(ackFromTask2);
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());
            ((OperatorSubtaskState) Mockito.verify(subtaskState2, Mockito.never())).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));

            // ---- first checkpoint: acknowledge from the first task, which completes it ----
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskStates1));

            // The pending checkpoint object reports itself as discarded once completed.
            Assert.assertTrue(checkpoint.isDiscarded());

            Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // Both subtask states registered their shared state exactly once.
            ((OperatorSubtaskState) Mockito.verify(subtaskState1, Mockito.times(1))).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));
            ((OperatorSubtaskState) Mockito.verify(subtaskState2, Mockito.times(1))).registerSharedStates((SharedStateRegistry) Matchers.any(SharedStateRegistry.class));

            // The trigger count per vertex is still exactly one after completion.
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), Mockito.eq(timestamp), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointId), Mockito.eq(timestamp), (CheckpointOptions) Matchers.any(CheckpointOptions.class));

            CompletedCheckpoint success = (CompletedCheckpoint) coord.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jid, success.getJobId());
            Assert.assertEquals(timestamp, success.getTimestamp());
            Assert.assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
            Assert.assertEquals(2L, success.getOperatorStates().size());

            // ---- second checkpoint: trigger and acknowledge without any state ----
            long timestampNew = timestamp + 7;
            coord.triggerCheckpoint(timestampNew, false);

            long checkpointIdNew = ((Long) ((Map.Entry) coord.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));

            // Still only one retained checkpoint: the store was created with capacity 1.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // The retained checkpoint is now the second one, and it carries no operator state.
            CompletedCheckpoint successNew = (CompletedCheckpoint) coord.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jid, successNew.getJobId());
            Assert.assertEquals(timestampNew, successNew.getTimestamp());
            Assert.assertEquals(checkpointIdNew, successNew.getCheckpointID());
            Assert.assertTrue(successNew.getOperatorStates().isEmpty());

            // Both vertices were triggered once and notified of completion once for it.
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))).triggerCheckpoint(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
            ((Execution) Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew));
            ((Execution) Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))).notifyCheckpointComplete(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew));

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Attach the original exception as the cause so the test report carries its full
            // stack trace; a null exception message can no longer produce an empty failure.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers two overlapping checkpoints and acknowledges them in interleaved order, checking
     * that each one completes on its own and that both appear in the list of successful
     * checkpoints in the order in which they were triggered.
     *
     * @throws Exception if any coordinator call fails; propagated (instead of being caught and
     *     turned into {@code Assert.fail(e.getMessage())}) so the test report keeps the original
     *     exception and stack trace
     */
    @Test
    public void testMultipleConcurrentCheckpoints() throws Exception {
        JobID jobID = new JobID();
        long firstTimestamp = System.currentTimeMillis();
        long secondTimestamp = firstTimestamp + 8617;

        // Roles follow from how the vertices are used below: the "trigger" vertices are verified
        // to receive triggerCheckpoint, acknowledgements are sent for the "ack" attempts, and the
        // "commit" vertex is verified to receive notifyCheckpointComplete.
        ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
        ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
        ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
        ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
        ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3},
                new ExecutionVertex[]{commitVertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // First checkpoint.
        Assert.assertTrue(coordinator.triggerCheckpoint(firstTimestamp, false));
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        PendingCheckpoint firstPending = coordinator.getPendingCheckpoints().values().iterator().next();
        long firstCheckpointId = firstPending.getCheckpointId();

        Mockito.verify(triggerVertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(firstCheckpointId), Mockito.eq(firstTimestamp), Matchers.any(CheckpointOptions.class));
        Mockito.verify(triggerVertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(firstCheckpointId), Mockito.eq(firstTimestamp), Matchers.any(CheckpointOptions.class));

        // One acknowledgement for the first checkpoint arrives before the second is triggered.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID2, firstCheckpointId));

        // Second checkpoint, started while the first is still pending.
        Assert.assertTrue(coordinator.triggerCheckpoint(secondTimestamp, false));
        Assert.assertEquals(2L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // The iteration order of the pending checkpoints is not relied upon: take whichever
        // entry is not the first checkpoint.
        Iterator<PendingCheckpoint> pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint candidate = pendingIterator.next();
        PendingCheckpoint secondPending = firstPending == candidate ? pendingIterator.next() : candidate;
        long secondCheckpointId = secondPending.getCheckpointId();

        Mockito.verify(triggerVertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(secondCheckpointId), Mockito.eq(secondTimestamp), Matchers.any(CheckpointOptions.class));
        Mockito.verify(triggerVertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(secondCheckpointId), Mockito.eq(secondTimestamp), Matchers.any(CheckpointOptions.class));

        // Interleaved acknowledgements; the third message below is the last one outstanding
        // for the first checkpoint.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID3, firstCheckpointId));
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID1, secondCheckpointId));
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID1, firstCheckpointId));
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID2, secondCheckpointId));

        // First checkpoint is complete, second is still waiting for one acknowledgement.
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue(firstPending.isDiscarded());
        Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(firstCheckpointId), Mockito.eq(firstTimestamp));

        // Final acknowledgement for the second checkpoint.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID3, secondCheckpointId));

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(2L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue(secondPending.isDiscarded());
        Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(secondCheckpointId), Mockito.eq(secondTimestamp));

        // Both completed checkpoints are reported, oldest first, without any operator state
        // (the acknowledgements above carried none).
        List<CompletedCheckpoint> completed = coordinator.getSuccessfulCheckpoints();

        CompletedCheckpoint firstCompleted = completed.get(0);
        Assert.assertEquals(firstCheckpointId, firstCompleted.getCheckpointID());
        Assert.assertEquals(firstTimestamp, firstCompleted.getTimestamp());
        Assert.assertEquals(jobID, firstCompleted.getJobId());
        Assert.assertTrue(firstCompleted.getOperatorStates().isEmpty());

        CompletedCheckpoint secondCompleted = completed.get(1);
        Assert.assertEquals(secondCheckpointId, secondCompleted.getCheckpointID());
        Assert.assertEquals(secondTimestamp, secondCompleted.getTimestamp());
        Assert.assertEquals(jobID, secondCompleted.getJobId());
        Assert.assertTrue(secondCompleted.getOperatorStates().isEmpty());

        coordinator.shutdown(JobStatus.FINISHED);
    }

    /**
     * Triggers two overlapping checkpoints and lets the second one complete while the first is
     * still missing an acknowledgement. Checks that the first checkpoint is then discarded
     * together with the state it had already received, that state arriving late for it is
     * discarded as well, and that the state of the completed checkpoint is only discarded once
     * the coordinator is shut down.
     *
     * @throws Exception if any coordinator call fails; propagated (instead of being caught and
     *     turned into {@code Assert.fail(e.getMessage())}) so the test report keeps the original
     *     exception and stack trace
     */
    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
        JobID jobID = new JobID();
        long firstTimestamp = System.currentTimeMillis();
        long secondTimestamp = firstTimestamp + 1552;

        // Roles follow from how the vertices are used below: the "trigger" vertices are verified
        // to receive triggerCheckpoint, acknowledgements are sent for the "ack" attempts, and the
        // "commit" vertex is verified to receive notifyCheckpointComplete.
        ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
        ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
        ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
        ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
        ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3},
                new ExecutionVertex[]{commitVertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(10),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // First checkpoint.
        Assert.assertTrue(coordinator.triggerCheckpoint(firstTimestamp, false));
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        PendingCheckpoint firstPending = coordinator.getPendingCheckpoints().values().iterator().next();
        long firstCheckpointId = firstPending.getCheckpointId();

        Mockito.verify(triggerVertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(firstCheckpointId), Mockito.eq(firstTimestamp), Matchers.any(CheckpointOptions.class));
        Mockito.verify(triggerVertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(firstCheckpointId), Mockito.eq(firstTimestamp), Matchers.any(CheckpointOptions.class));

        OperatorID operatorId1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
        OperatorID operatorId2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
        OperatorID operatorId3 = OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());

        // State reported for the FIRST checkpoint, one snapshot per acknowledging task.
        TaskStateSnapshot firstCpSnapshot1 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot firstCpSnapshot2 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot firstCpSnapshot3 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState firstCpState1 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState firstCpState2 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState firstCpState3 = Mockito.mock(OperatorSubtaskState.class);
        firstCpSnapshot1.putSubtaskStateByOperatorID(operatorId1, firstCpState1);
        firstCpSnapshot2.putSubtaskStateByOperatorID(operatorId2, firstCpState2);
        firstCpSnapshot3.putSubtaskStateByOperatorID(operatorId3, firstCpState3);

        // One acknowledgement for the first checkpoint arrives before the second is triggered.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID2, firstCheckpointId, new CheckpointMetrics(), firstCpSnapshot2));

        // Second checkpoint, started while the first is still pending.
        Assert.assertTrue(coordinator.triggerCheckpoint(secondTimestamp, false));
        Assert.assertEquals(2L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // The iteration order of the pending checkpoints is not relied upon: take whichever
        // entry is not the first checkpoint.
        Iterator<PendingCheckpoint> pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint candidate = pendingIterator.next();
        PendingCheckpoint secondPending = firstPending == candidate ? pendingIterator.next() : candidate;
        long secondCheckpointId = secondPending.getCheckpointId();

        // State reported for the SECOND checkpoint.
        TaskStateSnapshot secondCpSnapshot1 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot secondCpSnapshot2 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot secondCpSnapshot3 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState secondCpState1 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState secondCpState2 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState secondCpState3 = Mockito.mock(OperatorSubtaskState.class);
        secondCpSnapshot1.putSubtaskStateByOperatorID(operatorId1, secondCpState1);
        secondCpSnapshot2.putSubtaskStateByOperatorID(operatorId2, secondCpState2);
        secondCpSnapshot3.putSubtaskStateByOperatorID(operatorId3, secondCpState3);

        Mockito.verify(triggerVertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(secondCheckpointId), Mockito.eq(secondTimestamp), Matchers.any(CheckpointOptions.class));
        Mockito.verify(triggerVertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(secondCheckpointId), Mockito.eq(secondTimestamp), Matchers.any(CheckpointOptions.class));

        // The second checkpoint receives all three acknowledgements; the first one only gets a
        // second acknowledgement and is still waiting for ackAttemptID3.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID3, secondCheckpointId, new CheckpointMetrics(), secondCpSnapshot3));
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID1, secondCheckpointId, new CheckpointMetrics(), secondCpSnapshot1));
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID1, firstCheckpointId, new CheckpointMetrics(), firstCpSnapshot1));
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID2, secondCheckpointId, new CheckpointMetrics(), secondCpSnapshot2));

        // Both pending checkpoints are gone, but only one completed checkpoint is retained.
        Assert.assertTrue(firstPending.isDiscarded());
        Assert.assertTrue(secondPending.isDiscarded());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // State already received for the first checkpoint has been discarded ...
        Mockito.verify(firstCpState1, Mockito.times(1)).discardState();
        Mockito.verify(firstCpState2, Mockito.times(1)).discardState();

        // ... while the state of the completed second checkpoint is untouched.
        Mockito.verify(secondCpState1, Mockito.never()).discardState();
        Mockito.verify(secondCpState2, Mockito.never()).discardState();
        Mockito.verify(secondCpState3, Mockito.never()).discardState();

        CompletedCheckpoint completed = coordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(secondCheckpointId, completed.getCheckpointID());
        Assert.assertEquals(secondTimestamp, completed.getTimestamp());
        Assert.assertEquals(jobID, completed.getJobId());
        Assert.assertEquals(3L, completed.getOperatorStates().size());

        Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(secondCheckpointId), Mockito.eq(secondTimestamp));

        // A late acknowledgement for the discarded first checkpoint: its state must be discarded.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID3, firstCheckpointId, new CheckpointMetrics(), firstCpSnapshot3));
        Mockito.verify(firstCpState3, Mockito.times(1)).discardState();

        coordinator.shutdown(JobStatus.FINISHED);

        // After shutdown the state of the completed checkpoint has been discarded exactly once.
        Mockito.verify(secondCpState1, Mockito.times(1)).discardState();
        Mockito.verify(secondCpState2, Mockito.times(1)).discardState();
        Mockito.verify(secondCpState3, Mockito.times(1)).discardState();
    }

    /**
     * Triggers a checkpoint, acknowledges only one of the two awaited tasks, and then waits for
     * the coordinator to discard the pending checkpoint by itself. Checks that the state
     * received so far is discarded and that no completion notification is sent.
     *
     * <p>The coordinator is built with {@code 200L} as its third constructor argument;
     * NOTE(review): presumably that is the checkpoint timeout in milliseconds - confirm against
     * the {@code CheckpointCoordinator} constructor.
     *
     * @throws Exception if any coordinator call fails or the wait is interrupted; propagated
     *     (instead of being caught and turned into {@code Assert.fail(e.getMessage())}) so the
     *     test report keeps the original exception and stack trace
     */
    @Test
    public void testCheckpointTimeoutIsolated() throws Exception {
        JobID jobID = new JobID();
        long timestamp = System.currentTimeMillis();

        ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
        ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
        ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                600000L,
                200L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{triggerVertex},
                new ExecutionVertex[]{ackVertex1, ackVertex2},
                new ExecutionVertex[]{commitVertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        Assert.assertTrue(coordinator.triggerCheckpoint(timestamp, false));
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
        Assert.assertFalse(pending.isDiscarded());

        // Only the first awaited task acknowledges; ackAttemptID2 never does.
        OperatorID operatorId = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
        TaskStateSnapshot snapshot = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState subtaskState = Mockito.mock(OperatorSubtaskState.class);
        snapshot.putSubtaskStateByOperatorID(operatorId, subtaskState);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobID, ackAttemptID1, pending.getCheckpointId(), new CheckpointMetrics(), snapshot));

        // Poll (for at most about five seconds) until the checkpoint has been discarded or is
        // no longer pending.
        long deadline = System.currentTimeMillis() + 5000;
        do {
            Thread.sleep(250L);
        } while (!pending.isDiscarded()
                && coordinator.getNumberOfPendingCheckpoints() > 0
                && System.currentTimeMillis() < deadline);

        Assert.assertTrue("Checkpoint was not canceled by the timeout", pending.isDiscarded());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // The partially collected state was cleaned up and nothing was reported as complete.
        Mockito.verify(subtaskState, Mockito.times(1)).discardState();
        Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(0))
                .notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());

        coordinator.shutdown(JobStatus.FINISHED);
    }

    /**
     * Sends the coordinator three acknowledge messages that differ from a regular
     * acknowledgement in exactly one field each (job ID, checkpoint ID, execution attempt ID).
     * The test makes no assertions on the outcome; it passes as long as none of the calls throws.
     *
     * @throws Exception if any coordinator call fails; propagated (instead of being caught and
     *     turned into {@code Assert.fail(e.getMessage())}) so the test report keeps the original
     *     exception and stack trace
     */
    @Test
    public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
        JobID jobID = new JobID();
        long timestamp = System.currentTimeMillis();

        ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();

        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                200000L,
                200000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{mockExecutionVertex(triggerAttemptID)},
                new ExecutionVertex[]{mockExecutionVertex(ackAttemptID1), mockExecutionVertex(ackAttemptID2)},
                new ExecutionVertex[]{mockExecutionVertex(commitAttemptID)},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        Assert.assertTrue(coordinator.triggerCheckpoint(timestamp, false));
        long checkpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();

        // Message carrying a freshly created job ID instead of the coordinator's.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));

        // Message carrying the hard-coded checkpoint ID 1.
        // NOTE(review): whether 1 differs from the pending checkpoint's ID depends on where
        // StandaloneCheckpointIDCounter starts counting - confirm.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID1, 1L));

        // Message carrying an execution attempt ID the coordinator was never given.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, new ExecutionAttemptID(), checkpointId));

        coordinator.shutdown(JobStatus.FINISHED);
    }

    /**
     * Walks through a series of acknowledge messages - regular, duplicate, from an unknown
     * execution attempt, for a different job, and arriving after the checkpoint was declined -
     * and verifies for each one whether {@code discardState()} is invoked on the state it carries.
     */
    @Test
    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobID jobId = new JobID();

        ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptId);
        ExecutionAttemptID decliningAttemptId = new ExecutionAttemptID();
        ExecutionVertex decliningVertex = mockExecutionVertex(decliningAttemptId);
        ExecutionAttemptID lateAttemptId = new ExecutionAttemptID();
        ExecutionVertex lateVertex = mockExecutionVertex(lateAttemptId);

        // The trigger vertex appears in both the first and the second vertex array; the third
        // array is empty.
        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobId,
                20000L,
                20000L,
                0L,
                1,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{triggerVertex},
                new ExecutionVertex[]{triggerVertex, decliningVertex, lateVertex},
                new ExecutionVertex[0],
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        Assert.assertTrue(coordinator.triggerCheckpoint(1L, false));
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pending.getCheckpointId();

        // 1) Regular acknowledgement from a known task: its state is kept.
        OperatorID triggerOperatorId = OperatorID.fromJobVertexID(triggerVertex.getJobvertexId());
        TaskStateSnapshot acceptedSnapshot = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState acceptedSubtaskState = Mockito.mock(OperatorSubtaskState.class);
        acceptedSnapshot.putSubtaskStateByOperatorID(triggerOperatorId, acceptedSubtaskState);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), acceptedSnapshot));
        Mockito.verify(acceptedSubtaskState, Mockito.never()).discardState();

        // 2) Same job, but an execution attempt the coordinator does not know: state is discarded.
        TaskStateSnapshot unknownAttemptSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownAttemptSnapshot));
        Mockito.verify(unknownAttemptSnapshot, Mockito.times(1)).discardState();

        // 3) Message for a different job: state is left alone.
        TaskStateSnapshot otherJobSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), otherJobSnapshot));
        Mockito.verify(otherJobSnapshot, Mockito.never()).discardState();

        // 4) Second acknowledgement from the task that already acknowledged: state is left alone.
        TaskStateSnapshot duplicateSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), duplicateSnapshot));
        Mockito.verify(duplicateSnapshot, Mockito.never()).discardState();

        // 5) Another task declines: the pending checkpoint and the state accepted in step 1
        //    are discarded. The mock is reset first so that only calls from here on count.
        Mockito.reset(acceptedSubtaskState);
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, decliningAttemptId, checkpointId));
        Assert.assertTrue(pending.isDiscarded());
        Mockito.verify(acceptedSubtaskState, Mockito.times(1)).discardState();

        // 6) Acknowledgement from a known task after the checkpoint was discarded: state is discarded.
        TaskStateSnapshot lateSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobId, lateAttemptId, checkpointId, new CheckpointMetrics(), lateSnapshot));
        Mockito.verify(lateSnapshot, Mockito.times(1)).discardState();

        // 7) Message for a different job after the discard: state is still left alone.
        Mockito.reset(otherJobSnapshot);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), otherJobSnapshot));
        Mockito.verify(otherJobSnapshot, Mockito.never()).discardState();

        // 8) Unknown execution attempt after the discard: state is discarded.
        TaskStateSnapshot lateUnknownAttemptSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), lateUnknownAttemptSnapshot));
        Mockito.verify(lateUnknownAttemptSnapshot, Mockito.times(1)).discardState();
    }

    /**
     * Starts the coordinator's periodic scheduler and checks that checkpoints are triggered
     * repeatedly with increasing IDs and non-decreasing timestamps, that triggering stops after
     * the scheduler is stopped, and that it resumes when the scheduler is started again.
     *
     * @throws Exception if any coordinator call fails or a wait is interrupted; propagated
     *     (instead of being caught and turned into {@code Assert.fail(e.getMessage())}) so the
     *     test report keeps the original exception and stack trace
     */
    @Test
    public void testPeriodicTriggering() throws Exception {
        JobID jobID = new JobID();
        final long startTimestamp = System.currentTimeMillis();

        ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
        ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
        ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

        // Counts triggerCheckpoint invocations on the trigger vertex's execution.
        final AtomicInteger triggerCount = new AtomicInteger();

        Mockito.doAnswer(new Answer<Void>() {
            private long lastId = -1;
            private long lastTs = -1;

            // The method must be named answer to implement Answer; under any other name the
            // anonymous class does not compile and the stub is never invoked.
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                long id = ((Long) invocation.getArguments()[0]).longValue();
                long ts = ((Long) invocation.getArguments()[1]).longValue();

                // IDs strictly increase; timestamps never go backwards nor predate the test start.
                Assert.assertTrue(id > this.lastId);
                Assert.assertTrue(ts >= this.lastTs);
                Assert.assertTrue(ts >= startTimestamp);

                this.lastId = id;
                this.lastTs = ts;
                triggerCount.incrementAndGet();
                return null;
            }
        }).when(triggerVertex.getCurrentExecutionAttempt())
                .triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointOptions.class));

        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                10L,
                200000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{triggerVertex},
                new ExecutionVertex[]{ackVertex},
                new ExecutionVertex[]{commitVertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        // First run: wait (for at most a minute) until at least five triggers were seen.
        coordinator.startCheckpointScheduler();
        long firstDeadline = System.currentTimeMillis() + 60000;
        do {
            Thread.sleep(20L);
        } while (System.currentTimeMillis() < firstDeadline && triggerCount.get() < 5);
        Assert.assertTrue(triggerCount.get() >= 5);

        // After stopping, at most one more trigger (one that was already under way) may show up.
        coordinator.stopCheckpointScheduler();
        int countAfterFirstStop = triggerCount.get();
        Thread.sleep(400L);
        Assert.assertTrue(countAfterFirstStop == triggerCount.get() || countAfterFirstStop + 1 == triggerCount.get());

        // Second run: the scheduler can be restarted and triggers again.
        triggerCount.set(0);
        coordinator.startCheckpointScheduler();
        long secondDeadline = System.currentTimeMillis() + 60000;
        do {
            Thread.sleep(20L);
        } while (System.currentTimeMillis() < secondDeadline && triggerCount.get() < 5);
        Assert.assertTrue(triggerCount.get() >= 5);

        coordinator.stopCheckpointScheduler();
        int countAfterSecondStop = triggerCount.get();
        Thread.sleep(400L);
        Assert.assertTrue(countAfterSecondStop == triggerCount.get() || countAfterSecondStop + 1 == triggerCount.get());

        coordinator.shutdown(JobStatus.FINISHED);
    }

    /**
     * Starts the periodic scheduler, acknowledges the first checkpoint, and measures how long it
     * takes until the second checkpoint is triggered. Fails if that delay (with one millisecond
     * of tolerance) is shorter than the value passed as the coordinator's fourth constructor
     * argument.
     *
     * <p>NOTE(review): the fourth constructor argument is presumably the minimum pause between
     * checkpoints in milliseconds - confirm against the {@code CheckpointCoordinator} constructor.
     *
     * @throws Exception if any coordinator call fails or waiting on the queue is interrupted
     */
    @Test
    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
        // Single source of truth for the value passed to the coordinator and checked below.
        final long minPauseMillis = 50L;

        JobID jobID = new JobID();
        ExecutionAttemptID attemptID = new ExecutionAttemptID();
        ExecutionVertex vertex = mockExecutionVertex(attemptID);
        Execution execution = vertex.getCurrentExecutionAttempt();

        // Every triggerCheckpoint call publishes its checkpoint ID (first argument) to this
        // queue, so the test thread can block until the next trigger happens.
        final LinkedBlockingQueue<Long> triggeredIds = new LinkedBlockingQueue<>();
        Mockito.doAnswer(invocation -> {
            triggeredIds.add((Long) invocation.getArguments()[0]);
            return null;
        }).when(execution).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointOptions.class));

        CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                2L,
                200000L,
                minPauseMillis,
                1,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        try {
            coordinator.startCheckpointScheduler();

            // Wait for the first trigger and acknowledge it.
            Assert.assertEquals(1L, triggeredIds.take().longValue());
            AcknowledgeCheckpoint firstAck = new AcknowledgeCheckpoint(jobID, attemptID, 1L);

            long ackNanos = System.nanoTime();
            coordinator.receiveAcknowledgeMessage(firstAck);

            // Wait for the second trigger and measure the delay since the acknowledgement.
            Long secondId = triggeredIds.take();
            long nextTriggerNanos = System.nanoTime();
            Assert.assertEquals(2L, secondId.longValue());

            long delayMillis = (nextTriggerNanos - ackNanos) / 1000000;
            // The +1 grants one millisecond of tolerance for the truncating division above.
            if (delayMillis + 1 < minPauseMillis) {
                Assert.fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + minPauseMillis);
            }
        } finally {
            coordinator.stopCheckpointScheduler();
            coordinator.shutdown(JobStatus.FINISHED);
        }
    }

    /** Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with an argument of 1. */
    @Test
    public void testMaxConcurrentAttempts1() {
        final int attempts = 1;
        testMaxConcurrentAttempts(attempts);
    }

    /** Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with an argument of 2. */
    @Test
    public void testMaxConcurrentAttempts2() {
        final int attempts = 2;
        testMaxConcurrentAttempts(attempts);
    }

    /** Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with an argument of 5. */
    @Test
    public void testMaxConcurrentAttempts5() {
        final int attempts = 5;
        testMaxConcurrentAttempts(attempts);
    }

    /**
     * Triggers a savepoint for two task vertices and walks it through acknowledgement: one
     * acknowledge, a duplicate of that acknowledge, then the acknowledge of the other task. After
     * completion it checks the notifications, the shared state registration and the completed
     * checkpoint. A second savepoint, acknowledged without any state, must then take the place of
     * the first one (the retained count stays at one).
     */
    @Test
    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        final JobID jid = new JobID();
        final long timestamp = System.currentTimeMillis();

        // two task vertices; each one appears in all three vertex arrays handed to the coordinator
        final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        final ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
        final ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

        // trigger the first savepoint; it must be pending and not yet done
        final String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        final CompletableFuture<?> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
        Assert.assertFalse(savepointFuture.isDone());
        Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());

        final long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
        final PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId);

        Assert.assertNotNull(pending);
        Assert.assertEquals(checkpointId, pending.getCheckpointId());
        Assert.assertEquals(timestamp, pending.getCheckpointTimestamp());
        Assert.assertEquals(jid, pending.getJobId());
        Assert.assertEquals(2L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getOperatorStates().size());
        Assert.assertFalse(pending.isDiscarded());
        Assert.assertFalse(pending.isFullyAcknowledged());
        Assert.assertFalse(pending.canBeSubsumed());

        // mocked per-task state so that registration and discarding can be verified later
        final OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
        final OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
        final TaskStateSnapshot taskStates1 = Mockito.mock(TaskStateSnapshot.class);
        final TaskStateSnapshot taskStates2 = Mockito.mock(TaskStateSnapshot.class);
        final OperatorSubtaskState subtaskState1 = Mockito.mock(OperatorSubtaskState.class);
        final OperatorSubtaskState subtaskState2 = Mockito.mock(OperatorSubtaskState.class);
        Mockito.when(taskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
        Mockito.when(taskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);

        // acknowledge from the second task only
        final AcknowledgeCheckpoint ackFromTask2 =
                new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskStates2);
        coord.receiveAcknowledgeMessage(ackFromTask2);
        Assert.assertEquals(1L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse(pending.isDiscarded());
        Assert.assertFalse(pending.isFullyAcknowledged());
        Assert.assertFalse(savepointFuture.isDone());

        // the very same acknowledge a second time must not change anything
        coord.receiveAcknowledgeMessage(ackFromTask2);
        Assert.assertFalse(pending.isDiscarded());
        Assert.assertFalse(pending.isFullyAcknowledged());
        Assert.assertFalse(savepointFuture.isDone());

        // acknowledge from the first task: the savepoint completes
        final AcknowledgeCheckpoint ackFromTask1 =
                new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskStates1);
        coord.receiveAcknowledgeMessage(ackFromTask1);
        Assert.assertTrue(pending.isDiscarded());
        Assert.assertTrue(savepointFuture.isDone());
        Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());

        // both tasks were notified exactly once and both states were registered exactly once
        Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(checkpointId), Mockito.eq(timestamp));
        Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(checkpointId), Mockito.eq(timestamp));
        Mockito.verify(subtaskState1, Mockito.times(1))
                .registerSharedStates(Matchers.any(SharedStateRegistry.class));
        Mockito.verify(subtaskState2, Mockito.times(1))
                .registerSharedStates(Matchers.any(SharedStateRegistry.class));

        final CompletedCheckpoint firstCompleted = coord.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(jid, firstCompleted.getJobId());
        Assert.assertEquals(timestamp, firstCompleted.getTimestamp());
        Assert.assertEquals(pending.getCheckpointId(), firstCompleted.getCheckpointID());
        Assert.assertEquals(2L, firstCompleted.getOperatorStates().size());

        // second savepoint, acknowledged by both tasks without any state
        final long timestampNew = timestamp + 7;
        final CompletableFuture<?> savepointFutureNew = coord.triggerSavepoint(timestampNew, savepointDir);
        Assert.assertFalse(savepointFutureNew.isDone());

        final long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));

        Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());

        final CompletedCheckpoint secondCompleted = coord.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(jid, secondCompleted.getJobId());
        Assert.assertEquals(timestampNew, secondCompleted.getTimestamp());
        Assert.assertEquals(checkpointIdNew, secondCompleted.getCheckpointID());
        Assert.assertTrue(secondCompleted.getOperatorStates().isEmpty());
        Assert.assertTrue(savepointFutureNew.isDone());

        // the state of the first savepoint must never have been discarded
        Mockito.verify(subtaskState1, Mockito.never()).discardState();
        Mockito.verify(subtaskState2, Mockito.never()).discardState();

        // trigger and completion messages for the second savepoint went out exactly once per task
        Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew), Matchers.any(CheckpointOptions.class));
        Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew), Matchers.any(CheckpointOptions.class));
        Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew));
        Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew));

        coord.shutdown(JobStatus.FINISHED);
    }

    /**
     * Interleaves savepoints with regular checkpoints and checks that completing a later
     * checkpoint or savepoint removes the earlier regular checkpoints from the pending set while
     * the first savepoint stays pending (and not discarded) until it is acknowledged itself.
     */
    @Test
    public void testSavepointsAreNotSubsumed() throws Exception {
        final JobID jid = new JobID();
        final long timestamp = System.currentTimeMillis();

        final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        final ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
        final ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

        // keep a handle on the ID counter so the most recently assigned ID can be read back
        final StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                counter,
                new StandaloneCompletedCheckpointStore(10),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        final String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();

        // first savepoint, followed by two regular checkpoints
        final CompletableFuture<?> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
        final long savepointId1 = counter.getLast();
        Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());

        Assert.assertTrue(coord.triggerCheckpoint(timestamp + 1, false));
        Assert.assertEquals(2L, coord.getNumberOfPendingCheckpoints());

        Assert.assertTrue(coord.triggerCheckpoint(timestamp + 2, false));
        final long checkpointId2 = counter.getLast();
        Assert.assertEquals(3L, coord.getNumberOfPendingCheckpoints());

        // completing the second regular checkpoint leaves only the savepoint pending
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));

        Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
        Assert.assertFalse(savepointFuture1.isDone());

        // another regular checkpoint and a second savepoint
        Assert.assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
        Assert.assertEquals(2L, coord.getNumberOfPendingCheckpoints());

        final CompletableFuture<?> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
        final long savepointId2 = counter.getLast();
        Assert.assertEquals(3L, coord.getNumberOfPendingCheckpoints());

        // completing the second savepoint again leaves only the first savepoint pending
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));

        Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals(2L, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
        Assert.assertFalse(savepointFuture1.isDone());
        Assert.assertTrue(savepointFuture2.isDone());

        // finally acknowledge the first savepoint
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));

        Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals(3L, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue(savepointFuture1.isDone());
    }

    /**
     * Shared scenario for the max-concurrent-attempts tests: starts the periodic scheduler with a
     * limit of {@code i} concurrent checkpoints and checks that exactly {@code i} triggers reach
     * the first vertex, that one acknowledge for checkpoint 1 leads to exactly one more counted
     * call, and that no further calls follow afterwards.
     *
     * @param i the concurrent-checkpoint limit handed to the coordinator
     */
    private void testMaxConcurrentAttempts(int i) {
        try {
            final JobID jobID = new JobID();

            // one vertex per coordinator vertex array; the acknowledge below is sent for the second
            final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            final ExecutionVertex triggerVertex = mockExecutionVertex(new ExecutionAttemptID());
            final ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
            final ExecutionVertex commitVertex = mockExecutionVertex(new ExecutionAttemptID());

            // counts every triggerCheckpoint and notifyCheckpointComplete call on the first vertex
            final AtomicInteger numCalls = new AtomicInteger();
            final Execution execution = triggerVertex.getCurrentExecutionAttempt();
            Mockito.doAnswer(invocation -> {
                numCalls.incrementAndGet();
                return null;
            }).when(execution).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointOptions.class));
            Mockito.doAnswer(invocation -> {
                numCalls.incrementAndGet();
                return null;
            }).when(execution).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());

            final CheckpointCoordinator coord = new CheckpointCoordinator(
                    jobID,
                    10L,
                    200000L,
                    0L,
                    i,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[] {triggerVertex},
                    new ExecutionVertex[] {ackVertex},
                    new ExecutionVertex[] {commitVertex},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(2),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            try {
                coord.startCheckpointScheduler();

                // Poll for at least 100 ms, then until the expected number of calls was seen or
                // the 60 s timeout expired. The clock is re-read on every iteration so that the
                // loop is guaranteed to terminate.
                final long start = System.currentTimeMillis();
                final long timeout = start + 60000;
                final long minDuration = start + 100;
                long now;
                do {
                    Thread.sleep(20L);
                    now = System.currentTimeMillis();
                } while (now < minDuration || (numCalls.get() < i && now < timeout));

                Assert.assertEquals(i, numCalls.get());
                Mockito.verify(triggerVertex.getCurrentExecutionAttempt(), Mockito.times(i))
                        .triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointOptions.class));

                // acknowledging checkpoint 1 should lead to exactly one more counted call
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID, 1L));

                // the deadline is compared against a fresh clock reading, not a stale snapshot
                final long ackDeadline = System.currentTimeMillis() + 60000;
                do {
                    Thread.sleep(20L);
                } while (numCalls.get() < i + 1 && System.currentTimeMillis() < ackDeadline);

                Assert.assertEquals(i + 1, numCalls.get());

                // no further calls may show up after waiting a little longer
                Thread.sleep(200L);
                Assert.assertEquals(i + 1, numCalls.get());
            } finally {
                // release the coordinator even when an assertion above fails
                coord.stopCheckpointScheduler();
                coord.shutdown(JobStatus.FINISHED);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Starts the periodic scheduler with a limit of two concurrent checkpoints, waits until
     * checkpoints 1 and 2 are pending, acknowledges checkpoint 2 and then expects checkpoints 3
     * and 4 to be the two pending ones.
     */
    @Test
    public void testMaxConcurrentAttempsWithSubsumption() {
        try {
            final int maxConcurrentAttempts = 2;
            final JobID jobID = new JobID();

            // one vertex per coordinator vertex array; the acknowledge below is sent for the second
            final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            final ExecutionVertex triggerVertex = mockExecutionVertex(new ExecutionAttemptID());
            final ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
            final ExecutionVertex commitVertex = mockExecutionVertex(new ExecutionAttemptID());

            final CheckpointCoordinator coord = new CheckpointCoordinator(
                    jobID,
                    10L,
                    200000L,
                    0L,
                    maxConcurrentAttempts,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[] {triggerVertex},
                    new ExecutionVertex[] {ackVertex},
                    new ExecutionVertex[] {commitVertex},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(2),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            try {
                coord.startCheckpointScheduler();

                // Poll for at least 100 ms, then until two checkpoints are pending or the 60 s
                // timeout expired. The clock is re-read on every iteration so that the loop is
                // guaranteed to terminate.
                final long start = System.currentTimeMillis();
                final long timeout = start + 60000;
                final long minDuration = start + 100;
                long now;
                do {
                    Thread.sleep(20L);
                    now = System.currentTimeMillis();
                } while (now < minDuration
                        || (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));

                Assert.assertEquals(2L, coord.getNumberOfPendingCheckpoints());
                Assert.assertNotNull(coord.getPendingCheckpoints().get(1L));
                Assert.assertNotNull(coord.getPendingCheckpoints().get(2L));

                // acknowledge the newer of the two pending checkpoints
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, ackAttemptID, 2L));

                // wait (at most 60 s) until checkpoint 4 shows up among the pending ones
                final long deadline = System.currentTimeMillis() + 60000;
                do {
                    Thread.sleep(20L);
                } while (coord.getPendingCheckpoints().get(4L) == null
                        && System.currentTimeMillis() < deadline);

                Assert.assertEquals(2L, coord.getNumberOfPendingCheckpoints());
                Assert.assertNotNull(coord.getPendingCheckpoints().get(3L));
                Assert.assertNotNull(coord.getPendingCheckpoints().get(4L));
            } finally {
                // release the coordinator even when an assertion above fails
                coord.stopCheckpointScheduler();
                coord.shutdown(JobStatus.FINISHED);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that the periodic scheduler produces no pending checkpoint while the first vertex
     * reports {@code CREATED}, and that pending checkpoints appear within ten seconds once the
     * reported state is switched to {@code RUNNING}.
     */
    @Test
    public void testPeriodicSchedulingWithInactiveTasks() {
        try {
            final JobID jobID = new JobID();

            final ExecutionVertex triggerVertex = mockExecutionVertex(new ExecutionAttemptID());
            final ExecutionVertex ackVertex = mockExecutionVertex(new ExecutionAttemptID());
            final ExecutionVertex commitVertex = mockExecutionVertex(new ExecutionAttemptID());

            // state reported by the first vertex; flipped to RUNNING further down
            final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
            Mockito.when(triggerVertex.getCurrentExecutionAttempt().getState())
                    .thenAnswer(invocation -> currentState.get());

            final CheckpointCoordinator coord = new CheckpointCoordinator(
                    jobID,
                    10L,
                    200000L,
                    0L,
                    2,
                    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                    new ExecutionVertex[] {triggerVertex},
                    new ExecutionVertex[] {ackVertex},
                    new ExecutionVertex[] {commitVertex},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(2),
                    new MemoryStateBackend(),
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            try {
                coord.startCheckpointScheduler();

                // nothing may have been started while the vertex is not running
                Thread.sleep(200L);
                Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());

                currentState.set(ExecutionState.RUNNING);

                // wait (at most 10 s) for the first pending checkpoint
                final long deadline = System.currentTimeMillis() + 10000;
                do {
                    Thread.sleep(20L);
                } while (System.currentTimeMillis() < deadline
                        && coord.getNumberOfPendingCheckpoints() == 0);

                Assert.assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
            } finally {
                // the scheduler was started above; release the coordinator like the sibling tests do
                coord.stopCheckpointScheduler();
                coord.shutdown(JobStatus.FINISHED);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Triggers five savepoints back to back on a coordinator whose concurrent-checkpoint limit
     * argument is 1, checks that none of the returned futures is done yet, acknowledges all of
     * them (newest ID first) and then expects every future to be done.
     */
    @Test
    public void testConcurrentSavepoints() throws Exception {
        final int numSavepoints = 5;
        final JobID jid = new JobID();
        final ExecutionAttemptID attemptID = new ExecutionAttemptID();
        final ExecutionVertex vertex = mockExecutionVertex(attemptID);

        // keep a handle on the ID counter so the most recently assigned ID can be read back
        final StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                100000L,
                200000L,
                0L,
                1,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[] {vertex},
                new ExecutionVertex[] {vertex},
                new ExecutionVertex[] {vertex},
                counter,
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        final List<CompletableFuture<?>> savepointFutures = new ArrayList<>();
        final String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();

        // trigger all savepoints, using the loop index as the timestamp
        for (int timestamp = 0; timestamp < numSavepoints; timestamp++) {
            savepointFutures.add(coord.triggerSavepoint(timestamp, savepointDir));
        }

        // none of them may be complete before any acknowledge arrived
        for (CompletableFuture<?> savepointFuture : savepointFutures) {
            Assert.assertFalse(savepointFuture.isDone());
        }

        // acknowledge every savepoint, walking the IDs downwards from the last assigned one
        long checkpointId = counter.getLast();
        for (int acked = 0; acked < numSavepoints; acked++, checkpointId--) {
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID, checkpointId));
        }

        for (CompletableFuture<?> savepointFuture : savepointFutures) {
            Assert.assertTrue(savepointFuture.isDone());
        }
    }

    /**
     * Triggers two savepoints in a row on a coordinator built with a very large fourth
     * constructor argument (presumably the minimum pause between checkpoints, going by the test
     * name) and expects both returned futures to still be pending.
     */
    @Test
    public void testMinDelayBetweenSavepoints() throws Exception {
        final JobID jid = new JobID();
        final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                100000L,
                200000L,
                100000000L,
                1,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[] {vertex},
                new ExecutionVertex[] {vertex},
                new ExecutionVertex[] {vertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(2),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        final String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();

        final CompletableFuture<?> firstSavepoint = coord.triggerSavepoint(0L, savepointDir);
        Assert.assertFalse("Did not trigger savepoint", firstSavepoint.isDone());

        final CompletableFuture<?> secondSavepoint = coord.triggerSavepoint(1L, savepointDir);
        Assert.assertFalse("Did not trigger savepoint", secondSavepoint.isDone());
    }

    /**
     * Completes one checkpoint across two job vertices (parallelism 3 and 2), shuts the
     * completed checkpoint store down with {@code SUSPENDED}, restores the latest checkpointed
     * state and verifies that every subtask state was registered with a shared state registry
     * twice and that the state was restored to both job vertices.
     */
    @Test
    public void testRestoreLatestCheckpointedState() throws Exception {
        final JobID jid = new JobID();
        final long timestamp = System.currentTimeMillis();

        final JobVertexID jobVertexID1 = new JobVertexID();
        final JobVertexID jobVertexID2 = new JobVertexID();
        final int parallelism1 = 3;
        final int parallelism2 = 2;
        final int maxParallelism1 = 42;
        final int maxParallelism2 = 13;

        final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);

        // the coordinator gets the subtasks of both job vertices in every vertex array
        final List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        final ExecutionVertex[] arrayExecutionVertices =
                allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);

        final RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                arrayExecutionVertices,
                arrayExecutionVertices,
                arrayExecutionVertices,
                new StandaloneCheckpointIDCounter(),
                store,
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        // trigger the checkpoint and look up its ID
        coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
        final long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        final List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        // acknowledge every subtask of the first job vertex ...
        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            final TaskStateSnapshot subtaskState =
                    mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
            final ExecutionAttemptID attemptId =
                    jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId();
            coord.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId, new CheckpointMetrics(), subtaskState));
        }

        // ... and of the second one
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            final TaskStateSnapshot subtaskState =
                    mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index));
            final ExecutionAttemptID attemptId =
                    jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId();
            coord.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId, new CheckpointMetrics(), subtaskState));
        }

        final List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals(1L, completedCheckpoints.size());

        // shut the store down before restoring from it
        store.shutdown(JobStatus.SUSPENDED);

        final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
        tasks.put(jobVertexID1, jobVertex1);
        tasks.put(jobVertexID2, jobVertex2);

        coord.restoreLatestCheckpointedState(tasks, true, false);

        // every subtask state must have seen two shared state registrations by now
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    Mockito.verify(subtaskState, Mockito.times(2))
                            .registerSharedStates(Matchers.any(SharedStateRegistry.class));
                }
            }
        }

        verifyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
        verifyStateRestore(jobVertexID2, jobVertex2, keyGroupPartitions2);
    }

    /**
     * Completes one checkpoint for two job vertices created with max parallelism 42 and 13, then
     * restores it into job vertices created with max parallelism 20 and 42. The restore call is
     * expected to throw an {@link IllegalStateException}; reaching the final {@code fail} makes
     * the test fail.
     */
    @Test(expected = IllegalStateException.class)
    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        final JobID jid = new JobID();
        final long timestamp = System.currentTimeMillis();

        final JobVertexID jobVertexID1 = new JobVertexID();
        final JobVertexID jobVertexID2 = new JobVertexID();
        final int parallelism1 = 3;
        final int parallelism2 = 2;
        final int maxParallelism1 = 42;
        final int maxParallelism2 = 13;

        final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);

        // the coordinator gets the subtasks of both job vertices in every vertex array
        final List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        final ExecutionVertex[] arrayExecutionVertices =
                allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                arrayExecutionVertices,
                arrayExecutionVertices,
                arrayExecutionVertices,
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        // trigger the checkpoint and look up its ID
        coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
        final long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        final List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        // acknowledge every subtask of the first job vertex with key-group state only
        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            final KeyGroupsStateHandle keyedState =
                    generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
            final OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
                    (OperatorStateHandle) null, (OperatorStateHandle) null, keyedState, (KeyedStateHandle) null);
            final TaskStateSnapshot taskStates = new TaskStateSnapshot();
            taskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1), operatorSubtaskState);
            final ExecutionAttemptID attemptId =
                    jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId();
            coord.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId, new CheckpointMetrics(), taskStates));
        }

        // same for the second job vertex
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            final KeyGroupsStateHandle keyedState =
                    generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
            final OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
                    (OperatorStateHandle) null, (OperatorStateHandle) null, keyedState, (KeyedStateHandle) null);
            final TaskStateSnapshot taskStates = new TaskStateSnapshot();
            taskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2), operatorSubtaskState);
            final ExecutionAttemptID attemptId =
                    jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId();
            coord.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId, new CheckpointMetrics(), taskStates));
        }

        Assert.assertEquals(1L, coord.getSuccessfulCheckpoints().size());

        // rebuild both job vertices with the same parallelism but a different third argument
        final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
        final int newMaxParallelism1 = 20;
        final int newMaxParallelism2 = 42;
        final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(jobVertexID1, parallelism1, newMaxParallelism1);
        final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(jobVertexID2, parallelism2, newMaxParallelism2);
        tasks.put(jobVertexID1, newJobVertex1);
        tasks.put(jobVertexID2, newJobVertex2);

        coord.restoreLatestCheckpointedState(tasks, true, false);

        Assert.fail("The restoration should have failed because the max parallelism changed.");
    }

    /** Runs the changing-parallelism restore scenario with the helper's flag set to {@code false}. */
    @Test
    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        final boolean scaleOut = false;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /** Runs the changing-parallelism restore scenario with the helper's flag set to {@code true}. */
    @Test
    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        final boolean scaleOut = true;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /**
     * Runs the topology-change recovery scenario with mode {@code 0}.
     * NOTE(review): the helper is outside this view; going by the test name, 0 presumably
     * selects the scale-out variant - confirm against the helper.
     */
    @Test
    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        final int mode = 0;
        testStateRecoveryWithTopologyChange(mode);
    }

    /**
     * Runs the topology-change recovery scenario with mode {@code 1}.
     * NOTE(review): the helper is outside this view; going by the test name, 1 presumably
     * selects the scale-in variant - confirm against the helper.
     */
    @Test
    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        final int mode = 1;
        testStateRecoveryWithTopologyChange(mode);
    }

    /**
     * Runs the topology-change recovery scenario with mode {@code 2}.
     * NOTE(review): the helper is outside this view; 2 presumably selects the variant without
     * a parallelism change - confirm against the helper.
     */
    @Test
    public void testStateRecoveryWhenTopologyChange() throws Exception {
        final int mode = 2;
        testStateRecoveryWithTopologyChange(mode);
    }

    /**
     * Completes one checkpoint over two job vertices, restores it into freshly mocked vertices
     * where the second vertex runs at a different parallelism, and checks the state handed to
     * every subtask.
     *
     * @param z {@code true} rescales the second vertex from parallelism 2 to 13 (scale out),
     *     {@code false} rescales it from 13 to 2 (scale in)
     */
    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean z) throws Exception {
        final JobID jid = new JobID();
        final long triggerTimestamp = System.currentTimeMillis();
        final JobVertexID vertexId1 = new JobVertexID();
        final JobVertexID vertexId2 = new JobVertexID();

        // Vertex 1 keeps its parallelism; only vertex 2 is rescaled.
        final int parallelism1 = 3;
        final int maxParallelism1 = 42;
        final int maxParallelism2 = 13;
        final int oldParallelism2;
        final int newParallelism2;
        if (z) {
            oldParallelism2 = 2;
            newParallelism2 = 13;
        } else {
            oldParallelism2 = 13;
            newParallelism2 = 2;
        }

        final ExecutionJobVertex oldVertex1 = mockExecutionJobVertex(vertexId1, parallelism1, maxParallelism1);
        final ExecutionJobVertex oldVertex2 = mockExecutionJobVertex(vertexId2, oldParallelism2, maxParallelism2);

        final List<ExecutionVertex> allSubtasks = new ArrayList<>(parallelism1 + oldParallelism2);
        allSubtasks.addAll(Arrays.asList(oldVertex1.getTaskVertices()));
        allSubtasks.addAll(Arrays.asList(oldVertex2.getTaskVertices()));
        final ExecutionVertex[] subtaskArray = allSubtasks.toArray(new ExecutionVertex[allSubtasks.size()]);

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jid,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                subtaskArray,
                subtaskArray,
                subtaskArray,
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        // Trigger exactly one checkpoint and pick up its id.
        coordinator.triggerCheckpoint(triggerTimestamp, false);
        Assert.assertTrue(coordinator.getPendingCheckpoints().keySet().size() == 1);
        final long checkpointId = ((Long) Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet())).longValue();

        final List<KeyGroupRange> partitions1 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        final List<KeyGroupRange> oldPartitions2 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, oldParallelism2);

        // Acknowledge every subtask of vertex 1 (no raw operator state).
        for (int subtask = 0; subtask < oldVertex1.getParallelism(); subtask++) {
            final OperatorStateHandle managedOperatorState = generatePartitionableStateHandle(vertexId1, subtask, 2, 8, false);
            final KeyGroupsStateHandle managedKeyedState = generateKeyGroupState(vertexId1, partitions1.get(subtask), false);
            final KeyGroupsStateHandle rawKeyedState = generateKeyGroupState(vertexId1, partitions1.get(subtask), true);
            final OperatorSubtaskState subtaskState =
                    new OperatorSubtaskState(managedOperatorState, (OperatorStateHandle) null, managedKeyedState, rawKeyedState);

            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertexId1), subtaskState);

            final ExecutionAttemptID attemptId = oldVertex1.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId, new CheckpointMetrics(), snapshot));
        }

        // Acknowledge every subtask of vertex 2 and remember the operator state that was sent.
        // These lists stay untyped because they are handed to comparePartitionableState, which is
        // declared elsewhere in this file.
        final List expectedManagedOperatorStates = new ArrayList(oldVertex2.getParallelism());
        final List expectedRawOperatorStates = new ArrayList(oldVertex2.getParallelism());
        for (int subtask = 0; subtask < oldVertex2.getParallelism(); subtask++) {
            final KeyGroupsStateHandle managedKeyedState = generateKeyGroupState(vertexId2, oldPartitions2.get(subtask), false);
            final KeyGroupsStateHandle rawKeyedState = generateKeyGroupState(vertexId2, oldPartitions2.get(subtask), true);
            final OperatorStateHandle managedOperatorState = generatePartitionableStateHandle(vertexId2, subtask, 2, 8, false);
            final OperatorStateHandle rawOperatorState = generatePartitionableStateHandle(vertexId2, subtask, 2, 8, true);
            expectedManagedOperatorStates.add(new ChainedStateHandle(Collections.singletonList(managedOperatorState)));
            expectedRawOperatorStates.add(new ChainedStateHandle(Collections.singletonList(rawOperatorState)));

            final OperatorSubtaskState subtaskState =
                    new OperatorSubtaskState(managedOperatorState, rawOperatorState, managedKeyedState, rawKeyedState);
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertexId2), subtaskState);

            final ExecutionAttemptID attemptId = oldVertex2.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId, new CheckpointMetrics(), snapshot));
        }

        Assert.assertEquals(1L, coordinator.getSuccessfulCheckpoints().size());

        // Restore into new vertices; vertex 2 now runs at the new parallelism.
        final HashMap<JobVertexID, ExecutionJobVertex> restoreTargets = new HashMap<>();
        final List<KeyGroupRange> newPartitions2 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
        final ExecutionJobVertex newVertex1 = mockExecutionJobVertex(vertexId1, parallelism1, maxParallelism1);
        final ExecutionJobVertex newVertex2 = mockExecutionJobVertex(vertexId2, newParallelism2, maxParallelism2);
        restoreTargets.put(vertexId1, newVertex1);
        restoreTargets.put(vertexId2, newVertex2);
        coordinator.restoreLatestCheckpointedState(restoreTargets, true, false);

        // Vertex 1 was not rescaled, so its state must come back unchanged.
        verifyStateRestore(vertexId1, newVertex1, partitions1);

        final List actualManagedOperatorStates = new ArrayList(newVertex2.getParallelism());
        final List actualRawOperatorStates = new ArrayList(newVertex2.getParallelism());
        for (int subtask = 0; subtask < newVertex2.getParallelism(); subtask++) {
            final List<OperatorID> operatorIds = newVertex2.getOperatorIDs();
            final KeyGroupsStateHandle expectedManagedKeyed = generateKeyGroupState(vertexId2, newPartitions2.get(subtask), false);
            final KeyGroupsStateHandle expectedRawKeyed = generateKeyGroupState(vertexId2, newPartitions2.get(subtask), true);

            final JobManagerTaskRestore restore = newVertex2.getTaskVertices()[subtask].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(1L, restore.getRestoreCheckpointId());
            final TaskStateSnapshot restoredSnapshot = restore.getTaskStateSnapshot();

            final int lastOperator = operatorIds.size() - 1;
            final List managedPerOperator = new ArrayList(operatorIds.size());
            final List rawPerOperator = new ArrayList(operatorIds.size());
            for (int op = 0; op < operatorIds.size(); op++) {
                final OperatorSubtaskState restored = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(op));
                managedPerOperator.add(restored.getManagedOperatorState());
                rawPerOperator.add(restored.getRawOperatorState());

                // Keyed state is only checked on the last operator of the list.
                if (op == lastOperator) {
                    compareKeyedState(Collections.singletonList(expectedManagedKeyed), restored.getManagedKeyedState());
                    compareKeyedState(Collections.singletonList(expectedRawKeyed), restored.getRawKeyedState());
                }
            }
            actualManagedOperatorStates.add(managedPerOperator);
            actualRawOperatorStates.add(rawPerOperator);
        }

        comparePartitionableState(expectedManagedOperatorStates, actualManagedOperatorStates);
        comparePartitionableState(expectedRawOperatorStates, actualRawOperatorStates);
    }

    /**
     * Creates a fresh {@link JobVertexID} and pairs it with the {@link OperatorID} derived from it.
     *
     * @return tuple of the new vertex id ({@code f0}) and its derived operator id ({@code f1})
     */
    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        final JobVertexID vertexId = new JobVertexID();
        final OperatorID operatorId = OperatorID.fromJobVertexID(vertexId);
        return new Tuple2<>(vertexId, operatorId);
    }

    /**
     * Restores a hand-built completed checkpoint (id 2) holding state for four operators into two
     * job vertices whose operator lists differ from the checkpointed ones, then checks the state
     * assigned to every subtask.
     *
     * @param i rescaling mode of the second vertex: {@code 0} restores it at parallelism 20,
     *     {@code 1} at parallelism 8, any other value at the checkpointed parallelism of 10
     */
    public void testStateRecoveryWithTopologyChange(int i) throws Exception {
        // The four operators whose state is stored in the checkpoint.
        final Tuple2<JobVertexID, OperatorID> id1 = generateIDPair();
        final Tuple2<JobVertexID, OperatorID> id2 = generateIDPair();
        final Tuple2<JobVertexID, OperatorID> id3 = generateIDPair();
        final Tuple2<JobVertexID, OperatorID> id4 = generateIDPair();

        final List<KeyGroupRange> oldPartitions = StateAssignmentOperation.createKeyGroupPartitions(64, 10);
        final HashMap<OperatorID, OperatorState> checkpointedStates = new HashMap<>();

        // Operators 1 and 2 carry operator state only.
        for (Tuple2<JobVertexID, OperatorID> ids : Arrays.asList(id1, id2)) {
            final OperatorState state = new OperatorState(ids.f1, 10, 64);
            checkpointedStates.put(ids.f1, state);
            for (int subtask = 0; subtask < state.getParallelism(); subtask++) {
                final OperatorStateHandle managed = generatePartitionableStateHandle(ids.f0, subtask, 2, 8, false);
                final OperatorStateHandle raw = generatePartitionableStateHandle(ids.f0, subtask, 2, 8, true);
                state.putState(subtask, new OperatorSubtaskState(managed, raw, (KeyedStateHandle) null, (KeyedStateHandle) null));
            }
        }

        // Operators 3 and 4 carry operator state; only operator 3 additionally has keyed state.
        // These lists stay untyped because they are handed to comparePartitionableState, which is
        // declared elsewhere in this file.
        final List expectedManagedPerOperator = new ArrayList();
        final List expectedRawPerOperator = new ArrayList();
        for (Tuple2<JobVertexID, OperatorID> ids : Arrays.asList(id3, id4)) {
            final OperatorState state = new OperatorState(ids.f1, 10, 64);
            checkpointedStates.put(ids.f1, state);

            final List expectedManaged = new ArrayList();
            final List expectedRaw = new ArrayList();
            expectedManagedPerOperator.add(expectedManaged);
            expectedRawPerOperator.add(expectedRaw);

            for (int subtask = 0; subtask < state.getParallelism(); subtask++) {
                final OperatorStateHandle managed = generateChainedPartitionableStateHandle(ids.f0, subtask, 2, 8, false).get(0);
                final OperatorStateHandle raw = generateChainedPartitionableStateHandle(ids.f0, subtask, 2, 8, true).get(0);

                final boolean hasKeyedState = ids.f0.equals(id3.f0);
                KeyGroupsStateHandle managedKeyed = null;
                KeyGroupsStateHandle rawKeyed = null;
                if (hasKeyedState) {
                    managedKeyed = generateKeyGroupState(ids.f0, oldPartitions.get(subtask), false);
                    rawKeyed = generateKeyGroupState(ids.f0, oldPartitions.get(subtask), true);
                }

                expectedManaged.add(ChainedStateHandle.wrapSingleHandle(managed));
                expectedRaw.add(ChainedStateHandle.wrapSingleHandle(raw));
                state.putState(subtask, new OperatorSubtaskState(managed, raw, managedKeyed, rawKeyed));
            }
        }

        // Two operators that are not part of the checkpoint.
        final Tuple2<JobVertexID, OperatorID> id5 = generateIDPair();
        final Tuple2<JobVertexID, OperatorID> id6 = generateIDPair();

        final int newParallelism2;
        switch (i) {
            case 0:
                newParallelism2 = 20;
                break;
            case 1:
                newParallelism2 = 8;
                break;
            default:
                newParallelism2 = 10;
                break;
        }
        final List<KeyGroupRange> newPartitions = StateAssignmentOperation.createKeyGroupPartitions(64, newParallelism2);

        // New vertex 1 lists operators 2 and 1 followed by the new operator 5;
        // new vertex 2 lists the new operator 6 followed by operator 3.
        final ExecutionJobVertex newVertex1 = mockExecutionJobVertex(id5.f0, Arrays.asList(id2.f1, id1.f1, id5.f1), 10, 64);
        final ExecutionJobVertex newVertex2 = mockExecutionJobVertex(id3.f0, Arrays.asList(id6.f1, id3.f1), newParallelism2, 64);

        final HashMap<JobVertexID, ExecutionJobVertex> restoreTargets = new HashMap<>();
        restoreTargets.put(id5.f0, newVertex1);
        restoreTargets.put(id3.f0, newVertex2);

        final JobID jobId = new JobID();
        final StandaloneCompletedCheckpointStore store = Mockito.spy(new StandaloneCompletedCheckpointStore(1));
        Mockito.when(store.getLatestCheckpoint()).thenReturn(
                new CompletedCheckpoint(
                        jobId,
                        2L,
                        System.currentTimeMillis(),
                        System.currentTimeMillis() + 3000,
                        checkpointedStates,
                        Collections.emptyList(),
                        CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation()));

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                new JobID(),
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                newVertex1.getTaskVertices(),
                newVertex1.getTaskVertices(),
                newVertex1.getTaskVertices(),
                new StandaloneCheckpointIDCounter(),
                store,
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);
        coordinator.restoreLatestCheckpointedState(restoreTargets, false, true);

        // Vertex 1: no keyed state, nothing for the new operator, unchanged state for operators 1 and 2.
        for (int subtask = 0; subtask < newVertex1.getParallelism(); subtask++) {
            final List<OperatorID> operatorIds = newVertex1.getOperatorIDs();
            final JobManagerTaskRestore restore = newVertex1.getTaskVertices()[subtask].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(2L, restore.getRestoreCheckpointId());
            final TaskStateSnapshot restoredSnapshot = restore.getTaskStateSnapshot();

            final OperatorSubtaskState lastOperatorState = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
            Assert.assertTrue(lastOperatorState.getManagedKeyedState().isEmpty());
            Assert.assertTrue(lastOperatorState.getRawKeyedState().isEmpty());

            final OperatorSubtaskState newOperatorState = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(2));
            Assert.assertTrue(newOperatorState.getManagedOperatorState().isEmpty());
            Assert.assertTrue(newOperatorState.getRawOperatorState().isEmpty());

            // Operator 1 sits at index 1 of the new operator list.
            final OperatorSubtaskState restoredOp1 = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(1));
            final OperatorStateHandle expectedManagedOp1 = generatePartitionableStateHandle(id1.f0, subtask, 2, 8, false);
            final OperatorStateHandle expectedRawOp1 = generatePartitionableStateHandle(id1.f0, subtask, 2, 8, true);
            final StateObjectCollection<OperatorStateHandle> managedOp1 = restoredOp1.getManagedOperatorState();
            Assert.assertEquals(1L, managedOp1.size());
            Assert.assertTrue(CommonTestUtils.isSteamContentEqual(
                    expectedManagedOp1.openInputStream(),
                    managedOp1.iterator().next().openInputStream()));
            final StateObjectCollection<OperatorStateHandle> rawOp1 = restoredOp1.getRawOperatorState();
            Assert.assertEquals(1L, rawOp1.size());
            Assert.assertTrue(CommonTestUtils.isSteamContentEqual(
                    expectedRawOp1.openInputStream(),
                    rawOp1.iterator().next().openInputStream()));

            // Operator 2 sits at index 0 of the new operator list.
            final OperatorSubtaskState restoredOp2 = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(0));
            final OperatorStateHandle expectedManagedOp2 = generatePartitionableStateHandle(id2.f0, subtask, 2, 8, false);
            final OperatorStateHandle expectedRawOp2 = generatePartitionableStateHandle(id2.f0, subtask, 2, 8, true);
            final StateObjectCollection<OperatorStateHandle> managedOp2 = restoredOp2.getManagedOperatorState();
            Assert.assertEquals(1L, managedOp2.size());
            Assert.assertTrue(CommonTestUtils.isSteamContentEqual(
                    expectedManagedOp2.openInputStream(),
                    managedOp2.iterator().next().openInputStream()));
            final StateObjectCollection<OperatorStateHandle> rawOp2 = restoredOp2.getRawOperatorState();
            Assert.assertEquals(1L, rawOp2.size());
            Assert.assertTrue(CommonTestUtils.isSteamContentEqual(
                    expectedRawOp2.openInputStream(),
                    rawOp2.iterator().next().openInputStream()));
        }

        // Vertex 2: operator 3's state (possibly repartitioned), nothing for the new operator 6.
        final List actualManagedOp3 = new ArrayList(newVertex2.getParallelism());
        final List actualRawOp3 = new ArrayList(newVertex2.getParallelism());
        for (int subtask = 0; subtask < newVertex2.getParallelism(); subtask++) {
            final List<OperatorID> operatorIds = newVertex2.getOperatorIDs();
            final JobManagerTaskRestore restore = newVertex2.getTaskVertices()[subtask].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(2L, restore.getRestoreCheckpointId());
            final TaskStateSnapshot restoredSnapshot = restore.getTaskStateSnapshot();

            final OperatorSubtaskState restoredOp3 = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(1));
            final List managedWrapper = new ArrayList(1);
            managedWrapper.add(restoredOp3.getManagedOperatorState());
            final List rawWrapper = new ArrayList(1);
            rawWrapper.add(restoredOp3.getRawOperatorState());
            actualManagedOp3.add(managedWrapper);
            actualRawOp3.add(rawWrapper);

            final OperatorSubtaskState restoredOp6 = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(0));
            Assert.assertTrue(restoredOp6.getManagedOperatorState().isEmpty());
            Assert.assertTrue(restoredOp6.getRawOperatorState().isEmpty());

            final KeyGroupsStateHandle expectedManagedKeyed = generateKeyGroupState(id3.f0, newPartitions.get(subtask), false);
            final KeyGroupsStateHandle expectedRawKeyed = generateKeyGroupState(id3.f0, newPartitions.get(subtask), true);
            final OperatorSubtaskState lastOperatorState = restoredSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
            final StateObjectCollection<KeyedStateHandle> managedKeyed = lastOperatorState.getManagedKeyedState();
            final StateObjectCollection<KeyedStateHandle> rawKeyed = lastOperatorState.getRawKeyedState();
            compareKeyedState(Collections.singletonList(expectedManagedKeyed), managedKeyed);
            compareKeyedState(Collections.singletonList(expectedRawKeyed), rawKeyed);
        }

        comparePartitionableState((List) expectedManagedPerOperator.get(0), actualManagedOp3);
        comparePartitionableState((List) expectedRawPerOperator.get(0), actualRawOp3);
    }

    /**
     * Checks that a checkpoint triggered on a coordinator configured with
     * {@code RETAIN_ON_FAILURE} is created with the matching checkpoint properties.
     *
     * <p>Exceptions propagate to the test runner instead of being caught and converted into
     * {@code Assert.fail(e.getMessage())}, so the failure keeps its type, message and stack trace.
     *
     * @throws Exception if setting up, triggering or shutting down the coordinator fails
     */
    @Test
    public void testExternalizedCheckpoints() throws Exception {
        final JobID jobId = new JobID();
        final long triggerTimestamp = System.currentTimeMillis();
        final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());
        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobId,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);
        try {
            Assert.assertTrue(coordinator.triggerCheckpoint(triggerTimestamp, false));
            for (PendingCheckpoint pending : coordinator.getPendingCheckpoints().values()) {
                Assert.assertEquals(
                        CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
                        pending.getProps());
            }
        } finally {
            // Shut down even when an assertion above fails, so the coordinator is not left running.
            coordinator.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Repartitions a single operator state handle containing UNION, SPLIT_DISTRIBUTE and
     * BROADCAST states to a parallelism of 3, then checks how many offsets each redistributed
     * entry carries and in how many of the resulting handles each state name appears.
     */
    @Test
    public void testReplicateModeStateHandle() {
        final Map<String, OperatorStateHandle.StateMetaInfo> metaInfoByName = new HashMap<>(1);
        metaInfoByName.put("t-1", new OperatorStateHandle.StateMetaInfo(new long[]{0, 23}, OperatorStateHandle.Mode.UNION));
        metaInfoByName.put("t-2", new OperatorStateHandle.StateMetaInfo(new long[]{42, 64}, OperatorStateHandle.Mode.UNION));
        metaInfoByName.put("t-3", new OperatorStateHandle.StateMetaInfo(new long[]{72, 83}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        metaInfoByName.put("t-4", new OperatorStateHandle.StateMetaInfo(new long[]{87, 94, 95}, OperatorStateHandle.Mode.BROADCAST));
        metaInfoByName.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{97, 108, 112}, OperatorStateHandle.Mode.BROADCAST));
        metaInfoByName.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{121, 143, 147}, OperatorStateHandle.Mode.BROADCAST));

        final OperatorStateHandle sourceHandle =
                new OperatorStreamStateHandle(metaInfoByName, new ByteStreamStateHandle("test", new byte[150]));
        final List<? extends Collection<OperatorStateHandle>> repartitioned =
                RoundRobinOperatorStateRepartitioner.INSTANCE.repartitionState(Collections.singletonList(sourceHandle), 3);

        // Number of redistributed handles in which each state name occurs.
        final Map<String, Integer> occurrences = new HashMap<>(3);
        for (Collection<OperatorStateHandle> subtaskHandles : repartitioned) {
            for (OperatorStateHandle handle : subtaskHandles) {
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : handle.getStateNameToPartitionOffsets().entrySet()) {
                    final String stateName = entry.getKey();
                    final Integer seenSoFar = occurrences.get(stateName);
                    occurrences.put(stateName, seenSoFar == null ? 1 : Integer.valueOf(1 + seenSoFar.intValue()));

                    final OperatorStateHandle.StateMetaInfo metaInfo = entry.getValue();
                    final long expectedOffsetCount;
                    if (OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.equals(metaInfo.getDistributionMode())) {
                        expectedOffsetCount = 1L;
                    } else if (OperatorStateHandle.Mode.UNION.equals(metaInfo.getDistributionMode())) {
                        expectedOffsetCount = 2L;
                    } else {
                        expectedOffsetCount = 3L;
                    }
                    Assert.assertEquals(expectedOffsetCount, entry.getValue().getOffsets().length);
                }
            }
        }

        Assert.assertEquals(6L, occurrences.size());
        Assert.assertEquals(3L, occurrences.get("t-1").intValue());
        Assert.assertEquals(3L, occurrences.get("t-2").intValue());
        Assert.assertEquals(2L, occurrences.get("t-3").intValue());
        Assert.assertEquals(3L, occurrences.get("t-4").intValue());
        Assert.assertEquals(3L, occurrences.get("t-5").intValue());
        Assert.assertEquals(3L, occurrences.get("t-6").intValue());
    }

    /**
     * Builds a keyed state handle holding one pseudo-random integer per key group of the range.
     * Each value is the first {@code nextInt()} of a {@link Random} seeded from the vertex id's
     * hash code and the key group index, so the same arguments always yield the same values.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param keyGroupRange key groups to generate a value for
     * @param z selects the seed formula: {@code hash * (31 + keyGroup)} when {@code true},
     *     {@code hash + keyGroup} when {@code false}
     * @return handle over the serialized values
     * @throws IOException if serializing the values fails
     */
    public static KeyGroupsStateHandle generateKeyGroupState(JobVertexID jobVertexID, KeyGroupRange keyGroupRange, boolean z) throws IOException {
        final List<Integer> valuesPerKeyGroup = new ArrayList<>(keyGroupRange.getNumberOfKeyGroups());
        for (int keyGroup : keyGroupRange) {
            final int vertexHash = jobVertexID.hashCode();
            final int seed;
            if (z) {
                seed = vertexHash * (31 + keyGroup);
            } else {
                seed = vertexHash + keyGroup;
            }
            valuesPerKeyGroup.add(new Random(seed).nextInt());
        }
        return generateKeyGroupState(keyGroupRange, valuesPerKeyGroup);
    }

    /**
     * Serializes one value per key group into a single byte array and wraps it, together with
     * the per-key-group offsets, in a {@link KeyGroupsStateHandle}.
     *
     * @param keyGroupRange key groups covered by the handle
     * @param list values to serialize; must contain exactly one element per key group
     * @return handle backed by an in-memory byte stream with a random UUID as its name
     * @throws IOException if serializing the values fails
     */
    public static KeyGroupsStateHandle generateKeyGroupState(KeyGroupRange keyGroupRange, List<? extends Serializable> list) throws IOException {
        Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == list.size());

        final Tuple2<byte[], List<long[]>> serialized =
                serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(list));

        // Only one list was serialized, so its offsets are the first (and only) entry.
        final long[] offsets = serialized.f1.get(0);
        final KeyGroupRangeOffsets rangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
        final ByteStreamStateHandle bytes = new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), serialized.f0);
        return new KeyGroupsStateHandle(rangeOffsets, bytes);
    }

    /**
     * Serializes every element of every list, concatenates the results into one byte array and
     * records, per input list, the start offset of each of its elements within that array.
     *
     * @param list lists of values to serialize, in the order they should appear in the output
     * @return tuple of the concatenated bytes ({@code f0}) and one offset array per input list
     *     ({@code f1})
     * @throws IOException if serializing an element fails
     */
    public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(List<List<? extends Serializable>> list) throws IOException {
        final List<long[]> offsetsPerList = new ArrayList<>(list.size());
        final List<byte[]> chunks = new ArrayList<>();
        int totalLength = 0;

        for (List<? extends Serializable> group : list) {
            final long[] offsets = new long[group.size()];
            offsetsPerList.add(offsets);
            int position = 0;
            for (Serializable element : group) {
                // The element starts where all previously serialized bytes end.
                offsets[position++] = totalLength;
                final byte[] chunk = InstantiationUtil.serializeObject(element);
                chunks.add(chunk);
                totalLength += chunk.length;
            }
        }

        final byte[] combined = new byte[totalLength];
        int writePosition = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, combined, writePosition, chunk.length);
            writePosition += chunk.length;
        }
        return new Tuple2<>(combined, offsetsPerList);
    }

    /**
     * Builds an operator state handle with {@code i2} named states ("state-0", "state-1", ...),
     * each holding {@code i3} pseudo-random integers. The random generator of every state is
     * seeded from the vertex id's hash code, the subtask index and the state index, so the same
     * arguments always yield the same values.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param i subtask index used in the seed
     * @param i2 number of named states
     * @param i3 number of values per named state
     * @param z when {@code true}, the seed is additionally transformed to {@code (seed + 1) * 31}
     * @return handle over the serialized states
     * @throws IOException if serializing the values fails
     */
    public static OperatorStateHandle generatePartitionableStateHandle(JobVertexID jobVertexID, int i, int i2, int i3, boolean z) throws IOException {
        final Map<String, List<? extends Serializable>> valuesByStateName = new HashMap<>(i2);
        for (int stateIndex = 0; stateIndex < i2; stateIndex++) {
            final List<Integer> values = new ArrayList<>(i3);
            final int baseSeed = (jobVertexID.hashCode() * i) + (stateIndex * i2);
            final int seed = z ? (baseSeed + 1) * 31 : baseSeed;
            final Random generator = new Random(seed);
            for (int n = 0; n < i3; n++) {
                values.add(generator.nextInt());
            }
            valuesByStateName.put("state-" + stateIndex, values);
        }
        return generatePartitionableStateHandle(valuesByStateName);
    }

    /**
     * Same as {@link #generatePartitionableStateHandle(JobVertexID, int, int, int, boolean)}, but
     * returns the generated handle wrapped in a single-element {@link ChainedStateHandle}.
     *
     * <p>Delegates to the unwrapped variant instead of repeating its generation loop, so both
     * methods are guaranteed to use the same seed formula and state names.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param i subtask index used in the seed
     * @param i2 number of named states
     * @param i3 number of values per named state
     * @param z when {@code true}, the alternative seed is used (see the delegate)
     * @return chained handle containing exactly one operator state handle
     * @throws IOException if serializing the values fails
     */
    public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(JobVertexID jobVertexID, int i, int i2, int i3, boolean z) throws IOException {
        return ChainedStateHandle.wrapSingleHandle(generatePartitionableStateHandle(jobVertexID, i, i2, i3, z));
    }

    /**
     * Serializes the value lists of all named states into one byte array and builds an operator
     * state handle that maps every state name to the offsets of its values, using the
     * {@code SPLIT_DISTRIBUTE} mode for every state.
     *
     * @param map values per state name
     * @return handle backed by an in-memory byte stream with a random UUID as its name
     * @throws IOException if serializing the values fails
     */
    private static OperatorStateHandle generatePartitionableStateHandle(Map<String, List<? extends Serializable>> map) throws IOException {
        final int stateCount = map.size();

        // Capture names and values in the same iteration order so index n refers to the same state.
        final List<String> stateNames = new ArrayList<>(stateCount);
        final List<List<? extends Serializable>> stateValues = new ArrayList<>(stateCount);
        for (Map.Entry<String, List<? extends Serializable>> entry : map.entrySet()) {
            stateNames.add(entry.getKey());
            stateValues.add(entry.getValue());
        }

        final Tuple2<byte[], List<long[]>> serialized = serializeTogetherAndTrackOffsets(stateValues);

        final Map<String, OperatorStateHandle.StateMetaInfo> metaInfoByName = new HashMap<>(stateCount);
        for (int index = 0; index < stateNames.size(); index++) {
            metaInfoByName.put(
                    stateNames.get(index),
                    new OperatorStateHandle.StateMetaInfo(serialized.f1.get(index), OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        }

        final ByteStreamStateHandle bytes = new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), serialized.f0);
        return new OperatorStreamStateHandle(metaInfoByName, bytes);
    }

    /**
     * Mocks a job vertex whose only operator id is the one derived from the vertex id.
     *
     * @param jobVertexID id of the mocked vertex
     * @param i parallelism
     * @param i2 maximum parallelism
     * @return the mocked job vertex
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID jobVertexID, int i, int i2) {
        final List<OperatorID> singleOperator = Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID));
        return mockExecutionJobVertex(jobVertexID, singleOperator, i, i2);
    }

    /**
     * Mocks a job vertex with {@code i} mocked subtasks in state {@code RUNNING}, each reporting
     * its array position as parallel subtask index.
     *
     * @param jobVertexID id of the mocked vertex
     * @param list operator ids reported by the vertex
     * @param i parallelism, i.e. the number of mocked subtasks
     * @param i2 maximum parallelism
     * @return the mocked job vertex
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID jobVertexID, List<OperatorID> list, int i, int i2) {
        final ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class);

        final ExecutionVertex[] subtasks = new ExecutionVertex[i];
        for (int subtaskIndex = 0; subtaskIndex < i; subtaskIndex++) {
            final ExecutionVertex subtask = mockExecutionVertex(
                    new ExecutionAttemptID(), jobVertexID, list, i, i2, ExecutionState.RUNNING, new ExecutionState[0]);
            subtasks[subtaskIndex] = subtask;
            Mockito.when(subtask.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
        }

        Mockito.when(jobVertex.getJobVertexId()).thenReturn(jobVertexID);
        Mockito.when(jobVertex.getTaskVertices()).thenReturn(subtasks);
        Mockito.when(jobVertex.getParallelism()).thenReturn(i);
        Mockito.when(jobVertex.getMaxParallelism()).thenReturn(i2);
        Mockito.when(jobVertex.isMaxParallelismConfigured()).thenReturn(true);
        Mockito.when(jobVertex.getOperatorIDs()).thenReturn(list);
        // One null entry per operator: no user-defined operator ids are set.
        Mockito.when(jobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[list.size()]));
        return jobVertex;
    }

    /**
     * Mocks a single {@code RUNNING} execution vertex with parallelism 1 and maximum
     * parallelism 1, belonging to a freshly generated job vertex id.
     *
     * @param executionAttemptID attempt id reported by the vertex's current execution
     * @return the mocked execution vertex
     */
    public static ExecutionVertex mockExecutionVertex(ExecutionAttemptID executionAttemptID) {
        final JobVertexID vertexId = new JobVertexID();
        final List<OperatorID> singleOperator = Collections.singletonList(OperatorID.fromJobVertexID(vertexId));
        return mockExecutionVertex(executionAttemptID, vertexId, singleOperator, 1, 1, ExecutionState.RUNNING, new ExecutionState[0]);
    }

    /**
     * Mocks an execution vertex whose current execution attempt is a spied {@link Execution}
     * with stubbed attempt id and state.
     *
     * @param executionAttemptID attempt id returned by the spied execution
     * @param jobVertexID job vertex id reported by the vertex
     * @param list operator ids reported by the vertex's (mocked) job vertex
     * @param i value returned for the total number of parallel subtasks
     * @param i2 value returned for the maximum parallelism
     * @param executionState state returned by the first {@code getState()} call
     * @param executionStateArr states returned by subsequent {@code getState()} calls
     * @return the mocked execution vertex
     */
    private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID executionAttemptID, JobVertexID jobVertexID, List<OperatorID> list, int i, int i2, ExecutionState executionState, ExecutionState... executionStateArr) {
        final ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);

        final Execution attempt = Mockito.spy(
                new Execution(Mockito.mock(Executor.class), vertex, 1, 1L, 1L, Time.milliseconds(500L)));
        Mockito.when(attempt.getAttemptId()).thenReturn(executionAttemptID);
        Mockito.when(attempt.getState()).thenReturn(executionState, executionStateArr);

        Mockito.when(vertex.getJobvertexId()).thenReturn(jobVertexID);
        Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(attempt);
        Mockito.when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(i);
        Mockito.when(vertex.getMaxParallelism()).thenReturn(i2);

        final ExecutionJobVertex owningJobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(owningJobVertex.getOperatorIDs()).thenReturn(list);
        Mockito.when(vertex.getJobVertex()).thenReturn(owningJobVertex);
        return vertex;
    }

    /**
     * Creates a spied {@link TaskStateSnapshot} containing one spied {@link OperatorSubtaskState}
     * with managed operator state and managed keyed state, but no raw state.
     *
     * @param jobVertexID vertex the state is generated for; its derived operator id is the key
     * @param i subtask index used to generate the operator state
     * @param keyGroupRange key groups covered by the keyed state
     * @return the spied snapshot
     * @throws IOException if generating the state handles fails
     */
    static TaskStateSnapshot mockSubtaskState(JobVertexID jobVertexID, int i, KeyGroupRange keyGroupRange) throws IOException {
        final OperatorStateHandle managedOperatorState = generatePartitionableStateHandle(jobVertexID, i, 2, 8, false);
        final KeyGroupsStateHandle managedKeyedState = generateKeyGroupState(jobVertexID, keyGroupRange, false);

        final TaskStateSnapshot snapshot = Mockito.spy(new TaskStateSnapshot());
        final OperatorSubtaskState subtaskState = Mockito.spy(
                new OperatorSubtaskState(managedOperatorState, (OperatorStateHandle) null, managedKeyedState, (KeyedStateHandle) null));
        snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), subtaskState);
        return snapshot;
    }

    /**
     * Checks, for every subtask of the vertex, that it was restored from checkpoint 1 and that
     * its managed operator state and managed keyed state match the deterministically generated
     * expectations for that subtask.
     *
     * @param jobVertexID vertex id the expected state is generated from
     * @param executionJobVertex vertex whose subtasks are inspected
     * @param list expected key group range per subtask index
     * @throws Exception if reading or comparing the state fails
     */
    public static void verifyStateRestore(JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> list) throws Exception {
        for (int subtask = 0; subtask < executionJobVertex.getParallelism(); subtask++) {
            final ExecutionVertex vertex = executionJobVertex.getTaskVertices()[subtask];
            final JobManagerTaskRestore restore = vertex.getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(1L, restore.getRestoreCheckpointId());

            final OperatorSubtaskState restoredState =
                    restore.getTaskStateSnapshot().getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID));

            final FSDataInputStream expectedStream =
                    generateChainedPartitionableStateHandle(jobVertexID, subtask, 2, 8, false).get(0).openInputStream();
            final FSDataInputStream restoredStream =
                    ((OperatorStateHandle) restoredState.getManagedOperatorState().iterator().next()).openInputStream();
            Assert.assertTrue(CommonTestUtils.isSteamContentEqual(expectedStream, restoredStream));

            final KeyGroupsStateHandle expectedKeyedState = generateKeyGroupState(jobVertexID, list.get(subtask), false);
            compareKeyedState(Collections.singletonList(expectedKeyedState), restoredState.getManagedKeyedState());
        }
    }

    /**
     * Asserts that the actual keyed state handles hold the same per-key-group values as the
     * first handle of the expected collection.
     *
     * <p>First the total number of key groups covered by the actual handles must equal the
     * number of key groups of the expected handle. Then, for every key group of the expected
     * handle, the {@code Integer} stored at that key group's offset is read and compared with
     * the value read from every actual handle whose range contains the key group.
     *
     * <p>Both streams are managed with try-with-resources, so a failure while closing can no
     * longer hide the assertion error or I/O error that caused the unwind.
     *
     * @param collection expected handles; only the first element is inspected
     * @param collection2 actual handles; each must be a {@link KeyGroupsStateHandle}
     * @throws Exception if opening, seeking or deserializing from a state stream fails
     */
    public static void compareKeyedState(Collection<KeyGroupsStateHandle> collection, Collection<? extends KeyedStateHandle> collection2) throws Exception {
        final KeyGroupsStateHandle expectedHandle = collection.iterator().next();

        final int expectedKeyGroupCount = expectedHandle.getKeyGroupRange().getNumberOfKeyGroups();
        int actualKeyGroupCount = 0;
        for (KeyedStateHandle actual : collection2) {
            Assert.assertTrue(actual instanceof KeyGroupsStateHandle);
            actualKeyGroupCount += actual.getKeyGroupRange().getNumberOfKeyGroups();
        }
        Assert.assertEquals(expectedKeyGroupCount, actualKeyGroupCount);

        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        try (FSDataInputStream expectedIn = expectedHandle.openInputStream()) {
            for (int keyGroup : expectedHandle.getKeyGroupRange()) {
                expectedIn.seek(expectedHandle.getOffsetForKeyGroup(keyGroup));
                final int expectedValue = (Integer) InstantiationUtil.deserializeObject(expectedIn, classLoader);

                for (KeyedStateHandle actual : collection2) {
                    Assert.assertTrue(actual instanceof KeyGroupsStateHandle);
                    final KeyGroupsStateHandle actualHandle = (KeyGroupsStateHandle) actual;
                    if (!actualHandle.getKeyGroupRange().contains(keyGroup)) {
                        // This handle does not cover the key group; nothing to compare.
                        continue;
                    }

                    final long actualOffset = actualHandle.getOffsetForKeyGroup(keyGroup);
                    try (FSDataInputStream actualIn = actualHandle.openInputStream()) {
                        actualIn.seek(actualOffset);
                        final int actualValue = (Integer) InstantiationUtil.deserializeObject(actualIn, classLoader);
                        Assert.assertEquals(expectedValue, actualValue);
                    }
                }
            }
        }
    }

    /**
     * Asserts that the expected chained operator state and the actual (re-)distributed operator
     * state contain the same entries.
     *
     * <p>Both sides are flattened into strings by {@code collectResult}, tagged with the position
     * of the handle inside its chain (expected) or inside its per-subtask list (actual). The two
     * string lists are sorted and compared, so only the multiset of entries matters, not which
     * subtask holds which entry.
     *
     * @param list expected state, one chained handle per element
     * @param list2 actual state; {@code null} elements are skipped, but every collection inside
     *     a non-null element must itself be non-null
     * @throws Exception if reading one of the state handles fails
     */
    public static void comparePartitionableState(List<ChainedStateHandle<OperatorStateHandle>> list, List<List<Collection<OperatorStateHandle>>> list2) throws Exception {
        final List<String> expectedEntries = new ArrayList<>();
        for (ChainedStateHandle<OperatorStateHandle> chain : list) {
            for (int chainIndex = 0; chainIndex < chain.getLength(); chainIndex++) {
                collectResult(chainIndex, chain.get(chainIndex), expectedEntries);
            }
        }
        Collections.sort(expectedEntries);

        final List<String> actualEntries = new ArrayList<>();
        for (List<Collection<OperatorStateHandle>> perChainPosition : list2) {
            if (perChainPosition == null) {
                continue;
            }
            for (int chainIndex = 0; chainIndex < perChainPosition.size(); chainIndex++) {
                final Collection<OperatorStateHandle> handles = perChainPosition.get(chainIndex);
                Assert.assertNotNull(handles);
                for (OperatorStateHandle handle : handles) {
                    collectResult(chainIndex, handle, actualEntries);
                }
            }
        }
        Collections.sort(actualEntries);

        Assert.assertEquals(expectedEntries, actualEntries);
    }

    /**
     * Reads every partition of every named state in the given handle and appends one line per
     * partition to {@code list}, formatted as {@code "<i> : <state name> : <value>"}.
     *
     * <p>The value is the {@code Integer} deserialized from the handle's stream at the
     * partition's offset. The stream is closed via try-with-resources.
     *
     * @param i index prefixed to every produced line
     * @param operatorStateHandle handle to read from
     * @param list collector the produced lines are appended to
     * @throws Exception if opening, seeking or deserializing from the stream fails
     */
    private static void collectResult(int i, OperatorStateHandle operatorStateHandle, List<String> list) throws Exception {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> namedState
                    : operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                for (long offset : namedState.getValue().getOffsets()) {
                    in.seek(offset);
                    final Integer value = (Integer) InstantiationUtil.deserializeObject(in, classLoader);
                    list.add(i + " : " + namedState.getKey() + " : " + value);
                }
            }
        }
    }

    /**
     * Checks key-group partitioning for a handful of fixed corner cases and for 1000
     * pseudo-random combinations drawn from a fixed seed.
     *
     * <p>Each pair is (first argument, second argument) of the private
     * {@code testCreateKeyGroupPartitions(int, int)} helper; the second value never exceeds
     * the first.
     */
    @Test
    public void testCreateKeyGroupPartitions() {
        final int[][] fixedCases = {
            {1, 1},
            {13, 1},
            {13, 2},
            {32767, 1},
            {32767, 13},
            {32767, 32767},
        };
        for (int[] fixedCase : fixedCases) {
            testCreateKeyGroupPartitions(fixedCase[0], fixedCase[1]);
        }

        // Fixed seed keeps the randomized part reproducible.
        final Random rnd = new Random(1234L);
        for (int round = 0; round < 1000; round++) {
            final int first = rnd.nextInt(32766) + 1;
            final int second = rnd.nextInt(first) + 1;
            testCreateKeyGroupPartitions(first, second);
        }
    }

    /**
     * Triggers two checkpoints on a freshly constructed coordinator on which no scheduler-related
     * method is called in this test.
     *
     * <p>The trigger whose last argument is {@code true} must be declined with
     * {@link CheckpointDeclineReason#PERIODIC_SCHEDULER_SHUTDOWN}; the one whose last argument
     * is {@code false} must not fail. (Presumably that flag marks the trigger as periodic;
     * confirm against {@code CheckpointCoordinator#triggerCheckpoint}.)
     */
    @Test
    public void testStopPeriodicScheduler() throws Exception {
        final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                new JobID(),
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        // Trigger with the flag set: expected to be declined.
        final CheckpointTriggerResult flaggedResult = coordinator.triggerCheckpoint(
                System.currentTimeMillis(),
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                (String) null,
                true);
        Assert.assertTrue(flaggedResult.isFailure());
        Assert.assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, flaggedResult.getFailureReason());

        // Trigger with the flag cleared: expected to go through.
        final CheckpointTriggerResult unflaggedResult = coordinator.triggerCheckpoint(
                System.currentTimeMillis(),
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                (String) null,
                false);
        Assert.assertFalse(unflaggedResult.isFailure());
    }

    /**
     * Checks that {@link StateAssignmentOperation#createKeyGroupPartitions} and
     * {@link KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup} agree: for every key group
     * in {@code [0, i)}, the range found at the computed operator index must contain that
     * key group.
     *
     * @param i number of key groups to iterate over; first argument to both utilities
     * @param i2 second argument to both utilities
     */
    private void testCreateKeyGroupPartitions(int i, int i2) {
        final List<KeyGroupRange> partitions = StateAssignmentOperation.createKeyGroupPartitions(i, i2);
        for (int keyGroup = 0; keyGroup < i; keyGroup++) {
            final int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(i, i2, keyGroup);
            final KeyGroupRange range = partitions.get(operatorIndex);
            // Build the failure message only on mismatch; this loop runs millions of times.
            if (!range.contains(keyGroup)) {
                Assert.fail("Could not find expected key-group " + keyGroup + " in range " + range);
            }
        }
    }

    /**
     * Runs the repartitioning check 10000 times with pseudo-random settings from a fixed seed.
     * Each of the four settings is drawn from {@code [1, 9]}, in the order in which they are
     * passed to {@code doTestPartitionableStateRepartitioning}.
     */
    @Test
    public void testPartitionableStateRepartitioning() {
        final Random rnd = new Random(42L);
        for (int run = 0; run < 10000; run++) {
            // The draw order matters: it determines the pseudo-random sequence of every run.
            final int oldParallelism = rnd.nextInt(9) + 1;
            final int newParallelism = rnd.nextInt(9) + 1;
            final int namedStates = rnd.nextInt(9) + 1;
            final int maxPartitionsPerState = rnd.nextInt(9) + 1;
            doTestPartitionableStateRepartitioning(rnd, oldParallelism, newParallelism, namedStates, maxPartitionsPerState);
        }
    }

    /**
     * Generates {@code i} synthetic operator state handles, repartitions them to {@code i2}
     * targets with {@link RoundRobinOperatorStateRepartitioner}, and checks the result against an
     * independently computed expectation.
     *
     * <p>Three properties are asserted: the number of offsets assigned to any two targets differs
     * by at most one, the total number of assigned offsets equals the expected total, and the
     * offsets per (delegate handle, state name) equal the expected ones.
     *
     * @param random source of randomness for the number of offsets per state and the state mode
     * @param i number of input handles to generate
     * @param i2 number of targets passed to the repartitioner
     * @param i3 controls the named states per handle: {@code i3 - 1} random states, plus one
     *     broadcast state when {@code i3} is even
     * @param i4 upper bound (inclusive) for the number of offsets of each random state
     */
    private void doTestPartitionableStateRepartitioning(Random random, int i, int i2, int i3, int i4) {
        int i5;
        // --- Step 1: build the input handles. ---
        ArrayList<OperatorStateHandle> arrayList = new ArrayList(i);
        for (int i6 = 0; i6 < i; i6++) {
            Path path = new Path("/fake-" + i6);
            HashMap hashMap = new HashMap();
            // Running offset counter; offsets within one handle are consecutive and unique.
            int i7 = 0;
            for (int i8 = 0; i8 < i3 - 1; i8++) {
                long[] jArr = new long[1 + random.nextInt(i4)];
                for (int i9 = 0; i9 < jArr.length; i9++) {
                    jArr[i9] = i7;
                    i7++;
                }
                // One in ten states is UNION, the rest are SPLIT_DISTRIBUTE.
                hashMap.put("State-" + i8, new OperatorStateHandle.StateMetaInfo(jArr, random.nextInt(10) == 0 ? OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            }
            // For even i3, add a BROADCAST state with four offsets under the last state name.
            if (i3 % 2 == 0) {
                hashMap.put("State-" + (i3 - 1), new OperatorStateHandle.StateMetaInfo(new long[]{i7 + 1, i7 + 2, i7 + 3, i7 + 4}, OperatorStateHandle.Mode.BROADCAST));
            }
            arrayList.add(new OperatorStreamStateHandle(hashMap, new FileStateHandle(path, -1L)));
        }
        // --- Step 2: compute the expectation. ---
        // hashMap2: delegate handle -> state name -> expected offsets (each repeated i5 times).
        HashMap hashMap2 = new HashMap();
        // i10: index of the input handle being processed; i11: expected total offset count.
        int i10 = 0;
        int i11 = 0;
        for (OperatorStateHandle operatorStateHandle : arrayList) {
            Map stateNameToPartitionOffsets = operatorStateHandle.getStateNameToPartitionOffsets();
            HashMap hashMap3 = new HashMap(stateNameToPartitionOffsets.size());
            for (Map.Entry entry : stateNameToPartitionOffsets.entrySet()) {
                long[] offsets = ((OperatorStateHandle.StateMetaInfo) entry.getValue()).getOffsets();
                // i5 is how many times each offset of this state is expected to appear in the
                // repartitioned output.
                // NOTE(review): the case indices come from a compiler-generated switch map, so
                // the Mode behind each index is not visible here. Judging by the arithmetic,
                // case 1 (once per target) is presumably UNION, case 2 (targets spread evenly
                // over input handles) BROADCAST, and case 3 (exactly once) SPLIT_DISTRIBUTE.
                // Confirm against AnonymousClass2.
                switch (AnonymousClass2.$SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[((OperatorStateHandle.StateMetaInfo) entry.getValue()).getDistributionMode().ordinal()]) {
                    case 1:
                        i5 = i2;
                        break;
                    case 2:
                        i5 = (i2 / i) + (i10 < i2 % i ? 1 : 0);
                        break;
                    case 3:
                        i5 = 1;
                        break;
                    default:
                        throw new RuntimeException("Unknown distribution mode " + ((OperatorStateHandle.StateMetaInfo) entry.getValue()).getDistributionMode());
                }
                // i5 can be 0 in case 2 when there are fewer targets than input handles; such
                // a state is expected to be absent from the output.
                if (i5 > 0) {
                    i11 += i5 * offsets.length;
                    ArrayList arrayList2 = new ArrayList(offsets.length);
                    for (long j : offsets) {
                        for (int i12 = 0; i12 < i5; i12++) {
                            arrayList2.add(Long.valueOf(j));
                        }
                    }
                    hashMap3.put(entry.getKey(), arrayList2);
                }
            }
            if (!hashMap3.isEmpty()) {
                hashMap2.put(operatorStateHandle.getDelegateStateHandle(), hashMap3);
            }
            i10++;
        }
        // --- Step 3: run the repartitioner under test. ---
        List repartitionState = RoundRobinOperatorStateRepartitioner.INSTANCE.repartitionState(arrayList, i2);
        // --- Step 4: collect what was actually assigned. ---
        // hashMap4 mirrors hashMap2 (delegate handle -> state name -> offsets) for the output.
        HashMap hashMap4 = new HashMap();
        // i13 / i14: smallest / largest offset count over all targets; i15: total offset count.
        int i13 = Integer.MAX_VALUE;
        int i14 = 0;
        int i15 = 0;
        for (int i16 = 0; i16 < i2; i16++) {
            // i17: number of offsets assigned to target i16.
            int i17 = 0;
            for (OperatorStateHandle operatorStateHandle2 : (Collection) repartitionState.get(i16)) {
                for (Map.Entry entry2 : operatorStateHandle2.getStateNameToPartitionOffsets().entrySet()) {
                    Map map = (Map) hashMap4.get(operatorStateHandle2.getDelegateStateHandle());
                    if (map == null) {
                        map = new HashMap();
                        hashMap4.put(operatorStateHandle2.getDelegateStateHandle(), map);
                    }
                    List list = (List) map.get(entry2.getKey());
                    if (list == null) {
                        list = new ArrayList();
                        map.put(entry2.getKey(), list);
                    }
                    for (long j2 : ((OperatorStateHandle.StateMetaInfo) entry2.getValue()).getOffsets()) {
                        list.add(Long.valueOf(j2));
                    }
                    i17 += ((OperatorStateHandle.StateMetaInfo) entry2.getValue()).getOffsets().length;
                }
            }
            i13 = Math.min(i13, i17);
            i14 = Math.max(i14, i17);
            i15 += i17;
        }
        // The expected lists were built in ascending offset order; sort the collected lists so
        // that the order in which targets were visited does not affect the comparison.
        Iterator it = hashMap4.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Map) it.next()).values().iterator();
            while (it2.hasNext()) {
                Collections.sort((List) it2.next());
            }
        }
        // --- Step 5: assertions. ---
        // i18: spread between the most and the least loaded target.
        int i18 = i14 - i13;
        Assert.assertTrue("Difference in partition load is > 1 : " + i18, i18 <= 1);
        Assert.assertEquals(i11, i15);
        Assert.assertEquals(hashMap2, hashMap4);
    }

    /**
     * Verifies that triggering a checkpoint reports exactly one pending checkpoint to the
     * configured {@link CheckpointStatsTracker}, with checkpoint ID 1, the trigger timestamp,
     * and the default checkpoint properties for the never-retain policy.
     */
    @Test
    public void testCheckpointStatsTrackerPendingCheckpointCallback() {
        final long triggerTimestamp = System.currentTimeMillis();
        final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                new JobID(),
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        final CheckpointStatsTracker tracker = Mockito.mock(CheckpointStatsTracker.class);
        coordinator.setCheckpointStatsTracker(tracker);

        // The stubbed call hands out a mocked stats object for any reported pending checkpoint.
        Mockito.when(tracker.reportPendingCheckpoint(
                        Matchers.anyLong(),
                        Matchers.anyLong(),
                        Matchers.any(CheckpointProperties.class)))
                .thenReturn(Mockito.mock(PendingCheckpointStats.class));

        Assert.assertTrue(coordinator.triggerCheckpoint(triggerTimestamp, false));

        Mockito.verify(tracker, Mockito.times(1)).reportPendingCheckpoint(
                Mockito.eq(1L),
                Mockito.eq(triggerTimestamp),
                Mockito.eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
    }

    /**
     * Verifies that restoring the latest checkpointed state reports exactly one restored
     * checkpoint to the configured {@link CheckpointStatsTracker}.
     *
     * <p>A single completed checkpoint with empty state is placed into the store before the
     * restore is requested.
     */
    @Test
    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());
        final StandaloneCompletedCheckpointStore checkpointStore = new StandaloneCompletedCheckpointStore(1);

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                new JobID(),
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new ExecutionVertex[]{vertex},
                new StandaloneCheckpointIDCounter(),
                checkpointStore,
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        checkpointStore.addCheckpoint(new CompletedCheckpoint(
                new JobID(),
                0L,
                0L,
                0L,
                Collections.emptyMap(),
                Collections.emptyList(),
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                new TestCompletedCheckpointStorageLocation()));

        final CheckpointStatsTracker tracker = Mockito.mock(CheckpointStatsTracker.class);
        coordinator.setCheckpointStatsTracker(tracker);

        Assert.assertTrue(coordinator.restoreLatestCheckpointedState(Collections.emptyMap(), false, true));

        Mockito.verify(tracker, Mockito.times(1))
                .reportRestoredCheckpoint(Matchers.any(RestoredCheckpointStats.class));
    }

    /**
     * Exercises shared-state registration and discarding across a restore.
     *
     * <ol>
     *   <li>Three incremental checkpoints are completed. Every keyed state handle must have been
     *       registered once with the first created {@link SharedStateRegistry}, no placeholder
     *       handle may remain in the shared state, and nothing may have been discarded.</li>
     *   <li>The oldest checkpoint is removed from the store; no shared handle may be discarded.</li>
     *   <li>The store is shut down with {@link JobStatus#SUSPENDED} and the coordinator restores
     *       the latest state. Handles of the first checkpoint must not be registered with the
     *       second created registry; handles of the later checkpoints must be registered with it
     *       exactly once.</li>
     *   <li>Removing the next oldest checkpoint must discard exactly the shared handles whose key
     *       ends in {@code 0}; removing one more must leave every shared handle discarded
     *       exactly once.</li>
     * </ol>
     */
    @Test
    public void testSharedStateRegistrationOnRestore() throws Exception {
        final JobID jobID = new JobID();
        final long timestamp = System.currentTimeMillis();

        final JobVertexID jobVertexID = new JobVertexID();
        final ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobVertexID, 2, 4);

        final List<ExecutionVertex> allVertices = new ArrayList<>(2);
        allVertices.addAll(Arrays.asList(jobVertex.getTaskVertices()));
        final ExecutionVertex[] vertices = allVertices.toArray(new ExecutionVertex[allVertices.size()]);

        final RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(10);

        // Records every registry the coordinator asks the factory for, in creation order.
        final List<SharedStateRegistry> createdRegistries = new ArrayList<>(2);

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobID,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                vertices,
                vertices,
                vertices,
                new StandaloneCheckpointIDCounter(),
                store,
                new MemoryStateBackend(),
                Executors.directExecutor(),
                executor -> {
                    SharedStateRegistry registry = new SharedStateRegistry(executor);
                    createdRegistries.add(registry);
                    return registry;
                });

        final int numCheckpoints = 3;
        final List<KeyGroupRange> keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(4, 2);
        for (int i = 0; i < numCheckpoints; i++) {
            performIncrementalCheckpoint(jobID, coordinator, jobVertex, keyGroupPartitions, timestamp + i, i);
        }

        final List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
        Assert.assertEquals(3L, completedCheckpoints.size());

        // Shared handles of each checkpoint, indexed by checkpoint position.
        final List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = new ArrayList<>(numCheckpoints);
        for (int i = 0; i < numCheckpoints; i++) {
            sharedHandlesByCheckpoint.add(new HashMap<>(2));
        }

        int sharedHandleCount = 0;
        int checkpointIndex = 0;
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                        // Registered once with the registry that existed while checkpointing.
                        Mockito.verify(keyedStateHandle, Mockito.times(1))
                                .registerSharedStates(createdRegistries.get(0));

                        final IncrementalKeyedStateHandle incrementalHandle =
                                (IncrementalKeyedStateHandle) keyedStateHandle;
                        sharedHandlesByCheckpoint.get(checkpointIndex).putAll(incrementalHandle.getSharedState());

                        for (StreamStateHandle sharedHandle : incrementalHandle.getSharedState().values()) {
                            Assert.assertTrue(!(sharedHandle instanceof PlaceholderStreamStateHandle));
                            Mockito.verify(sharedHandle, Mockito.never()).discardState();
                            sharedHandleCount++;
                        }
                        for (StreamStateHandle privateHandle : incrementalHandle.getPrivateState().values()) {
                            Mockito.verify(privateHandle, Mockito.never()).discardState();
                        }
                        Mockito.verify(incrementalHandle.getMetaStateHandle(), Mockito.never()).discardState();
                    }
                    Mockito.verify(subtaskState, Mockito.never()).discardState();
                }
            }
            checkpointIndex++;
        }
        // 2 subtasks x (1 + 2 + 2) shared handles over the three checkpoints.
        Assert.assertEquals(10L, sharedHandleCount);

        // Dropping the first checkpoint must not discard any shared handle.
        store.removeOldestCheckpoint();
        for (Map<StateHandleID, StreamStateHandle> sharedHandles : sharedHandlesByCheckpoint) {
            for (StreamStateHandle sharedHandle : sharedHandles.values()) {
                Mockito.verify(sharedHandle, Mockito.never()).discardState();
            }
        }

        store.shutdown(JobStatus.SUSPENDED);

        final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
        tasks.put(jobVertexID, jobVertex);
        coordinator.restoreLatestCheckpointedState(tasks, true, false);

        // After the restore, only handles of the later checkpoints are registered with the
        // second registry.
        checkpointIndex = 0;
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                        Mockito.verify(keyedStateHandle, checkpointIndex > 0 ? Mockito.times(1) : Mockito.never())
                                .registerSharedStates(createdRegistries.get(1));
                    }
                }
            }
            checkpointIndex++;
        }

        // Dropping the next checkpoint discards exactly the handles whose key ends in '0'.
        store.removeOldestCheckpoint();
        for (Map<StateHandleID, StreamStateHandle> sharedHandles : sharedHandlesByCheckpoint) {
            for (Map.Entry<StateHandleID, StreamStateHandle> entry : sharedHandles.entrySet()) {
                final String key = entry.getKey().getKeyString();
                final int keySuffix = Integer.parseInt(String.valueOf(key.charAt(key.length() - 1)));
                if (keySuffix == 0) {
                    Mockito.verify(entry.getValue(), Mockito.times(1)).discardState();
                } else {
                    Mockito.verify(entry.getValue(), Mockito.never()).discardState();
                }
            }
        }

        // Dropping one more checkpoint leaves every shared handle discarded exactly once.
        store.removeOldestCheckpoint();
        for (Map<StateHandleID, StreamStateHandle> sharedHandles : sharedHandlesByCheckpoint) {
            for (StreamStateHandle sharedHandle : sharedHandles.values()) {
                Mockito.verify(sharedHandle, Mockito.times(1)).discardState();
            }
        }
    }

    /**
     * Triggers one checkpoint and acknowledges it for every subtask of the given job vertex with
     * a spied {@link IncrementalKeyedStateHandle}.
     *
     * <p>Each subtask's handle carries:
     * <ul>
     *   <li>one private state entry {@code "private-1"},</li>
     *   <li>a shared state entry {@code "shared-<i>"} backed by real bytes, and</li>
     *   <li>for {@code i > 0}, an additional {@code "shared-<i-1>"} entry backed by a
     *       {@link PlaceholderStreamStateHandle}.</li>
     * </ul>
     *
     * @param jobID job the acknowledge messages are sent for
     * @param checkpointCoordinator coordinator under test
     * @param executionJobVertex vertex whose subtasks acknowledge the checkpoint
     * @param list key-group range per subtask, indexed by subtask index
     * @param j timestamp passed to the checkpoint trigger
     * @param i index used to name the shared state entries; {@code 0} yields no placeholder
     * @throws Exception if triggering or acknowledging the checkpoint fails
     */
    private void performIncrementalCheckpoint(JobID jobID, CheckpointCoordinator checkpointCoordinator, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> list, long j, int i) throws Exception {
        checkpointCoordinator.triggerCheckpoint(j, false);

        // Exactly one checkpoint may be pending; its ID is reused for all acknowledgements.
        Assert.assertEquals(1, checkpointCoordinator.getPendingCheckpoints().keySet().size());
        final long checkpointId = Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());

        for (int subtask = 0; subtask < executionJobVertex.getParallelism(); subtask++) {
            final KeyGroupRange keyGroupRange = list.get(subtask);

            final Map<StateHandleID, StreamStateHandle> privateState = new HashMap<>();
            privateState.put(
                    new StateHandleID("private-1"),
                    Mockito.spy(new ByteStreamStateHandle("private-1", new byte[]{112})));

            final Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>();
            if (i > 0) {
                // The entry of the previous round is only referenced via a placeholder.
                sharedState.put(
                        new StateHandleID("shared-" + (i - 1)),
                        Mockito.spy(new PlaceholderStreamStateHandle()));
            }
            sharedState.put(
                    new StateHandleID("shared-" + i),
                    Mockito.spy(new ByteStreamStateHandle("shared-" + i + "-" + keyGroupRange, new byte[]{115})));

            final IncrementalKeyedStateHandle keyedState = Mockito.spy(new IncrementalKeyedStateHandle(
                    new UUID(42L, 42L),
                    keyGroupRange,
                    checkpointId,
                    sharedState,
                    privateState,
                    Mockito.spy(new ByteStreamStateHandle("meta", new byte[]{109}))));

            final OperatorSubtaskState subtaskState = Mockito.spy(new OperatorSubtaskState(
                    StateObjectCollection.empty(),
                    StateObjectCollection.empty(),
                    StateObjectCollection.singleton(keyedState),
                    StateObjectCollection.empty()));

            final Map<OperatorID, OperatorSubtaskState> statesByOperator = new HashMap<>();
            statesByOperator.put(executionJobVertex.getOperatorIDs().get(0), subtaskState);

            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
                    jobID,
                    executionJobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    new TaskStateSnapshot(statesByOperator)));
        }
    }
}
