/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.junit.Assert;
import org.mockito.Mockito;

public class CheckpointCoordinatorTestingUtils {
    public static OperatorStateHandle generatePartitionableStateHandle(JobVertexID jobVertexID, int index, int namedStates, int partitionsPerState, boolean rawState) throws IOException {
        HashMap<String, List<? extends Serializable>> statesListsMap = new HashMap<String, List<? extends Serializable>>(namedStates);
        for (int i = 0; i < namedStates; ++i) {
            ArrayList<Integer> testStatesLists = new ArrayList<Integer>(partitionsPerState);
            int seed = jobVertexID.hashCode() * index + i * namedStates;
            if (rawState) {
                seed = (seed + 1) * 31;
            }
            Random random = new Random(seed);
            for (int j = 0; j < partitionsPerState; ++j) {
                int simulatedStateValue = random.nextInt();
                testStatesLists.add(simulatedStateValue);
            }
            statesListsMap.put("state-" + i, testStatesLists);
        }
        return CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(statesListsMap);
    }

    static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(JobVertexID jobVertexID, int index, int namedStates, int partitionsPerState, boolean rawState) throws IOException {
        HashMap<String, List<? extends Serializable>> statesListsMap = new HashMap<String, List<? extends Serializable>>(namedStates);
        for (int i = 0; i < namedStates; ++i) {
            ArrayList<Integer> testStatesLists = new ArrayList<Integer>(partitionsPerState);
            int seed = jobVertexID.hashCode() * index + i * namedStates;
            if (rawState) {
                seed = (seed + 1) * 31;
            }
            Random random = new Random(seed);
            for (int j = 0; j < partitionsPerState; ++j) {
                int simulatedStateValue = random.nextInt();
                testStatesLists.add(simulatedStateValue);
            }
            statesListsMap.put("state-" + i, testStatesLists);
        }
        return ChainedStateHandle.wrapSingleHandle((StateObject)CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(statesListsMap));
    }

