package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.SerializableObject;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.class */
public class CheckpointCoordinatorRestoringTest extends TestLogger {
    /** Location string passed as the second argument of every {@code receiveAcknowledgeMessage} call in these tests. */
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";

    /**
     * Executor handed to each coordinator via {@code setTimer}; it is re-created in {@link #setUp()}
     * and the tests call {@code triggerAll()} on it after requesting a checkpoint or savepoint.
     */
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;

    /** Temporary folder rule; a fresh sub-folder of it is used as the savepoint target directory. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Selects how the parallelism of the rescaled vertex changes in
     * {@code testStateRecoveryWithTopologyChange}: the state is written with parallelism 10 and the
     * new vertex is built with 20, 8 or 10 subtasks respectively.
     *
     * <p>NOTE(review): decompiler output — the decompiler reported that the access modifier was
     * changed from {@code private}.
     */
    public enum TestScaleType {
        /** The new vertex gets a higher parallelism (20) than the stored state (10). */
        INCREASE_PARALLELISM,
        /** The new vertex gets a lower parallelism (8) than the stored state (10). */
        DECREASE_PARALLELISM,
        /** The new vertex keeps the parallelism of the stored state (10). */
        SAME_PARALLELISM
    }

    /** Installs a fresh manually triggered executor before each test method runs. */
    @Before
    public void setUp() throws Exception {
        manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Completes a single checkpoint over two job vertices (parallelism 3 and 2), shuts the
     * completed-checkpoint store down with {@code SUSPENDED}, and then restores the latest
     * checkpoint into the same vertices.
     *
     * <p>Afterwards every subtask state of the completed checkpoint must have seen exactly two
     * {@code registerSharedStates} calls, and both vertices must pass {@code verifyStateRestore}.
     */
    @Test
    public void testRestoreLatestCheckpointedState() throws Exception {
        final int firstParallelism = 3;
        final int firstMaxParallelism = 42;
        final int secondParallelism = 2;
        final int secondMaxParallelism = 13;

        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId, firstParallelism, firstMaxParallelism)
                        .addJobVertex(secondVertexId, secondParallelism, secondMaxParallelism)
                        .build();
        final ExecutionJobVertex firstVertex = graph.getJobVertex(firstVertexId);
        final ExecutionJobVertex secondVertex = graph.getJobVertex(secondVertexId);

        final CompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(graph)
                        .setCompletedCheckpointStore(store)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();

        // Trigger one checkpoint and let the manual executor run the queued work.
        coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(1L, coordinator.getPendingCheckpoints().size());
        final long checkpointId =
                Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> firstKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(firstMaxParallelism, firstParallelism);
        final List<KeyGroupRange> secondKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(secondMaxParallelism, secondParallelism);

        // Acknowledge the checkpoint from every subtask of the first vertex ...
        for (int subtask = 0; subtask < firstVertex.getParallelism(); subtask++) {
            final TaskStateSnapshot mockedState =
                    CheckpointCoordinatorTestingUtils.mockSubtaskState(
                            firstVertexId, subtask, firstKeyGroups.get(subtask));
            final AcknowledgeCheckpoint ack =
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            firstVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            mockedState);
            coordinator.receiveAcknowledgeMessage(ack, TASK_MANAGER_LOCATION_INFO);
        }
        // ... and of the second vertex.
        for (int subtask = 0; subtask < secondVertex.getParallelism(); subtask++) {
            final TaskStateSnapshot mockedState =
                    CheckpointCoordinatorTestingUtils.mockSubtaskState(
                            secondVertexId, subtask, secondKeyGroups.get(subtask));
            final AcknowledgeCheckpoint ack =
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            secondVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            mockedState);
            coordinator.receiveAcknowledgeMessage(ack, TASK_MANAGER_LOCATION_INFO);
        }

        final List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
        Assert.assertEquals(1L, completedCheckpoints.size());

        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());

        final HashSet<ExecutionJobVertex> verticesToRestore = new HashSet<>();
        verticesToRestore.add(firstVertex);
        verticesToRestore.add(secondVertex);
        Assert.assertTrue(coordinator.restoreLatestCheckpointedStateToAll(verticesToRestore, false));

        // Exactly two registrations are expected per subtask state
        // (presumably one on completion and one on restore — TODO confirm).
        for (CompletedCheckpoint checkpoint : completedCheckpoints) {
            for (OperatorState operatorState : checkpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    Mockito.verify(subtaskState, Mockito.times(2))
                            .registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class));
                }
            }
        }

        CheckpointCoordinatorTestingUtils.verifyStateRestore(firstVertexId, firstVertex, firstKeyGroups);
        CheckpointCoordinatorTestingUtils.verifyStateRestore(secondVertexId, secondVertex, secondKeyGroups);
    }

    /** Restores a checkpoint after the second vertex was scaled in (parallelism 13 to 2). */
    @Test
    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        final boolean scaleOut = false;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /** Restores a checkpoint after the second vertex was scaled out (parallelism 2 to 13). */
    @Test
    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        final boolean scaleOut = true;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /** Runs the restore scenario with {@code preferCheckpointForRecovery} switched on. */
    @Test
    public void testRestoreLatestCheckpointWhenPreferCheckpoint() throws Exception {
        final boolean preferCheckpoint = true;
        testRestoreLatestCheckpointIsPreferSavepoint(preferCheckpoint);
    }

    /** Runs the restore scenario with {@code preferCheckpointForRecovery} switched off. */
    @Test
    public void testRestoreLatestCheckpointWhenPreferSavepoint() throws Exception {
        final boolean preferCheckpoint = false;
        testRestoreLatestCheckpointIsPreferSavepoint(preferCheckpoint);
    }

    /**
     * Completes one regular checkpoint followed by one savepoint and then restores the latest
     * state, checking which of the two snapshots the stateful vertex receives.
     *
     * <p>Any exception now propagates to the JUnit runner instead of being printed and converted
     * into {@code Assert.fail(e.getMessage())}, so the original stack trace and cause are kept in
     * the test report (and a {@code null} exception message no longer yields an empty failure).
     *
     * @param z value passed to {@code setPreferCheckpointForRecovery}; when {@code true} the
     *     snapshot acknowledged for the checkpoint is expected after the restore, otherwise the
     *     snapshot acknowledged for the savepoint
     * @throws Exception if building, triggering, acknowledging or restoring fails
     */
    private void testRestoreLatestCheckpointIsPreferSavepoint(boolean z) throws Exception {
        final boolean preferCheckpoint = z;

        // Concrete type on purpose: getLast() is invoked on the counter below
        // (presumably it is not part of the CheckpointIDCounter interface — TODO confirm).
        final StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();

        final JobVertexID statefulId = new JobVertexID();
        final JobVertexID statelessId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(statefulId)
                        .addJobVertex(statelessId)
                        .build();
        final ExecutionJobVertex statefulVertex = graph.getJobVertex(statefulId);
        final ExecutionJobVertex statelessVertex = graph.getJobVertex(statelessId);
        final Execution statefulAttempt =
                statefulVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        final Execution statelessAttempt =
                statelessVertex.getTaskVertices()[0].getCurrentExecutionAttempt();

        final HashSet<ExecutionJobVertex> verticesToRestore = new HashSet<>();
        verticesToRestore.add(statefulVertex);
        verticesToRestore.add(statelessVertex);

        final CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setPreferCheckpointForRecovery(preferCheckpoint)
                        .build();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(graph)
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setCheckpointIDCounter(idCounter)
                        .setCompletedCheckpointStore(new EmbeddedCompletedCheckpointStore(2))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();

        // --- regular checkpoint ---
        final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(checkpointFuture.isCompletedExceptionally());
        final long checkpointId = idCounter.getLast();

        final KeyGroupsStateHandle checkpointKeyedState =
                CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                        KeyGroupRange.of(0, 0), Collections.singletonList(new SerializableObject()));
        final TaskStateSnapshot checkpointSnapshot = new TaskStateSnapshot();
        checkpointSnapshot.putSubtaskStateByOperatorID(
                OperatorID.fromJobVertexID(statefulId),
                OperatorSubtaskState.builder().setManagedKeyedState(checkpointKeyedState).build());

        // The stateful vertex acknowledges with state, the stateless one without.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        statefulAttempt.getAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        checkpointSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), statelessAttempt.getAttemptId(), checkpointId),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(
                graph.getJobID(), coordinator.getSuccessfulCheckpoints().get(0).getJobId());

        // --- savepoint ---
        final CompletableFuture<CompletedCheckpoint> savepointFuture =
                coordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
        final KeyGroupsStateHandle savepointKeyedState =
                CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                        KeyGroupRange.of(1, 1), Collections.singletonList(new SerializableObject()));
        final TaskStateSnapshot savepointSnapshot = new TaskStateSnapshot();
        savepointSnapshot.putSubtaskStateByOperatorID(
                OperatorID.fromJobVertexID(statefulId),
                OperatorSubtaskState.builder()
                        .setManagedOperatorState(StateObjectCollection.empty())
                        .setRawOperatorState(StateObjectCollection.empty())
                        .setManagedKeyedState(StateObjectCollection.singleton(savepointKeyedState))
                        .setRawKeyedState(StateObjectCollection.empty())
                        .build());
        manuallyTriggeredScheduledExecutor.triggerAll();
        final long savepointId = idCounter.getLast();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        statefulAttempt.getAttemptId(),
                        savepointId,
                        new CheckpointMetrics(),
                        savepointSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), statelessAttempt.getAttemptId(), savepointId),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertNotNull(savepointFuture.get());

        // --- restore and verify ---
        Assert.assertTrue(coordinator.restoreLatestCheckpointedStateToAll(verticesToRestore, false));
        Assert.assertNotNull(
                "Stateful vertex should get state to restore", statefulAttempt.getTaskRestore());
        final TaskStateSnapshot expectedSnapshot =
                preferCheckpoint ? checkpointSnapshot : savepointSnapshot;
        Assert.assertEquals(expectedSnapshot, statefulAttempt.getTaskRestore().getTaskStateSnapshot());
        Assert.assertNull(
                "Stateless vertex should not get state to restore", statelessAttempt.getTaskRestore());

        coordinator.shutdown();
    }

    /**
     * Takes a checkpoint on a graph whose second vertex runs with one parallelism, then restores
     * it into a second graph (sharing the same completed-checkpoint store) whose second vertex
     * runs with a different parallelism, and compares the redistributed state with freshly
     * generated expectations.
     *
     * @param z {@code true} rescales the second vertex from 2 to 13 subtasks, {@code false} from
     *     13 to 2; the first vertex keeps parallelism 3 in both graphs
     * @throws Exception if checkpointing, restoring or comparing state fails
     */
    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean z) throws Exception {
        final int stableParallelism = 3;
        final int stableMaxParallelism = 42;
        final int rescaledMaxParallelism = 13;
        final int parallelismBefore;
        final int parallelismAfter;
        if (z) {
            parallelismBefore = 2;
            parallelismAfter = 13;
        } else {
            parallelismBefore = 13;
            parallelismAfter = 2;
        }

        final JobVertexID stableId = new JobVertexID();
        final JobVertexID rescaledId = new JobVertexID();

        // The store is shared by both coordinators so the second one can see the checkpoint.
        final CompletedCheckpointStore sharedStore = new EmbeddedCompletedCheckpointStore();
        final ExecutionGraph oldGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(stableId, stableParallelism, stableMaxParallelism)
                        .addJobVertex(rescaledId, parallelismBefore, rescaledMaxParallelism)
                        .build();
        final ExecutionJobVertex oldStableVertex = oldGraph.getJobVertex(stableId);
        final ExecutionJobVertex oldRescaledVertex = oldGraph.getJobVertex(rescaledId);
        final CheckpointCoordinator oldCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(oldGraph)
                        .setCompletedCheckpointStore(sharedStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();

        oldCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(1L, oldCoordinator.getPendingCheckpoints().size());
        final long checkpointId =
                Iterables.getOnlyElement(oldCoordinator.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> stableKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(stableMaxParallelism, stableParallelism);
        final List<KeyGroupRange> oldRescaledKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(rescaledMaxParallelism, parallelismBefore);

        // Acknowledge from the vertex whose parallelism does not change.
        for (int subtask = 0; subtask < oldStableVertex.getParallelism(); subtask++) {
            final KeyGroupRange range = stableKeyGroups.get(subtask);
            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedOperatorState(
                                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(
                                            stableId, subtask, 2, 8, false))
                            .setManagedKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(stableId, range, false))
                            .setRawKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(stableId, range, true))
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(stableId), subtaskState);
            oldCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            oldGraph.getJobID(),
                            oldStableVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }

        // Acknowledge from the vertex that gets rescaled, remembering the operator state handles
        // that were handed in so they can be compared after the restore.
        final List<ChainedStateHandle<OperatorStateHandle>> expectedManagedOperatorState =
                new ArrayList<>(oldRescaledVertex.getParallelism());
        final List<ChainedStateHandle<OperatorStateHandle>> expectedRawOperatorState =
                new ArrayList<>(oldRescaledVertex.getParallelism());
        for (int subtask = 0; subtask < oldRescaledVertex.getParallelism(); subtask++) {
            final KeyGroupRange range = oldRescaledKeyGroups.get(subtask);
            final KeyGroupsStateHandle managedKeyed =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(rescaledId, range, false);
            final KeyGroupsStateHandle rawKeyed =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(rescaledId, range, true);
            final OperatorStateHandle managedOperator =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(
                            rescaledId, subtask, 2, 8, false);
            final OperatorStateHandle rawOperator =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(
                            rescaledId, subtask, 2, 8, true);
            expectedManagedOperatorState.add(
                    new ChainedStateHandle<>(Collections.singletonList(managedOperator)));
            expectedRawOperatorState.add(
                    new ChainedStateHandle<>(Collections.singletonList(rawOperator)));

            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedOperatorState(managedOperator)
                            .setRawOperatorState(rawOperator)
                            .setManagedKeyedState(managedKeyed)
                            .setRawKeyedState(rawKeyed)
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(rescaledId), subtaskState);
            oldCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            oldGraph.getJobID(),
                            oldRescaledVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        Assert.assertEquals(1L, oldCoordinator.getSuccessfulCheckpoints().size());

        // Build the rescaled graph and restore the checkpoint into it.
        final List<KeyGroupRange> newRescaledKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(rescaledMaxParallelism, parallelismAfter);
        final ExecutionGraph newGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(stableId, stableParallelism, stableMaxParallelism)
                        .addJobVertex(rescaledId, parallelismAfter, rescaledMaxParallelism)
                        .build();
        final ExecutionJobVertex newStableVertex = newGraph.getJobVertex(stableId);
        final ExecutionJobVertex newRescaledVertex = newGraph.getJobVertex(rescaledId);
        final CheckpointCoordinator newCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(newGraph)
                        .setCompletedCheckpointStore(sharedStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();
        final HashSet<ExecutionJobVertex> verticesToRestore = new HashSet<>();
        verticesToRestore.add(newStableVertex);
        verticesToRestore.add(newRescaledVertex);
        Assert.assertTrue(newCoordinator.restoreLatestCheckpointedStateToAll(verticesToRestore, false));

        CheckpointCoordinatorTestingUtils.verifyStateRestore(stableId, newStableVertex, stableKeyGroups);

        // Collect the restored operator state per subtask; these lists are left raw, as in the
        // original, and are only handed to comparePartitionableState.
        final ArrayList actualManagedOperatorState = new ArrayList(newRescaledVertex.getParallelism());
        final ArrayList actualRawOperatorState = new ArrayList(newRescaledVertex.getParallelism());
        for (int subtask = 0; subtask < newRescaledVertex.getParallelism(); subtask++) {
            final List<OperatorIDPair> operatorIds = newRescaledVertex.getOperatorIDs();
            final KeyGroupRange range = newRescaledKeyGroups.get(subtask);
            final KeyGroupsStateHandle expectedManagedKeyed =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(rescaledId, range, false);
            final KeyGroupsStateHandle expectedRawKeyed =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(rescaledId, range, true);

            final JobManagerTaskRestore restore =
                    newRescaledVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(1L, restore.getRestoreCheckpointId());
            final TaskStateSnapshot restoredSnapshot = restore.getTaskStateSnapshot();

            final int lastOperatorIndex = operatorIds.size() - 1;
            final ArrayList managedForSubtask = new ArrayList(operatorIds.size());
            final ArrayList rawForSubtask = new ArrayList(operatorIds.size());
            for (int index = 0; index < operatorIds.size(); index++) {
                final OperatorSubtaskState restoredState =
                        restoredSnapshot.getSubtaskStateByOperatorID(
                                operatorIds.get(index).getGeneratedOperatorID());
                managedForSubtask.add(restoredState.getManagedOperatorState());
                rawForSubtask.add(restoredState.getRawOperatorState());
                // Keyed state is only compared for the last operator in the list.
                if (index == lastOperatorIndex) {
                    CheckpointCoordinatorTestingUtils.compareKeyedState(
                            Collections.singletonList(expectedManagedKeyed),
                            restoredState.getManagedKeyedState());
                    CheckpointCoordinatorTestingUtils.compareKeyedState(
                            Collections.singletonList(expectedRawKeyed),
                            restoredState.getRawKeyedState());
                }
            }
            actualManagedOperatorState.add(managedForSubtask);
            actualRawOperatorState.add(rawForSubtask);
        }

        CheckpointCoordinatorTestingUtils.comparePartitionableState(
                expectedManagedOperatorState, actualManagedOperatorState);
        CheckpointCoordinatorTestingUtils.comparePartitionableState(
                expectedRawOperatorState, actualRawOperatorState);
    }

    /**
     * Takes a checkpoint with max parallelism 42 and 13 for the two vertices and then tries to
     * restore it into a graph that declares max parallelism 20 and 42 instead. The test passes
     * only if an {@link IllegalStateException} is thrown; reaching the final {@code fail} call
     * means the restore was wrongly accepted.
     */
    @Test(expected = IllegalStateException.class)
    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        final int firstParallelism = 3;
        final int secondParallelism = 2;
        final int firstMaxParallelism = 42;
        final int secondMaxParallelism = 13;
        final int changedFirstMaxParallelism = 20;
        final int changedSecondMaxParallelism = 42;

        final JobVertexID firstId = new JobVertexID();
        final JobVertexID secondId = new JobVertexID();
        final CompletedCheckpointStore sharedStore = new EmbeddedCompletedCheckpointStore();
        final ExecutionGraph oldGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstId, firstParallelism, firstMaxParallelism)
                        .addJobVertex(secondId, secondParallelism, secondMaxParallelism)
                        .build();
        final ExecutionJobVertex oldFirstVertex = oldGraph.getJobVertex(firstId);
        final ExecutionJobVertex oldSecondVertex = oldGraph.getJobVertex(secondId);
        final CheckpointCoordinator oldCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(oldGraph)
                        .setCompletedCheckpointStore(sharedStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();

        oldCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(1L, oldCoordinator.getPendingCheckpoints().size());
        final long checkpointId =
                Iterables.getOnlyElement(oldCoordinator.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> firstKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(firstMaxParallelism, firstParallelism);
        final List<KeyGroupRange> secondKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(secondMaxParallelism, secondParallelism);

        // Every subtask acknowledges with managed keyed state for its key-group range.
        for (int subtask = 0; subtask < oldFirstVertex.getParallelism(); subtask++) {
            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                                            firstId, firstKeyGroups.get(subtask), false))
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(firstId), subtaskState);
            oldCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            oldGraph.getJobID(),
                            oldFirstVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        for (int subtask = 0; subtask < oldSecondVertex.getParallelism(); subtask++) {
            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                                            secondId, secondKeyGroups.get(subtask), false))
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(secondId), subtaskState);
            oldCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            oldGraph.getJobID(),
                            oldSecondVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        Assert.assertEquals(1L, oldCoordinator.getSuccessfulCheckpoints().size());

        // Same vertices and parallelism, but different max parallelism values.
        final ExecutionGraph newGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstId, firstParallelism, changedFirstMaxParallelism)
                        .addJobVertex(secondId, secondParallelism, changedSecondMaxParallelism)
                        .build();
        final ExecutionJobVertex newFirstVertex = newGraph.getJobVertex(firstId);
        final ExecutionJobVertex newSecondVertex = newGraph.getJobVertex(secondId);
        final CheckpointCoordinator newCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(newGraph)
                        .setCompletedCheckpointStore(sharedStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();
        final HashSet<ExecutionJobVertex> verticesToRestore = new HashSet<>();
        verticesToRestore.add(newFirstVertex);
        verticesToRestore.add(newSecondVertex);

        Assert.assertTrue(newCoordinator.restoreLatestCheckpointedStateToAll(verticesToRestore, false));
        Assert.fail("The restoration should have failed because the max parallelism changed.");
    }

    /** Runs the topology-change recovery scenario with an increased parallelism. */
    @Test
    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        final TestScaleType scaleType = TestScaleType.INCREASE_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /** Runs the topology-change recovery scenario with a decreased parallelism. */
    @Test
    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        final TestScaleType scaleType = TestScaleType.DECREASE_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /** Runs the topology-change recovery scenario with an unchanged parallelism. */
    @Test
    public void testStateRecoveryWhenTopologyChange() throws Exception {
        final TestScaleType scaleType = TestScaleType.SAME_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /**
     * Creates a new {@link JobVertexID} and pairs it with the {@link OperatorID} obtained from
     * {@code OperatorID.fromJobVertexID} for that same vertex id.
     *
     * @return tuple of (job vertex id, operator id derived from it)
     */
    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        final JobVertexID vertexId = new JobVertexID();
        final OperatorID operatorId = OperatorID.fromJobVertexID(vertexId);
        return new Tuple2<>(vertexId, operatorId);
    }

    public void testStateRecoveryWithTopologyChange(TestScaleType testScaleType) throws Exception {
        Tuple2<JobVertexID, OperatorID> generateIDPair = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair2 = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair3 = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair4 = generateIDPair();
        List createKeyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(64, 10);
        HashMap hashMap = new HashMap();
        for (Tuple2 tuple2 : Arrays.asList(generateIDPair, generateIDPair2)) {
            OperatorState operatorState = new OperatorState((OperatorID) tuple2.f1, 10, 64);
            hashMap.put(tuple2.f1, operatorState);
            for (int i = 0; i < operatorState.getParallelism(); i++) {
                operatorState.putState(i, OperatorSubtaskState.builder().setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) tuple2.f0, i, 2, 8, false)).setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) tuple2.f0, i, 2, 8, true)).build());
            }
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (Tuple2 tuple22 : Arrays.asList(generateIDPair3, generateIDPair4)) {
            OperatorState operatorState2 = new OperatorState((OperatorID) tuple22.f1, 10, 64);
            hashMap.put(tuple22.f1, operatorState2);
            ArrayList arrayList3 = new ArrayList();
            ArrayList arrayList4 = new ArrayList();
            arrayList.add(arrayList3);
            arrayList2.add(arrayList4);
            for (int i2 = 0; i2 < operatorState2.getParallelism(); i2++) {
                OperatorSubtaskState.Builder builder = OperatorSubtaskState.builder();
                OperatorStateHandle operatorStateHandle = CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID) tuple22.f0, i2, 2, 8, false).get(0);
                OperatorStateHandle operatorStateHandle2 = CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID) tuple22.f0, i2, 2, 8, true).get(0);
                if (((JobVertexID) tuple22.f0).equals(generateIDPair3.f0)) {
                    builder.setManagedKeyedState(CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) tuple22.f0, (KeyGroupRange) createKeyGroupPartitions.get(i2), false));
                }
                if (((JobVertexID) tuple22.f0).equals(generateIDPair3.f0)) {
                    builder.setRawKeyedState(CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) tuple22.f0, (KeyGroupRange) createKeyGroupPartitions.get(i2), true));
                }
                arrayList3.add(ChainedStateHandle.wrapSingleHandle(operatorStateHandle));
                arrayList4.add(ChainedStateHandle.wrapSingleHandle(operatorStateHandle2));
                operatorState2.putState(i2, builder.setManagedOperatorState(operatorStateHandle).setRawOperatorState(operatorStateHandle2).build());
            }
        }
        Tuple2<JobVertexID, OperatorID> generateIDPair5 = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair6 = generateIDPair();
        int i3 = 10;
        if (testScaleType == TestScaleType.INCREASE_PARALLELISM) {
            i3 = 20;
        } else if (testScaleType == TestScaleType.DECREASE_PARALLELISM) {
            i3 = 8;
        }
        List createKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions(64, i3);
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex((JobVertexID) generateIDPair5.f0, 10, 64, (List) Stream.of((Object[]) new OperatorID[]{(OperatorID) generateIDPair2.f1, (OperatorID) generateIDPair.f1, (OperatorID) generateIDPair5.f1}).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).addJobVertex((JobVertexID) generateIDPair3.f0, i3, 64, (List) Stream.of((Object[]) new OperatorID[]{(OperatorID) generateIDPair6.f1, (OperatorID) generateIDPair3.f1}).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).build();
        ExecutionJobVertex jobVertex = build.getJobVertex((JobVertexID) generateIDPair5.f0);
        ExecutionJobVertex jobVertex2 = build.getJobVertex((JobVertexID) generateIDPair3.f0);
        HashSet hashSet = new HashSet();
        hashSet.add(jobVertex);
        hashSet.add(jobVertex2);
        new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setExecutionGraph(build).setCompletedCheckpointStore(CompletedCheckpointStore.storeFor(() -> {
        }, new CompletedCheckpoint[]{new CompletedCheckpoint(build.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000, hashMap, Collections.emptyList(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation())})).setTimer(this.manuallyTriggeredScheduledExecutor).build().restoreLatestCheckpointedStateToAll(hashSet, true);
        for (int i4 = 0; i4 < jobVertex.getParallelism(); i4++) {
            List operatorIDs = jobVertex.getOperatorIDs();
            JobManagerTaskRestore taskRestore = jobVertex.getTaskVertices()[i4].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot taskStateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState subtaskStateByOperatorID = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            Assert.assertTrue(subtaskStateByOperatorID.getManagedKeyedState().isEmpty());
            Assert.assertTrue(subtaskStateByOperatorID.getRawKeyedState().isEmpty());
            OperatorSubtaskState subtaskStateByOperatorID2 = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(2)).getGeneratedOperatorID());
            Assert.assertTrue(subtaskStateByOperatorID2.getManagedOperatorState().isEmpty());
            Assert.assertTrue(subtaskStateByOperatorID2.getRawOperatorState().isEmpty());
            OperatorSubtaskState subtaskStateByOperatorID3 = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(1)).getGeneratedOperatorID());
            OperatorStateHandle generatePartitionableStateHandle = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair.f0, i4, 2, 8, false);
            OperatorStateHandle generatePartitionableStateHandle2 = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair.f0, i4, 2, 8, true);
            StateObjectCollection managedOperatorState = subtaskStateByOperatorID3.getManagedOperatorState();
            Assert.assertEquals(1L, managedOperatorState.size());
            Assert.assertTrue(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle.openInputStream(), ((OperatorStateHandle) managedOperatorState.iterator().next()).openInputStream()));
            StateObjectCollection rawOperatorState = subtaskStateByOperatorID3.getRawOperatorState();
            Assert.assertEquals(1L, rawOperatorState.size());
            Assert.assertTrue(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle2.openInputStream(), ((OperatorStateHandle) rawOperatorState.iterator().next()).openInputStream()));
            OperatorSubtaskState subtaskStateByOperatorID4 = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(0)).getGeneratedOperatorID());
            OperatorStateHandle generatePartitionableStateHandle3 = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair2.f0, i4, 2, 8, false);
            OperatorStateHandle generatePartitionableStateHandle4 = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair2.f0, i4, 2, 8, true);
            StateObjectCollection managedOperatorState2 = subtaskStateByOperatorID4.getManagedOperatorState();
            Assert.assertEquals(1L, managedOperatorState2.size());
            Assert.assertTrue(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle3.openInputStream(), ((OperatorStateHandle) managedOperatorState2.iterator().next()).openInputStream()));
            StateObjectCollection rawOperatorState2 = subtaskStateByOperatorID4.getRawOperatorState();
            Assert.assertEquals(1L, rawOperatorState2.size());
            Assert.assertTrue(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle4.openInputStream(), ((OperatorStateHandle) rawOperatorState2.iterator().next()).openInputStream()));
        }
        ArrayList arrayList5 = new ArrayList(jobVertex2.getParallelism());
        ArrayList arrayList6 = new ArrayList(jobVertex2.getParallelism());
        for (int i5 = 0; i5 < jobVertex2.getParallelism(); i5++) {
            List operatorIDs2 = jobVertex2.getOperatorIDs();
            JobManagerTaskRestore taskRestore2 = jobVertex2.getTaskVertices()[i5].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(2L, taskRestore2.getRestoreCheckpointId());
            TaskStateSnapshot taskStateSnapshot2 = taskRestore2.getTaskStateSnapshot();
            OperatorSubtaskState subtaskStateByOperatorID5 = taskStateSnapshot2.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs2.get(1)).getGeneratedOperatorID());
            ArrayList arrayList7 = new ArrayList(1);
            arrayList7.add(subtaskStateByOperatorID5.getManagedOperatorState());
            ArrayList arrayList8 = new ArrayList(1);
            arrayList8.add(subtaskStateByOperatorID5.getRawOperatorState());
            arrayList5.add(arrayList7);
            arrayList6.add(arrayList8);
            OperatorSubtaskState subtaskStateByOperatorID6 = taskStateSnapshot2.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs2.get(0)).getGeneratedOperatorID());
            Assert.assertTrue(subtaskStateByOperatorID6.getManagedOperatorState().isEmpty());
            Assert.assertTrue(subtaskStateByOperatorID6.getRawOperatorState().isEmpty());
            KeyGroupsStateHandle generateKeyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) generateIDPair3.f0, (KeyGroupRange) createKeyGroupPartitions2.get(i5), false);
            KeyGroupsStateHandle generateKeyGroupState2 = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) generateIDPair3.f0, (KeyGroupRange) createKeyGroupPartitions2.get(i5), true);
            OperatorSubtaskState subtaskStateByOperatorID7 = taskStateSnapshot2.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs2.get(operatorIDs2.size() - 1)).getGeneratedOperatorID());
            StateObjectCollection managedKeyedState = subtaskStateByOperatorID7.getManagedKeyedState();
            StateObjectCollection rawKeyedState = subtaskStateByOperatorID7.getRawKeyedState();
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(generateKeyGroupState), managedKeyedState);
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(generateKeyGroupState2), rawKeyedState);
        }
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List) arrayList.get(0), arrayList5);
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List) arrayList2.get(0), arrayList6);
    }
}
