/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.FinishedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.VertexFinishedStateChecker;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorRestoringTest
extends TestLogger {
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private static void acknowledgeCheckpoint(CheckpointCoordinator coordinator, ExecutionGraph executionGraph, ExecutionJobVertex jobVertex, long checkpointId) throws Exception {
        List partitions = StateAssignmentOperation.createKeyGroupPartitions((int)jobVertex.getMaxParallelism(), (int)jobVertex.getParallelism());
        for (int partitionIdx = 0; partitionIdx < partitions.size(); ++partitionIdx) {
            TaskStateSnapshot subtaskState = CheckpointCoordinatorTestingUtils.mockSubtaskState(jobVertex.getJobVertexId(), partitionIdx, (KeyGroupRange)partitions.get(partitionIdx));
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(executionGraph.getJobID(), jobVertex.getTaskVertices()[partitionIdx].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskState);
            coordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
    }

    private static ExecutionGraph createExecutionGraph(List<TestingVertex> vertices) throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder builder = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder();
        for (TestingVertex vertex : vertices) {
            builder.addJobVertex(vertex.getId(), vertex.getParallelism(), vertex.getMaxParallelism());
        }
        return builder.build();
    }

    @Before
    public void setUp() throws Exception {
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testRestoreLatestCheckpointedState() throws Exception {
        List<TestingVertex> vertices = Arrays.asList(new TestingVertex(new JobVertexID(), 3, 42), new TestingVertex(new JobVertexID(), 2, 13));
        this.testRestoreLatestCheckpointedState(vertices, this.testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(vertices));
    }

    private Collection<CompletedCheckpoint> testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(List<TestingVertex> vertices) throws Exception {
        ExecutionGraph executionGraph = CheckpointCoordinatorRestoringTest.createExecutionGraph(vertices);
        EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(executionGraph).setTimer(this.manuallyTriggeredScheduledExecutor).setCompletedCheckpointStore((CompletedCheckpointStore)store).build();
        coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)1L, (long)coordinator.getPendingCheckpoints().size());
        long checkpointId = (Long)Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());
        for (TestingVertex vertex : vertices) {
            ExecutionJobVertex executionVertex = Objects.requireNonNull(executionGraph.getJobVertex(vertex.getId()));
            CheckpointCoordinatorRestoringTest.acknowledgeCheckpoint(coordinator, executionGraph, executionVertex, checkpointId);
        }
        List completedCheckpoints = coordinator.getSuccessfulCheckpoints();
        Assert.assertEquals((long)1L, (long)completedCheckpoints.size());
        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
        return store.getAllCheckpoints();
    }

    private void testRestoreLatestCheckpointedState(List<TestingVertex> vertices, Collection<CompletedCheckpoint> completedCheckpoints) throws Exception {
        ExecutionGraph executionGraph = CheckpointCoordinatorRestoringTest.createExecutionGraph(vertices);
        EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(completedCheckpoints.size(), completedCheckpoints);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(executionGraph).setTimer(this.manuallyTriggeredScheduledExecutor).setCompletedCheckpointStore((CompletedCheckpointStore)store).build();
        Set executionVertices = vertices.stream().map(TestingVertex::getId).map(arg_0 -> ((ExecutionGraph)executionGraph).getJobVertex(arg_0)).collect(Collectors.toSet());
        Assert.assertTrue((boolean)coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false));
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                    ((OperatorSubtaskState)Mockito.verify((Object)subtaskState, (VerificationMode)Mockito.times((int)2))).registerSharedStates((SharedStateRegistry)ArgumentMatchers.any(SharedStateRegistry.class));
                }
            }
        }
        for (ExecutionJobVertex executionVertex : executionVertices) {
            CheckpointCoordinatorTestingUtils.verifyStateRestore(executionVertex);
        }
    }

    @Test
    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        this.testRestoreLatestCheckpointedStateWithChangingParallelism(false);
    }

    @Test
    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        this.testRestoreLatestCheckpointedStateWithChangingParallelism(true);
    }

    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean scaleOut) throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = scaleOut ? 2 : 13;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        int newParallelism2 = scaleOut ? 13 : 2;
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).addJobVertex(jobVertexID2, parallelism2, maxParallelism2).build();
        ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
        ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(graph).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        coord.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)1L, (long)coord.getPendingCheckpoints().size());
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        for (int index = 0; index < jobVertex1.getParallelism(); ++index) {
            OperatorStateHandle opStateBackend = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID1, index, 2, 8, false);
            KeyGroupsStateHandle keyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), false);
            KeyGroupsStateHandle keyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), true);
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState(opStateBackend).setManagedKeyedState((KeyedStateHandle)keyedStateBackend).setRawKeyedState((KeyedStateHandle)keyedStateRaw).setInputChannelState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewInputChannelStateHandle(3, new Random()))).build();
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID1), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        ArrayList<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesBackend = new ArrayList<ChainedStateHandle<OperatorStateHandle>>(jobVertex2.getParallelism());
        ArrayList<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesRaw = new ArrayList<ChainedStateHandle<OperatorStateHandle>>(jobVertex2.getParallelism());
        for (int index = 0; index < jobVertex2.getParallelism(); ++index) {
            KeyGroupsStateHandle keyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), false);
            KeyGroupsStateHandle keyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), true);
            OperatorStateHandle opStateBackend = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false);
            OperatorStateHandle opStateRaw = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true);
            expectedOpStatesBackend.add(new ChainedStateHandle(Collections.singletonList(opStateBackend)));
            expectedOpStatesRaw.add(new ChainedStateHandle(Collections.singletonList(opStateRaw)));
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState(opStateBackend).setRawOperatorState(opStateRaw).setManagedKeyedState((KeyedStateHandle)keyedStateBackend).setRawKeyedState((KeyedStateHandle)keyedStateRaw).build();
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals((long)1L, (long)completedCheckpoints.size());
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        ExecutionGraph newGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).addJobVertex(jobVertexID2, newParallelism2, maxParallelism2).build();
        ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex(jobVertexID1);
        ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex(jobVertexID2);
        CheckpointCoordinator newCoord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(newGraph).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        Assert.assertTrue((boolean)newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
        ArrayList<List<Collection<OperatorStateHandle>>> actualOpStatesBackend = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIDs = newJobVertex2.getOperatorIDs();
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)1L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();
            int headOpIndex = operatorIDs.size() - 1;
            ArrayList<StateObjectCollection> allParallelManagedOpStates = new ArrayList<StateObjectCollection>(operatorIDs.size());
            ArrayList<StateObjectCollection> allParallelRawOpStates = new ArrayList<StateObjectCollection>(operatorIDs.size());
            for (int idx = 0; idx < operatorIDs.size(); ++idx) {
                OperatorID operatorID = ((OperatorIDPair)operatorIDs.get(idx)).getGeneratedOperatorID();
                OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
                StateObjectCollection opStateBackend = opState.getManagedOperatorState();
                StateObjectCollection opStateRaw = opState.getRawOperatorState();
                allParallelManagedOpStates.add(opStateBackend);
                allParallelRawOpStates.add(opStateRaw);
                if (idx != headOpIndex) continue;
                StateObjectCollection keyedStateBackend = opState.getManagedKeyedState();
                StateObjectCollection keyGroupStateRaw = opState.getRawKeyedState();
                CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), (Collection<? extends KeyedStateHandle>)keyedStateBackend);
                CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), (Collection<? extends KeyedStateHandle>)keyGroupStateRaw);
            }
            actualOpStatesBackend.add(allParallelManagedOpStates);
            actualOpStatesRaw.add(allParallelRawOpStates);
        }
        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedOpStatesBackend, actualOpStatesBackend);
        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
    }

    @Test(expected=IllegalStateException.class)
    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        AcknowledgeCheckpoint acknowledgeCheckpoint;
        TaskStateSnapshot taskOperatorSubtaskStates;
        OperatorSubtaskState operatorSubtaskState;
        KeyGroupsStateHandle keyGroupState;
        int index;
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = 2;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).addJobVertex(jobVertexID2, parallelism2, maxParallelism2).build();
        ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
        ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(graph).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        coord.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)1L, (long)coord.getPendingCheckpoints().size());
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        for (index = 0; index < jobVertex1.getParallelism(); ++index) {
            keyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), false);
            operatorSubtaskState = OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyGroupState).build();
            taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID1), operatorSubtaskState);
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        for (index = 0; index < jobVertex2.getParallelism(); ++index) {
            keyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), false);
            operatorSubtaskState = OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyGroupState).build();
            taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2), operatorSubtaskState);
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals((long)1L, (long)completedCheckpoints.size());
        int newMaxParallelism1 = 20;
        int newMaxParallelism2 = 42;
        ExecutionGraph newGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, newMaxParallelism1).addJobVertex(jobVertexID2, parallelism2, newMaxParallelism2).build();
        ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex(jobVertexID1);
        ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex(jobVertexID2);
        CheckpointCoordinator newCoord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(newGraph).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        Assert.assertTrue((boolean)newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
        Assert.fail((String)"The restoration should have failed because the max parallelism changed.");
    }

    @Test
    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        this.testStateRecoveryWithTopologyChange(TestScaleType.INCREASE_PARALLELISM);
    }

    @Test
    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        this.testStateRecoveryWithTopologyChange(TestScaleType.DECREASE_PARALLELISM);
    }

    @Test
    public void testStateRecoveryWhenTopologyChange() throws Exception {
        this.testStateRecoveryWithTopologyChange(TestScaleType.SAME_PARALLELISM);
    }

    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        JobVertexID jobVertexID = new JobVertexID();
        OperatorID operatorID = OperatorID.fromJobVertexID((JobVertexID)jobVertexID);
        return new Tuple2((Object)jobVertexID, (Object)operatorID);
    }

    public void testStateRecoveryWithTopologyChange(TestScaleType scaleType) throws Exception {
        Tuple2<JobVertexID, OperatorID> id1 = CheckpointCoordinatorRestoringTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id2 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int parallelism1 = 10;
        int maxParallelism1 = 64;
        Tuple2<JobVertexID, OperatorID> id3 = CheckpointCoordinatorRestoringTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id4 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int parallelism2 = 10;
        int maxParallelism2 = 64;
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        HashMap<Object, OperatorState> operatorStates = new HashMap<Object, OperatorState>();
        for (Tuple2 id : Arrays.asList(id1, id2)) {
            OperatorState taskState = new OperatorState((OperatorID)id.f1, parallelism1, maxParallelism1);
            operatorStates.put(id.f1, taskState);
            for (int index = 0; index < taskState.getParallelism(); ++index) {
                OperatorSubtaskState subtaskState = OperatorSubtaskState.builder().setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false)).setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true)).build();
                taskState.putState(index, subtaskState);
            }
        }
        ArrayList expectedManagedOperatorStates = new ArrayList();
        ArrayList expectedRawOperatorStates = new ArrayList();
        for (Tuple2 id : Arrays.asList(id3, id4)) {
            OperatorState operatorState = new OperatorState((OperatorID)id.f1, parallelism2, maxParallelism2);
            operatorStates.put(id.f1, operatorState);
            ArrayList<ChainedStateHandle> expectedManagedOperatorState = new ArrayList<ChainedStateHandle>();
            ArrayList<ChainedStateHandle> expectedRawOperatorState = new ArrayList<ChainedStateHandle>();
            expectedManagedOperatorStates.add(expectedManagedOperatorState);
            expectedRawOperatorStates.add(expectedRawOperatorState);
            for (int index = 0; index < operatorState.getParallelism(); ++index) {
                OperatorSubtaskState.Builder stateBuilder = OperatorSubtaskState.builder();
                OperatorStateHandle subManagedOperatorState = (OperatorStateHandle)CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false).get(0);
                OperatorStateHandle subRawOperatorState = (OperatorStateHandle)CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true).get(0);
                if (((JobVertexID)id.f0).equals(id3.f0)) {
                    stateBuilder.setManagedKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), false));
                }
                if (((JobVertexID)id.f0).equals(id3.f0)) {
                    stateBuilder.setRawKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), true));
                }
                expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subManagedOperatorState));
                expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subRawOperatorState));
                OperatorSubtaskState subtaskState = stateBuilder.setManagedOperatorState(subManagedOperatorState).setRawOperatorState(subRawOperatorState).build();
                operatorState.putState(index, subtaskState);
            }
        }
        Tuple2<JobVertexID, OperatorID> id5 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int newParallelism1 = 10;
        Tuple2<JobVertexID, OperatorID> id6 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int newParallelism2 = parallelism2;
        if (scaleType == TestScaleType.INCREASE_PARALLELISM) {
            newParallelism2 = 20;
        } else if (scaleType == TestScaleType.DECREASE_PARALLELISM) {
            newParallelism2 = 8;
        }
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        ExecutionGraph newGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex((JobVertexID)id5.f0, newParallelism1, maxParallelism1, Stream.of((OperatorID)id2.f1, (OperatorID)id1.f1, (OperatorID)id5.f1).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).addJobVertex((JobVertexID)id3.f0, newParallelism2, maxParallelism2, Stream.of((OperatorID)id6.f1, (OperatorID)id3.f1).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).build();
        ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex((JobVertexID)id5.f0);
        ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex((JobVertexID)id3.f0);
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(newGraph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation());
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(newGraph).setCompletedCheckpointStore(CompletedCheckpointStore.storeFor(() -> {}, (CompletedCheckpoint[])new CompletedCheckpoint[]{completedCheckpoint})).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        coord.restoreLatestCheckpointedStateToAll(tasks, true);
        for (int i = 0; i < newJobVertex1.getParallelism(); ++i) {
            List operatorIDs = newJobVertex1.getOperatorIDs();
            JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)2L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            Assert.assertTrue((boolean)headOpState.getManagedKeyedState().isEmpty());
            Assert.assertTrue((boolean)headOpState.getRawKeyedState().isEmpty());
            int operatorIndexInChain = 2;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            Assert.assertTrue((boolean)opState.getManagedOperatorState().isEmpty());
            Assert.assertTrue((boolean)opState.getRawOperatorState().isEmpty());
            operatorIndexInChain = 1;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            OperatorStateHandle expectedManagedOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, false);
            OperatorStateHandle expectedRawOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, true);
            StateObjectCollection managedOperatorState = opState.getManagedOperatorState();
            Assert.assertEquals((long)1L, (long)managedOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream()));
            StateObjectCollection rawOperatorState = opState.getRawOperatorState();
            Assert.assertEquals((long)1L, (long)rawOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream()));
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            expectedManagedOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, false);
            expectedRawOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, true);
            managedOperatorState = opState.getManagedOperatorState();
            Assert.assertEquals((long)1L, (long)managedOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream()));
            rawOperatorState = opState.getRawOperatorState();
            Assert.assertEquals((long)1L, (long)rawOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream()));
        }
        ArrayList<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIDs = newJobVertex2.getOperatorIDs();
            JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)2L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            int operatorIndexInChain = 1;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            ArrayList<StateObjectCollection> actualSubManagedOperatorState = new ArrayList<StateObjectCollection>(1);
            actualSubManagedOperatorState.add(opState.getManagedOperatorState());
            ArrayList<StateObjectCollection> actualSubRawOperatorState = new ArrayList<StateObjectCollection>(1);
            actualSubRawOperatorState.add(opState.getRawOperatorState());
            actualManagedOperatorStates.add(actualSubManagedOperatorState);
            actualRawOperatorStates.add(actualSubRawOperatorState);
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            Assert.assertTrue((boolean)opState.getManagedOperatorState().isEmpty());
            Assert.assertTrue((boolean)opState.getRawOperatorState().isEmpty());
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            StateObjectCollection keyedStateBackend = headOpState.getManagedKeyedState();
            StateObjectCollection keyGroupStateRaw = headOpState.getRawKeyedState();
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), (Collection<? extends KeyedStateHandle>)keyedStateBackend);
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), (Collection<? extends KeyedStateHandle>)keyGroupStateRaw);
        }
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List)expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List)expectedRawOperatorStates.get(0), actualRawOperatorStates);
    }

    @Test
    public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        int parallelism1 = 3;
        int maxParallelism1 = 42;
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, parallelism1, maxParallelism1).build();
        ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(graph).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointIdOfIgnoredInFlightData(1L).build()).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        coord.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)1L, (long)coord.getPendingCheckpoints().size());
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        Random random = new Random();
        for (int index = 0; index < jobVertex.getParallelism(); ++index) {
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, index, 2, 8, false)).setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, index, 2, 8, true)).setManagedKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, (KeyGroupRange)keyGroupPartitions1.get(index), false)).setRawKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, (KeyGroupRange)keyGroupPartitions1.get(index), true)).setInputChannelState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewInputChannelStateHandle(3, random))).setResultSubpartitionState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewResultSubpartitionStateHandle(3, random))).build();
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        Assert.assertEquals((long)1L, (long)coord.getSuccessfulCheckpoints().size());
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(jobVertex);
        Assert.assertTrue((boolean)coord.restoreLatestCheckpointedStateToAll(tasks, false));
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID, jobVertex, keyGroupPartitions1);
        for (int i = 0; i < jobVertex.getParallelism(); ++i) {
            JobManagerTaskRestore taskRestore = jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)1L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState operatorState = stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID));
            Assert.assertTrue((boolean)operatorState.getInputChannelState().isEmpty());
            Assert.assertTrue((boolean)operatorState.getResultSubpartitionState().isEmpty());
            Assert.assertFalse((boolean)operatorState.getRawOperatorState().isEmpty());
            Assert.assertFalse((boolean)operatorState.getManagedOperatorState().isEmpty());
            Assert.assertFalse((boolean)operatorState.getRawKeyedState().isEmpty());
            Assert.assertFalse((boolean)operatorState.getManagedOperatorState().isEmpty());
        }
    }

    @Test
    public void testRestoreFinishedStateWithoutInFlightData() throws Exception {
        OperatorIDPair op1 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, 1, 1, Collections.singletonList(op1), true).build();
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        HashMap<OperatorID, FullyFinishedOperatorState> operatorStates = new HashMap<OperatorID, FullyFinishedOperatorState>();
        operatorStates.put(op1.getGeneratedOperatorID(), new FullyFinishedOperatorState(op1.getGeneratedOperatorID(), 1, 1));
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(graph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation());
        completedCheckpointStore.addCheckpoint(completedCheckpoint, new CheckpointsCleaner(), () -> {});
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(graph).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointIdOfIgnoredInFlightData(2L).build()).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).build();
        ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
        TaskStateSnapshot restoredState = vertex.getTaskVertices()[0].getCurrentExecutionAttempt().getTaskRestore().getTaskStateSnapshot();
        Assert.assertTrue((boolean)restoredState.isFinishedOnRestore());
    }

    @Test
    public void testRestoringPartiallyFinishedChainsFailsWithoutUidHash() throws Exception {
        this.testRestoringPartiallyFinishedChainsFails(false);
    }

    @Test
    public void testRestoringPartiallyFinishedChainsFailsWithUidHash() throws Exception {
        this.testRestoringPartiallyFinishedChainsFails(true);
    }

    private void testRestoringPartiallyFinishedChainsFails(boolean useUidHash) throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        OperatorIDPair op1 = OperatorIDPair.of((OperatorID)new OperatorID(), (OperatorID)new OperatorID());
        OperatorIDPair op2 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        OperatorIDPair op3 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID2, 1, 1, Collections.singletonList(op3), true).addJobVertex(jobVertexID1, 1, 1, Arrays.asList(op1, op2), true).build();
        HashMap<OperatorID, Object> operatorStates = new HashMap<OperatorID, Object>();
        operatorStates.put(useUidHash ? (OperatorID)op1.getUserDefinedOperatorID().get() : op1.getGeneratedOperatorID(), new FullyFinishedOperatorState(op1.getGeneratedOperatorID(), 1, 1));
        operatorStates.put(op2.getGeneratedOperatorID(), new OperatorState(op2.getGeneratedOperatorID(), 1, 1));
        EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        store.addCheckpoint(new CompletedCheckpoint(graph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation()), new CheckpointsCleaner(), () -> {});
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(graph).setCompletedCheckpointStore((CompletedCheckpointStore)store).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        HashSet<ExecutionJobVertex> vertices = new HashSet<ExecutionJobVertex>();
        vertices.add(graph.getJobVertex(jobVertexID1));
        this.thrown.expect(FlinkRuntimeException.class);
        this.thrown.expectMessage("Can not restore vertex anon(" + jobVertexID1 + ") which contain mixed operator finished state: [ALL_RUNNING, FULLY_FINISHED]");
        coord.restoreInitialCheckpointIfPresent(vertices);
    }

    @Test
    public void testAddingRunningOperatorBeforeFinishedOneFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex vert2(" + jobVertexID2 + ") has a predecessor not fully finished");
    }

    @Test
    public void testAddingPartiallyFinishedOperatorBeforeFinishedOneFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex vert2(" + jobVertexID2 + ") has a predecessor not fully finished");
    }

    @Test
    public void testAddingAllRunningOperatorBeforePartiallyFinishedOneWithAllToAllFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID2 + ") has a all running predecessor");
    }

    @Test
    public void testAddingPartiallyFinishedOperatorBeforePartiallyFinishedOneWithAllToAllFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID2 + ") has a partially finished predecessor");
    }

    @Test
    public void testAddingPartiallyFinishedOperatorBeforePartiallyFinishedOneWithPointwiseAndAllToAllFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.POINTWISE, DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID2 + ") has a partially finished predecessor");
    }

    @Test
    public void testAddingAllRunningOperatorBeforePartiallyFinishedOneFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.POINTWISE}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with all running ones. Task vertex vert2(" + jobVertexID2 + ") has a all running predecessor");
    }

    private void testAddingOperatorsBeforePartiallyOrFullyFinishedOne(JobVertexID firstVertexId, String firstVertexName, VertexFinishedStateChecker.VertexFinishedState firstOperatorFinishedState, JobVertexID secondVertexId, String secondVertexName, VertexFinishedStateChecker.VertexFinishedState secondOperatorFinishedState, DistributionPattern[] distributionPatterns, Class<? extends Throwable> expectedExceptionalClass, String expectedMessage) throws Exception {
        OperatorIDPair op1 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        OperatorIDPair op2 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        JobVertex vertex1 = new JobVertex(firstVertexName, firstVertexId, Collections.singletonList(op1));
        JobVertex vertex2 = new JobVertex(secondVertexName, secondVertexId, Collections.singletonList(op2));
        vertex1.setInvokableClass(NoOpInvokable.class);
        vertex2.setInvokableClass(NoOpInvokable.class);
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(vertex1, true).addJobVertex(vertex2, false).setDistributionPattern(distributionPatterns[0]).build();
        for (int i = 1; i < distributionPatterns.length; ++i) {
            vertex2.connectNewDataSetAsInput(vertex1, distributionPatterns[i], ResultPartitionType.PIPELINED);
        }
        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<OperatorID, OperatorState>();
        operatorStates.put(op1.getGeneratedOperatorID(), this.createOperatorState(op1.getGeneratedOperatorID(), firstOperatorFinishedState));
        operatorStates.put(op2.getGeneratedOperatorID(), this.createOperatorState(op2.getGeneratedOperatorID(), secondOperatorFinishedState));
        EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        store.addCheckpoint(new CompletedCheckpoint(graph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation()), new CheckpointsCleaner(), () -> {});
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(graph).setCompletedCheckpointStore((CompletedCheckpointStore)store).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        HashSet<ExecutionJobVertex> vertices = new HashSet<ExecutionJobVertex>();
        vertices.add(graph.getJobVertex(vertex1.getID()));
        vertices.add(graph.getJobVertex(vertex2.getID()));
        this.thrown.expect(expectedExceptionalClass);
        this.thrown.expectMessage(expectedMessage);
        coord.restoreInitialCheckpointIfPresent(vertices);
    }

    private OperatorState createOperatorState(OperatorID operatorId, VertexFinishedStateChecker.VertexFinishedState finishedState) {
        switch (finishedState) {
            case ALL_RUNNING: {
                return new OperatorState(operatorId, 2, 2);
            }
            case PARTIALLY_FINISHED: {
                OperatorState operatorState = new OperatorState(operatorId, 2, 2);
                operatorState.putState(0, (OperatorSubtaskState)FinishedOperatorSubtaskState.INSTANCE);
                return operatorState;
            }
            case FULLY_FINISHED: {
                return new FullyFinishedOperatorState(operatorId, 2, 2);
            }
        }
        throw new UnsupportedOperationException("Not supported finished state: " + finishedState);
    }

    private static class TestingVertex {
        private final JobVertexID id;
        private final int parallelism;
        private final int maxParallelism;

        private TestingVertex(JobVertexID id, int parallelism, int maxParallelism) {
            this.id = id;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }

        public JobVertexID getId() {
            return this.id;
        }

        public int getParallelism() {
            return this.parallelism;
        }

        public int getMaxParallelism() {
            return this.maxParallelism;
        }
    }

    private static enum TestScaleType {
        INCREASE_PARALLELISM,
        DECREASE_PARALLELISM,
        SAME_PARALLELISM;

    }
}