    static OperatorStateHandle generatePartitionableStateHandle(Map<String, List<? extends Serializable>> states) throws IOException {
        ArrayList<List<? extends Serializable>> namedStateSerializables = new ArrayList<List<? extends Serializable>>(states.size());
        for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) {
            namedStateSerializables.add(entry.getValue());
        }
        Tuple2<byte[], List<long[]>> serializationWithOffsets = CheckpointCoordinatorTestingUtils.serializeTogetherAndTrackOffsets(namedStateSerializables);
        HashMap<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<String, OperatorStateHandle.StateMetaInfo>(states.size());
        int idx = 0;
        for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) {
            offsetsMap.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo((long[])((List)serializationWithOffsets.f1).get(idx), OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            ++idx;
        }
        return new OperatorStreamStateHandle(offsetsMap, (StreamStateHandle)CheckpointCoordinatorTestingUtils.generateByteStreamStateHandle((byte[])serializationWithOffsets.f0));
    }

    private static ByteStreamStateHandle generateByteStreamStateHandle(byte[] bytes) {
        return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), bytes);
    }

    static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(List<List<? extends Serializable>> serializables) throws IOException {
        ArrayList<long[]> offsets = new ArrayList<long[]>(serializables.size());
        ArrayList<byte[]> serializedGroupValues = new ArrayList<byte[]>();
        int runningGroupsOffset = 0;
        for (List<? extends Serializable> list : serializables) {
            long[] currentOffsets = new long[list.size()];
            offsets.add(currentOffsets);
            for (int i = 0; i < list.size(); ++i) {
                currentOffsets[i] = runningGroupsOffset;
                byte[] serializedValue = InstantiationUtil.serializeObject((Object)list.get(i));
                serializedGroupValues.add(serializedValue);
                runningGroupsOffset += serializedValue.length;
            }
        }
        byte[] allSerializedValuesConcatenated = new byte[runningGroupsOffset];
        runningGroupsOffset = 0;
        for (byte[] serializedGroupValue : serializedGroupValues) {
            System.arraycopy(serializedGroupValue, 0, allSerializedValuesConcatenated, runningGroupsOffset, serializedGroupValue.length);
            runningGroupsOffset += serializedGroupValue.length;
        }
        return new Tuple2((Object)allSerializedValuesConcatenated, offsets);
    }

    public static void verifyStateRestore(ExecutionJobVertex executionJobVertex) throws Exception {
        CheckpointCoordinatorTestingUtils.verifyStateRestore(executionJobVertex.getJobVertexId(), executionJobVertex, StateAssignmentOperation.createKeyGroupPartitions((int)executionJobVertex.getMaxParallelism(), (int)executionJobVertex.getParallelism()));
    }

    public static void verifyStateRestore(JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> keyGroupPartitions) throws Exception {
        for (int i = 0; i < executionJobVertex.getParallelism(); ++i) {
            JobManagerTaskRestore taskRestore = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)1L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState operatorState = stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID));
            ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend = CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)((OperatorStateHandle)expectedOpStateBackend.get(0)).openInputStream(), (InputStream)((OperatorStateHandle)operatorState.getManagedOperatorState().iterator().next()).openInputStream()));
            KeyGroupsStateHandle expectPartitionedKeyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, keyGroupPartitions.get(i), false);
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), (Collection<? extends KeyedStateHandle>)operatorState.getManagedKeyedState());
        }
    }

    static void compareKeyedState(Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState, Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
        KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();
        int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
        int actualTotalKeyGroups = 0;
        for (KeyedStateHandle keyedStateHandle : actualPartitionedKeyGroupState) {
            Assert.assertTrue((boolean)(keyedStateHandle instanceof KeyGroupsStateHandle));
            actualTotalKeyGroups += keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
        }
        Assert.assertEquals((long)expectedTotalKeyGroups, (long)actualTotalKeyGroups);
        Throwable throwable = null;
        try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream();){
            Iterator iterator = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().iterator();
            while (iterator.hasNext()) {
                int groupId = (Integer)iterator.next();
                long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                inputStream.seek(offset);
                int expectedKeyGroupState = (Integer)InstantiationUtil.deserializeObject((InputStream)inputStream, (ClassLoader)Thread.currentThread().getContextClassLoader());
                for (KeyedStateHandle keyedStateHandle : actualPartitionedKeyGroupState) {
                    Assert.assertTrue((boolean)(keyedStateHandle instanceof KeyGroupsStateHandle));
                    KeyGroupsStateHandle oneActualKeyGroupStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
                    if (!oneActualKeyGroupStateHandle.getKeyGroupRange().contains(groupId)) continue;
                    long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                    FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream();
                    Throwable throwable2 = null;
                    try {
                        actualInputStream.seek(actualOffset);
                        int actualGroupState = (Integer)InstantiationUtil.deserializeObject((InputStream)actualInputStream, (ClassLoader)Thread.currentThread().getContextClassLoader());
                        Assert.assertEquals((long)expectedKeyGroupState, (long)actualGroupState);
                    }
                    catch (Throwable throwable3) {
                        throwable2 = throwable3;
                        throw throwable3;
                    }
                    finally {
                        if (actualInputStream == null) continue;
                        if (throwable2 != null) {
                            try {
                                actualInputStream.close();
                            }
                            catch (Throwable throwable4) {
                                throwable2.addSuppressed(throwable4);
                            }
                            continue;
                        }
                        actualInputStream.close();
                    }
                }
            }
        }
        catch (Throwable throwable5) {
            Throwable throwable6 = throwable5;
            throw throwable5;
        }
    }

    static void comparePartitionableState(List<ChainedStateHandle<OperatorStateHandle>> expected, List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
        ArrayList<String> expectedResult = new ArrayList<String>();
        for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle : expected) {
            for (int i = 0; i < chainedStateHandle.getLength(); ++i) {
                OperatorStateHandle operatorStateHandle = (OperatorStateHandle)chainedStateHandle.get(i);
                CheckpointCoordinatorTestingUtils.collectResult(i, operatorStateHandle, expectedResult);
            }
        }
        Collections.sort(expectedResult);
        ArrayList<String> actualResult = new ArrayList<String>();
        for (List<Collection<OperatorStateHandle>> collectionList : actual) {
            if (collectionList == null) continue;
            for (int i = 0; i < collectionList.size(); ++i) {
                Collection<OperatorStateHandle> stateHandles = collectionList.get(i);
                Assert.assertNotNull(stateHandles);
                for (OperatorStateHandle operatorStateHandle : stateHandles) {
                    CheckpointCoordinatorTestingUtils.collectResult(i, operatorStateHandle, actualResult);
                }
            }
        }
        Collections.sort(actualResult);
        Assert.assertEquals(expectedResult, actualResult);
    }

    static void collectResult(int opIdx, OperatorStateHandle operatorStateHandle, List<String> resultCollector) throws Exception {
        try (FSDataInputStream in = operatorStateHandle.openInputStream();){
            for (Map.Entry entry : operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                for (long offset : ((OperatorStateHandle.StateMetaInfo)entry.getValue()).getOffsets()) {
                    in.seek(offset);
                    Integer state = (Integer)InstantiationUtil.deserializeObject((InputStream)in, (ClassLoader)Thread.currentThread().getContextClassLoader());
                    resultCollector.add(opIdx + " : " + (String)entry.getKey() + " : " + state);
                }
            }
        }
    }

    static TaskStateSnapshot mockSubtaskState(JobVertexID jobVertexID, int index, KeyGroupRange keyGroupRange) throws IOException {
        OperatorStateHandle partitionableState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, index, 2, 8, false);
        KeyGroupsStateHandle partitionedKeyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, keyGroupRange, false);
        TaskStateSnapshot subtaskStates = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
        OperatorSubtaskState subtaskState = (OperatorSubtaskState)Mockito.spy((Object)OperatorSubtaskState.builder().setManagedOperatorState(partitionableState).setManagedKeyedState((KeyedStateHandle)partitionedKeyGroupState).build());
        subtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID), subtaskState);
        return subtaskStates;
    }

    public static KeyGroupsStateHandle generateKeyGroupState(JobVertexID jobVertexID, KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
        ArrayList<Integer> testStatesLists = new ArrayList<Integer>(keyGroupPartition.getNumberOfKeyGroups());
        Iterator iterator = keyGroupPartition.iterator();
        while (iterator.hasNext()) {
            int keyGroupIndex = (Integer)iterator.next();
            int vertexHash = jobVertexID.hashCode();
            int seed = rawState ? vertexHash * (31 + keyGroupIndex) : vertexHash + keyGroupIndex;
            Random random = new Random(seed);
            int simulatedStateValue = random.nextInt();
            testStatesLists.add(simulatedStateValue);
        }
        return CheckpointCoordinatorTestingUtils.generateKeyGroupState(keyGroupPartition, testStatesLists);
    }

    public static KeyGroupsStateHandle generateKeyGroupState(KeyGroupRange keyGroupRange, List<? extends Serializable> states) throws IOException {
        Preconditions.checkArgument((keyGroupRange.getNumberOfKeyGroups() == states.size() ? 1 : 0) != 0);
        Tuple2<byte[], List<long[]>> serializedDataWithOffsets = CheckpointCoordinatorTestingUtils.serializeTogetherAndTrackOffsets(Collections.singletonList(states));
        KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, (long[])((List)serializedDataWithOffsets.f1).get(0));
        ByteStreamStateHandle allSerializedStatesHandle = CheckpointCoordinatorTestingUtils.generateByteStreamStateHandle((byte[])serializedDataWithOffsets.f0);
        return new KeyGroupsStateHandle(keyGroupRangeOffsets, (StreamStateHandle)allSerializedStatesHandle);
    }

    public static TaskStateSnapshot createSnapshotWithUnionListState(File stateFile, OperatorID operatorId, boolean isOperatorsFinished) throws IOException {
        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(1, isOperatorsFinished);
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorId, CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(stateFile));
        return taskStateSnapshot;
    }

    public static OperatorSubtaskState createSubtaskStateWithUnionListState(File stateFile) {
        return OperatorSubtaskState.builder().setManagedOperatorState((OperatorStateHandle)new OperatorStreamStateHandle(Collections.singletonMap("test", new OperatorStateHandle.StateMetaInfo(new long[0], OperatorStateHandle.Mode.UNION)), (StreamStateHandle)new FileStateHandle(new Path(stateFile.getAbsolutePath()), 0L))).build();
    }

    public static final class MockOperatorCoordinatorCheckpointContext
    implements OperatorCoordinatorCheckpointContext {
        private final BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator;
        private final Consumer<Long> onCallingAfterSourceBarrierInjection;
        private final OperatorID operatorID;
        private final List<Long> completedCheckpoints;
        private final List<Long> abortedCheckpoints;

        private MockOperatorCoordinatorCheckpointContext(BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator, Consumer<Long> onCallingAfterSourceBarrierInjection, OperatorID operatorID) {
            this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
            this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
            this.operatorID = operatorID;
            this.completedCheckpoints = new ArrayList<Long>();
            this.abortedCheckpoints = new ArrayList<Long>();
        }

        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            if (this.onCallingCheckpointCoordinator != null) {
                this.onCallingCheckpointCoordinator.accept(checkpointId, result);
            }
        }

        public void afterSourceBarrierInjection(long checkpointId) {
            if (this.onCallingAfterSourceBarrierInjection != null) {
                this.onCallingAfterSourceBarrierInjection.accept(checkpointId);
            }
        }

        public void abortCurrentTriggering() {
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.completedCheckpoints.add(checkpointId);
        }

        public void notifyCheckpointAborted(long checkpointId) {
            this.abortedCheckpoints.add(checkpointId);
        }

        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        }

        public void subtaskReset(int subtask, long checkpointId) {
        }

        public OperatorID operatorId() {
            return this.operatorID;
        }

        public int maxParallelism() {
            return 1;
        }

        public int currentParallelism() {
            return 1;
        }

        public List<Long> getCompletedCheckpoints() {
            return this.completedCheckpoints;
        }

        public List<Long> getAbortedCheckpoints() {
            return this.abortedCheckpoints;
        }
    }

    static final class MockOperatorCheckpointCoordinatorContextBuilder {
        private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator = null;
        private Consumer<Long> onCallingAfterSourceBarrierInjection = null;
        private OperatorID operatorID = null;

        MockOperatorCheckpointCoordinatorContextBuilder() {
        }

        public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingCheckpointCoordinator(BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator) {
            this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
            return this;
        }

        public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingAfterSourceBarrierInjection(Consumer<Long> onCallingAfterSourceBarrierInjection) {
            this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
            return this;
        }

        public MockOperatorCheckpointCoordinatorContextBuilder setOperatorID(OperatorID operatorID) {
            this.operatorID = operatorID;
            return this;
        }

        public MockOperatorCoordinatorCheckpointContext build() {
            return new MockOperatorCoordinatorCheckpointContext(this.onCallingCheckpointCoordinator, this.onCallingAfterSourceBarrierInjection, this.operatorID);
        }
    }

    public static final class StringSerializer
    implements SimpleVersionedSerializer<String> {
        static final int VERSION = 77;

        public int getVersion() {
            return 77;
        }

        public byte[] serialize(String checkpointData) throws IOException {
            return checkpointData.getBytes(StandardCharsets.UTF_8);
        }

        public String deserialize(int version, byte[] serialized) throws IOException {
            if (version != 77) {
                throw new IOException("version mismatch");
            }
            return new String(serialized, StandardCharsets.UTF_8);
        }
    }

    public static class CheckpointCoordinatorBuilder {
        private CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build();
        private ExecutionGraph executionGraph;
        private Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint = Collections.emptyList();
        private CheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
        private CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        private CheckpointStorage checkpointStorage = new MemoryStateBackend();
        private Executor ioExecutor = Executors.directExecutor();
        private CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        private ScheduledExecutor timer = new ManuallyTriggeredScheduledExecutor();
        private SharedStateRegistryFactory sharedStateRegistryFactory = SharedStateRegistry.DEFAULT_FACTORY;
        private CheckpointFailureManager failureManager = new CheckpointFailureManager(0, (CheckpointFailureManager.FailJobCallback)NoOpFailJobCall.INSTANCE);
        private boolean allowCheckpointsAfterTasksFinished;

        public CheckpointCoordinatorBuilder setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration) {
            this.checkpointCoordinatorConfiguration = checkpointCoordinatorConfiguration;
            return this;
        }

        public CheckpointCoordinatorBuilder setExecutionGraph(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
            return this;
        }

        public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
            this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
            return this;
        }

        public CheckpointCoordinatorBuilder setCheckpointsCleaner(CheckpointsCleaner checkpointsCleaner) {
            this.checkpointsCleaner = checkpointsCleaner;
            return this;
        }

        public CheckpointCoordinatorBuilder setCheckpointIDCounter(CheckpointIDCounter checkpointIDCounter) {
            this.checkpointIDCounter = checkpointIDCounter;
            return this;
        }

        public CheckpointCoordinatorBuilder setCheckpointFailureManager(CheckpointFailureManager checkpointFailureManager) {
            this.failureManager = checkpointFailureManager;
            return this;
        }

        public CheckpointCoordinatorBuilder setCompletedCheckpointStore(CompletedCheckpointStore completedCheckpointStore) {
            this.completedCheckpointStore = completedCheckpointStore;
            return this;
        }

        public CheckpointCoordinatorBuilder setIoExecutor(Executor ioExecutor) {
            this.ioExecutor = ioExecutor;
            return this;
        }

        public CheckpointCoordinatorBuilder setTimer(ScheduledExecutor timer) {
            this.timer = timer;
            return this;
        }

        public CheckpointCoordinatorBuilder setSharedStateRegistryFactory(SharedStateRegistryFactory sharedStateRegistryFactory) {
            this.sharedStateRegistryFactory = sharedStateRegistryFactory;
            return this;
        }

        public CheckpointCoordinatorBuilder setFailureManager(CheckpointFailureManager failureManager) {
            this.failureManager = failureManager;
            return this;
        }

        public CheckpointCoordinatorBuilder setCheckpointStorage(CheckpointStorage stateBackEnd) {
            this.checkpointStorage = stateBackEnd;
            return this;
        }

        public CheckpointCoordinatorBuilder setAllowCheckpointsAfterTasksFinished(boolean allowCheckpointsAfterTasksFinished) {
            this.allowCheckpointsAfterTasksFinished = allowCheckpointsAfterTasksFinished;
            return this;
        }

        public CheckpointCoordinator build() throws Exception {
            if (this.executionGraph == null) {
                this.executionGraph = new CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build();
            }
            DefaultCheckpointPlanCalculator checkpointPlanCalculator = new DefaultCheckpointPlanCalculator(this.executionGraph.getJobID(), (CheckpointPlanCalculatorContext)new ExecutionGraphCheckpointPlanCalculatorContext(this.executionGraph), this.executionGraph.getVerticesTopologically(), this.allowCheckpointsAfterTasksFinished);
            return new CheckpointCoordinator(this.executionGraph.getJobID(), this.checkpointCoordinatorConfiguration, this.coordinatorsToCheckpoint, this.checkpointIDCounter, this.completedCheckpointStore, this.checkpointStorage, this.ioExecutor, this.checkpointsCleaner, this.timer, this.sharedStateRegistryFactory, this.failureManager, (CheckpointPlanCalculator)checkpointPlanCalculator, new ExecutionAttemptMappingProvider(this.executionGraph.getAllExecutionVertices()));
        }
    }

    static class CheckpointExecutionGraphBuilder {
        private final List<JobVertex> sourceVertices = new ArrayList<JobVertex>();
        private final List<JobVertex> nonSourceVertices = new ArrayList<JobVertex>();
        private DistributionPattern distributionPattern = DistributionPattern.ALL_TO_ALL;
        private boolean transitToRunning = true;
        private TaskManagerGateway taskManagerGateway;
        private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        CheckpointExecutionGraphBuilder() {
        }

        public CheckpointExecutionGraphBuilder addJobVertex(JobVertexID id) {
            return this.addJobVertex(id, true);
        }

        public CheckpointExecutionGraphBuilder addJobVertex(JobVertexID id, boolean isSource) {
            return this.addJobVertex(id, 1, 32768, Collections.emptyList(), isSource);
        }

        public CheckpointExecutionGraphBuilder addJobVertex(JobVertexID id, int parallelism, int maxParallelism) {
            return this.addJobVertex(id, parallelism, maxParallelism, Collections.emptyList(), true);
        }

        public CheckpointExecutionGraphBuilder addJobVertex(JobVertexID id, int parallelism, int maxParallelism, List<OperatorIDPair> operators, boolean isSource) {
            JobVertex jobVertex = operators.size() == 0 ? new JobVertex("anon", id) : new JobVertex("anon", id, operators);
            jobVertex.setParallelism(parallelism);
            jobVertex.setMaxParallelism(maxParallelism);
            jobVertex.setInvokableClass(NoOpInvokable.class);
            return this.addJobVertex(jobVertex, isSource);
        }

        public CheckpointExecutionGraphBuilder addJobVertex(JobVertex jobVertex, boolean isSource) {
            if (isSource) {
                this.sourceVertices.add(jobVertex);
            } else {
                this.nonSourceVertices.add(jobVertex);
            }
            return this;
        }

        public CheckpointExecutionGraphBuilder setTaskManagerGateway(TaskManagerGateway taskManagerGateway) {
            this.taskManagerGateway = taskManagerGateway;
            return this;
        }

        public CheckpointExecutionGraphBuilder setDistributionPattern(DistributionPattern distributionPattern) {
            this.distributionPattern = distributionPattern;
            return this;
        }

        public CheckpointExecutionGraphBuilder setTransitToRunning(boolean transitToRunning) {
            this.transitToRunning = transitToRunning;
            return this;
        }

        public CheckpointExecutionGraphBuilder setMainThreadExecutor(ComponentMainThreadExecutor mainThreadExecutor) {
            this.mainThreadExecutor = mainThreadExecutor;
            return this;
        }

        ExecutionGraph build() throws Exception {
            for (JobVertex source : this.sourceVertices) {
                for (JobVertex nonSource : this.nonSourceVertices) {
                    nonSource.connectNewDataSetAsInput(source, this.distributionPattern, ResultPartitionType.PIPELINED);
                }
            }
            ArrayList<JobVertex> allVertices = new ArrayList<JobVertex>();
            allVertices.addAll(this.sourceVertices);
            allVertices.addAll(this.nonSourceVertices);
            DefaultExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(allVertices.toArray(new JobVertex[0]));
            executionGraph.start(this.mainThreadExecutor);
            if (this.taskManagerGateway != null) {
                executionGraph.getAllExecutionVertices().forEach(task -> {
                    TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(this.taskManagerGateway).createTestingLogicalSlot();
                    task.tryAssignResource((LogicalSlot)slot);
                });
            }
            if (this.transitToRunning) {
                executionGraph.transitionToRunning();
                executionGraph.getAllExecutionVertices().forEach(task -> task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING));
            }
            return executionGraph;
        }
    }

    static class CheckpointRecorderTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        private final Map<ExecutionAttemptID, List<TriggeredCheckpoint>> triggeredCheckpoints = new HashMap<ExecutionAttemptID, List<TriggeredCheckpoint>>();
        private final Map<ExecutionAttemptID, List<NotifiedCheckpoint>> notifiedCompletedCheckpoints = new HashMap<ExecutionAttemptID, List<NotifiedCheckpoint>>();
        private final Map<ExecutionAttemptID, List<NotifiedCheckpoint>> notifiedAbortCheckpoints = new HashMap<ExecutionAttemptID, List<NotifiedCheckpoint>>();

        CheckpointRecorderTaskManagerGateway() {
        }

        @Override
        public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID attemptId, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            this.triggeredCheckpoints.computeIfAbsent(attemptId, k -> new ArrayList()).add(new TriggeredCheckpoint(jobId, checkpointId, timestamp, checkpointOptions));
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        @Override
        public void notifyCheckpointComplete(ExecutionAttemptID attemptId, JobID jobId, long checkpointId, long timestamp) {
            this.notifiedCompletedCheckpoints.computeIfAbsent(attemptId, k -> new ArrayList()).add(new NotifiedCheckpoint(jobId, checkpointId, timestamp));
        }

        @Override
        public void notifyCheckpointAborted(ExecutionAttemptID attemptId, JobID jobId, long checkpointId, long latestCompletedCheckpointId, long timestamp) {
            this.notifiedAbortCheckpoints.computeIfAbsent(attemptId, k -> new ArrayList()).add(new NotifiedCheckpoint(jobId, checkpointId, timestamp));
        }

        public void resetCount() {
            this.triggeredCheckpoints.clear();
            this.notifiedAbortCheckpoints.clear();
            this.notifiedCompletedCheckpoints.clear();
        }

        public List<TriggeredCheckpoint> getTriggeredCheckpoints(ExecutionAttemptID attemptId) {
            return this.triggeredCheckpoints.getOrDefault(attemptId, Collections.emptyList());
        }

        public TriggeredCheckpoint getOnlyTriggeredCheckpoint(ExecutionAttemptID attemptId) {
            List<TriggeredCheckpoint> triggeredCheckpoints = this.getTriggeredCheckpoints(attemptId);
            Assert.assertEquals((String)("There should be exactly one checkpoint triggered for " + attemptId), (long)1L, (long)triggeredCheckpoints.size());
            return triggeredCheckpoints.get(0);
        }

        public List<NotifiedCheckpoint> getNotifiedCompletedCheckpoints(ExecutionAttemptID attemptId) {
            return this.notifiedCompletedCheckpoints.getOrDefault(attemptId, Collections.emptyList());
        }

        public NotifiedCheckpoint getOnlyNotifiedCompletedCheckpoint(ExecutionAttemptID attemptId) {
            List<NotifiedCheckpoint> completedCheckpoints = this.getNotifiedCompletedCheckpoints(attemptId);
            Assert.assertEquals((String)("There should be exactly one checkpoint notified completed for " + attemptId), (long)1L, (long)completedCheckpoints.size());
            return completedCheckpoints.get(0);
        }

        public List<NotifiedCheckpoint> getNotifiedAbortedCheckpoints(ExecutionAttemptID attemptId) {
            return this.notifiedAbortCheckpoints.getOrDefault(attemptId, Collections.emptyList());
        }

        public NotifiedCheckpoint getOnlyNotifiedAbortedCheckpoint(ExecutionAttemptID attemptId) {
            List<NotifiedCheckpoint> abortedCheckpoints = this.getNotifiedAbortedCheckpoints(attemptId);
            Assert.assertEquals((String)("There should be exactly one checkpoint notified aborted for " + attemptId), (long)1L, (long)abortedCheckpoints.size());
            return abortedCheckpoints.get(0);
        }
    }

    static class NotifiedCheckpoint {
        final JobID jobId;
        final long checkpointId;
        final long timestamp;

        public NotifiedCheckpoint(JobID jobId, long checkpointId, long timestamp) {
            this.jobId = jobId;
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
        }
    }

    static class TriggeredCheckpoint {
        final JobID jobId;
        final long checkpointId;
        final long timestamp;
        final CheckpointOptions checkpointOptions;

        public TriggeredCheckpoint(JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            this.jobId = jobId;
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
            this.checkpointOptions = checkpointOptions;
        }
    }
}

