package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.class */
/**
 * Checks that a pending checkpoint is discarded once a failed task has been restarted while the
 * execution graph uses {@link AdaptedRestartPipelinedRegionStrategyNG} as its failover strategy.
 *
 * <p>All main-thread work is routed through a {@link ManuallyTriggeredScheduledExecutor}, so the
 * test decides explicitly when queued and scheduled actions run.
 */
public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger {

    /** Executor whose queued and scheduled tasks only run when the test triggers them. */
    private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;

    /** Main-thread executor handed to the execution graph; backed by {@link #manualMainThreadExecutor}. */
    private ComponentMainThreadExecutor componentMainThreadExecutor;

    /** Creates fresh executors for every test, bound to the thread running the test. */
    @Before
    public void setUp() {
        manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
        componentMainThreadExecutor =
            new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
    }

    /**
     * Triggers a checkpoint, acknowledges it for only one of the two running vertices, fails that
     * vertex, and asserts that the checkpoint stays pending until the scheduled tasks are run,
     * after which no pending checkpoint may remain.
     */
    @Test
    public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
        JobGraph jobGraph = createStreamingJobGraph();
        ExecutionGraph executionGraph = createExecutionGraph(jobGraph);

        Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
        ExecutionVertex firstVertex = vertexIterator.next();
        ExecutionVertex secondVertex = vertexIterator.next();
        setTasksRunning(executionGraph, firstVertex, secondVertex);

        CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        Preconditions.checkState(checkpointCoordinator != null);

        checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());

        // Acknowledge for the first vertex only; the second vertex never acknowledges, so the
        // checkpoint is expected to remain pending.
        long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        AcknowledgeCheckpoint acknowledgement = new AcknowledgeCheckpoint(
            jobGraph.getJobID(),
            firstVertex.getCurrentExecutionAttempt().getAttemptId(),
            checkpointId);
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgement, "Unknown location");
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());

        // Failing the vertex (and running the queued main-thread actions) must not yet drop
        // the pending checkpoint.
        failVertex(firstVertex);
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());

        // Only after the scheduled tasks have run is the pending checkpoint expected to be gone.
        manualMainThreadExecutor.triggerScheduledTasks();
        assertNoPendingCheckpoints(checkpointCoordinator);
    }

    /**
     * Reports {@link ExecutionState#RUNNING} to the graph for the current execution attempt of
     * each given vertex.
     *
     * @param executionGraph graph that receives the state updates
     * @param executionVertexArr vertices whose current attempts are marked as running
     */
    private void setTasksRunning(ExecutionGraph executionGraph, ExecutionVertex... executionVertexArr) {
        for (ExecutionVertex vertex : executionVertexArr) {
            TaskExecutionState runningState = new TaskExecutionState(
                executionGraph.getJobID(),
                vertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.RUNNING);
            executionGraph.updateState(runningState);
        }
    }

    /**
     * Fails the current execution attempt of the vertex and then runs everything queued on the
     * manual main-thread executor.
     *
     * @param executionVertex vertex whose current attempt is failed
     */
    private void failVertex(ExecutionVertex executionVertex) {
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        manualMainThreadExecutor.triggerAll();
    }

    /**
     * Builds a job graph with two unconnected vertices and {@link ScheduleMode#EAGER} scheduling.
     *
     * @return the job graph used by the test
     */
    private static JobGraph createStreamingJobGraph() {
        JobVertex firstVertex = new JobVertex("vertex1");
        firstVertex.setInvokableClass(AbstractInvokable.class);

        JobVertex secondVertex = new JobVertex("vertex2");
        secondVertex.setInvokableClass(AbstractInvokable.class);

        JobGraph jobGraph = new JobGraph(firstVertex, secondVertex);
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        return jobGraph;
    }

    /**
     * Builds an execution graph for the job graph, enables checkpointing on it, starts it on the
     * test's main-thread executor, schedules it, and runs all queued main-thread actions.
     *
     * @param jobGraph job graph to build the execution graph from
     * @return the started and scheduled execution graph
     * @throws Exception if building or scheduling the graph fails
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
        ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder()
            .setJobGraph(jobGraph)
            .setRestartStrategy(new FixedDelayRestartStrategy(10, 0L))
            .setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNG::new)
            .setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 2))
            .build();

        // Checkpointing is enabled before the graph is started and scheduled.
        enableCheckpointing(executionGraph);
        executionGraph.start(componentMainThreadExecutor);
        executionGraph.scheduleForExecution();
        manualMainThreadExecutor.triggerAll();
        return executionGraph;
    }

    /**
     * Enables checkpointing on the graph using standalone (in-memory) checkpoint id counter and
     * completed-checkpoint store, a {@link MemoryStateBackend}, and an unregistered metrics group.
     *
     * @param executionGraph graph on which checkpointing is enabled
     */
    private static void enableCheckpointing(ExecutionGraph executionGraph) {
        // The same list of all job vertices is passed for each of the three vertex-list arguments.
        ArrayList arrayList = new ArrayList(executionGraph.getAllVertices().values());
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
            Long.MAX_VALUE,
            Long.MAX_VALUE,
            0L,
            1,
            CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION,
            true,
            false,
            0);
        executionGraph.enableCheckpointing(
            checkpointCoordinatorConfiguration,
            arrayList,
            arrayList,
            arrayList,
            Collections.emptyList(),
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(1),
            new MemoryStateBackend(),
            new CheckpointStatsTracker(0, arrayList, checkpointCoordinatorConfiguration, new UnregisteredMetricsGroup()));
    }

    /**
     * Asserts that the coordinator currently tracks no pending checkpoint.
     *
     * @param checkpointCoordinator coordinator to inspect
     */
    private static void assertNoPendingCheckpoints(CheckpointCoordinator checkpointCoordinator) {
        Assert.assertThat(checkpointCoordinator.getPendingCheckpoints().entrySet(), Matchers.is(Matchers.empty()));
    }
}
