package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.class */
public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger {
    private static final JobID TEST_JOB_ID = new JobID();
    private ComponentMainThreadExecutor componentMainThreadExecutor;
    private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest$TestAdaptedRestartPipelinedRegionStrategyNG.class */
    public static class TestAdaptedRestartPipelinedRegionStrategyNG extends AdaptedRestartPipelinedRegionStrategyNG {
        private CompletableFuture<?> blockerFuture;
        private Set<ExecutionVertexID> lastTasksToRestart;

        /* JADX INFO: Access modifiers changed from: package-private */
        public TestAdaptedRestartPipelinedRegionStrategyNG(ExecutionGraph executionGraph) {
            super(executionGraph);
            this.blockerFuture = CompletableFuture.completedFuture(null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void setBlockerFuture(CompletableFuture<?> completableFuture) {
            this.blockerFuture = completableFuture;
        }

        protected void restartTasks(Set<ExecutionVertexID> set) {
            this.lastTasksToRestart = set;
            super.restartTasks(set);
        }

        protected CompletableFuture<?> cancelTasks(Set<ExecutionVertexID> set) {
            return FutureUtils.waitForAll(Arrays.asList(super.cancelTasks(set), this.blockerFuture));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public CompletableFuture<?> getBlockerFuture() {
            return this.blockerFuture;
        }

        Set<ExecutionVertexID> getLastTasksToCancel() {
            return this.lastTasksToRestart;
        }
    }

    /**
     * Creates a fresh manually triggered executor for each test and wraps it, together with
     * the current thread, in the main-thread executor adapter handed to the execution graph.
     */
    @Before
    public void setUp() {
        final ManuallyTriggeredScheduledExecutor manualExecutor = new ManuallyTriggeredScheduledExecutor();
        this.manualMainThreadExecutor = manualExecutor;

        final Thread testThread = Thread.currentThread();
        this.componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualExecutor, testThread);
    }

    /**
     * Fails one vertex of the streaming (EAGER) job graph and verifies that only that vertex
     * and one further vertex are cancelled and redeployed with a new attempt, while the other
     * two vertices stay on their first attempt.
     *
     * @throws Exception if creating or scheduling the execution graph fails
     */
    @Test
    public void testRegionFailoverInEagerMode() throws Exception {
        final ExecutionGraph executionGraph = createExecutionGraph(createStreamingJobGraph());

        // NOTE(review): presumably the iterator yields the subtasks of "vertex1" first and then
        // those of "vertex2" -- confirm against ExecutionGraph#getAllExecutionVertices.
        final Iterator<ExecutionVertex> vertices = executionGraph.getAllExecutionVertices().iterator();
        final ExecutionVertex failingVertex = vertices.next();
        final ExecutionVertex untouchedVertexA = vertices.next();
        final ExecutionVertex cancelledVertex = vertices.next();
        final ExecutionVertex untouchedVertexB = vertices.next();

        // Trigger the task failure and let the main-thread executor process the fallout.
        failingVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        this.manualMainThreadExecutor.triggerAll();
        this.manualMainThreadExecutor.triggerScheduledTasks();

        // Only the failed vertex and one other vertex are affected at this point.
        assertVertexInState(ExecutionState.FAILED, failingVertex);
        assertVertexInState(ExecutionState.DEPLOYING, untouchedVertexA);
        assertVertexInState(ExecutionState.CANCELING, cancelledVertex);
        assertVertexInState(ExecutionState.DEPLOYING, untouchedVertexB);

        // Finish the pending cancellation so the restart can proceed.
        cancelledVertex.getCurrentExecutionAttempt().completeCancelling();
        this.manualMainThreadExecutor.triggerAll();
        this.manualMainThreadExecutor.triggerScheduledTasks();

        // Every vertex is deploying again ...
        assertVertexInState(ExecutionState.DEPLOYING, failingVertex);
        assertVertexInState(ExecutionState.DEPLOYING, untouchedVertexA);
        assertVertexInState(ExecutionState.DEPLOYING, cancelledVertex);
        assertVertexInState(ExecutionState.DEPLOYING, untouchedVertexB);

        // ... but only the two affected vertices received a new execution attempt.
        Assert.assertEquals(1L, failingVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(0L, untouchedVertexA.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, cancelledVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(0L, untouchedVertexB.getCurrentExecutionAttempt().getAttemptNumber());
    }

    /**
     * Fails one vertex of the batch (LAZY_FROM_SOURCES) job graph and verifies which vertices
     * were reset: the failed vertex is deploying again with a new attempt, one vertex is left
     * untouched, and the remaining two are back in CREATED with a new attempt.
     *
     * @throws Exception if creating or scheduling the execution graph fails
     */
    @Test
    public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Exception {
        final ExecutionGraph executionGraph = createExecutionGraph(createBatchJobGraph());

        // NOTE(review): presumably the iterator yields the subtasks of "vertex1" first and then
        // those of "vertex2" -- confirm against ExecutionGraph#getAllExecutionVertices.
        final Iterator<ExecutionVertex> vertices = executionGraph.getAllExecutionVertices().iterator();
        final ExecutionVertex failingVertex = vertices.next();
        final ExecutionVertex untouchedVertex = vertices.next();
        final ExecutionVertex resetVertexA = vertices.next();
        final ExecutionVertex resetVertexB = vertices.next();

        // Trigger the task failure and let the main-thread executor process the fallout.
        failingVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        this.manualMainThreadExecutor.triggerAll();
        this.manualMainThreadExecutor.triggerScheduledTasks();

        // States after the failover has run.
        assertVertexInState(ExecutionState.DEPLOYING, failingVertex);
        assertVertexInState(ExecutionState.DEPLOYING, untouchedVertex);
        assertVertexInState(ExecutionState.CREATED, resetVertexA);
        assertVertexInState(ExecutionState.CREATED, resetVertexB);

        // All vertices except the untouched one moved on to their second attempt.
        Assert.assertEquals(1L, failingVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(0L, untouchedVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, resetVertexA.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, resetVertexB.getCurrentExecutionAttempt().getAttemptNumber());
    }

    /**
     * Marks the first two vertices of the batch job graph as finished, then fails the third
     * vertex with a {@link PartitionConnectionException} that points at a partition produced by
     * the first vertex. Verifies that the failover strategy was asked to restart the first,
     * third and fourth vertex.
     *
     * @throws Exception if creating or scheduling the execution graph fails
     */
    @Test
    public void testFailurePropagationToUnderlyingStrategy() throws Exception {
        final ExecutionGraph executionGraph = createExecutionGraph(createBatchJobGraph());

        // createExecutionGraph installs TestAdaptedRestartPipelinedRegionStrategyNG as the
        // failover strategy factory, so the downcast is expected to succeed.
        final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
            (TestAdaptedRestartPipelinedRegionStrategyNG) executionGraph.getFailoverStrategy();

        final Iterator<ExecutionVertex> vertices = executionGraph.getAllExecutionVertices().iterator();
        final ExecutionVertex producerVertex = vertices.next();
        final ExecutionVertex otherFinishedVertex = vertices.next();
        final ExecutionVertex failingConsumerVertex = vertices.next();
        final ExecutionVertex otherConsumerVertex = vertices.next();

        producerVertex.getCurrentExecutionAttempt().markFinished();
        otherFinishedVertex.getCurrentExecutionAttempt().markFinished();

        // Build a connection failure that names the first partition of the first vertex,
        // as produced by that vertex's current attempt.
        final IntermediateResultPartitionID producedPartitionId =
            producerVertex.getProducedPartitions().keySet().iterator().next();
        final ResultPartitionID unreachablePartition = new ResultPartitionID(
            producedPartitionId,
            producerVertex.getCurrentExecutionAttempt().getAttemptId());
        failingConsumerVertex.getCurrentExecutionAttempt().fail(
            new PartitionConnectionException(unreachablePartition, new Exception("Test failure")));

        this.manualMainThreadExecutor.triggerAll();

        // getLastTasksToCancel() returns the set recorded by the strategy's restartTasks(...).
        Assert.assertThat(
            failoverStrategy.getLastTasksToCancel(),
            Matchers.containsInAnyOrder(
                producerVertex.getID(),
                failingConsumerVertex.getID(),
                otherConsumerVertex.getID()));
    }

    /**
     * Uses a {@link NoRestartStrategy}, fails the first vertex, completes cancellation on every
     * vertex and verifies that the job ends up in {@link JobStatus#FAILED}.
     *
     * @throws Exception if creating or scheduling the execution graph fails
     */
    @Test
    public void testNoRestart() throws Exception {
        final ExecutionGraph executionGraph = createExecutionGraph(createBatchJobGraph(), new NoRestartStrategy());

        final ExecutionVertex firstVertex = executionGraph.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));

        // Complete cancellation on the current attempt of every vertex, including the failed one.
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().completeCancelling();
        }

        this.manualMainThreadExecutor.triggerAll();

        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    /**
     * Fails the first vertex while a {@code TestRestartStrategy} is installed and verifies that
     * the restart strategy holds exactly one queued action, exactly one execution is still
     * registered, and the job remains {@link JobStatus#RUNNING}.
     *
     * @throws Exception if creating or scheduling the execution graph fails
     */
    @Test
    public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception {
        // Keep a handle on the strategy so its queued actions can be inspected below.
        // NOTE(review): presumably the default TestRestartStrategy queues restart actions
        // instead of running them -- confirm against TestRestartStrategy.
        final TestRestartStrategy restartStrategy = new TestRestartStrategy();
        final ExecutionGraph executionGraph = createExecutionGraph(createBatchJobGraph(), restartStrategy);

        final ExecutionVertex firstVertex = executionGraph.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));

        this.manualMainThreadExecutor.triggerAll();

        // Sanity check: the failover actually called into the restart strategy.
        Assert.assertEquals(1L, restartStrategy.getNumberOfQueuedActions());
        Assert.assertEquals(1L, executionGraph.getRegisteredExecutions().size());
        // The job must not have left RUNNING.
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
    }

    /**
     * Installs a {@link FailingRestartStrategy}, fails the first vertex of the streaming job
     * graph, completes cancellation on all four vertices and verifies that the global
     * modification version of the execution graph has changed.
     *
     * @throws Exception if creating or scheduling the execution graph fails
     */
    @Test
    public void testFailGlobalIfErrorOnRestartTasks() throws Exception {
        // NOTE(review): presumably FailingRestartStrategy(1) makes one restart attempt fail --
        // confirm against FailingRestartStrategy.
        final ExecutionGraph executionGraph =
            createExecutionGraph(createStreamingJobGraph(), new FailingRestartStrategy(1));

        final Iterator<ExecutionVertex> vertices = executionGraph.getAllExecutionVertices().iterator();
        final ExecutionVertex firstVertex = vertices.next();
        final ExecutionVertex secondVertex = vertices.next();
        final ExecutionVertex thirdVertex = vertices.next();
        final ExecutionVertex fourthVertex = vertices.next();

        final long globalModVersionBeforeFailure = executionGraph.getGlobalModVersion();

        firstVertex.fail(new Exception("Test Exception"));
        completeCancelling(firstVertex, secondVertex, thirdVertex, fourthVertex);
        this.manualMainThreadExecutor.triggerAll();
        this.manualMainThreadExecutor.triggerScheduledTasks();

        final long globalModVersionAfterFailure = executionGraph.getGlobalModVersion();
        Assert.assertNotEquals(globalModVersionBeforeFailure, globalModVersionAfterFailure);
    }

    /**
     * Builds the job graph used by the EAGER-mode tests: two vertices with parallelism 2,
     * connected POINTWISE through a PIPELINED result, registered under {@code TEST_JOB_ID}.
     *
     * @return the configured job graph
     */
    private JobGraph createStreamingJobGraph() {
        final JobVertex upstream = new JobVertex("vertex1");
        upstream.setParallelism(2);
        upstream.setInvokableClass(AbstractInvokable.class);

        final JobVertex downstream = new JobVertex("vertex2");
        downstream.setParallelism(2);
        downstream.setInvokableClass(AbstractInvokable.class);

        // "vertex2" consumes the pipelined output of "vertex1".
        downstream.connectNewDataSetAsInput(upstream, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        final JobGraph streamingGraph = new JobGraph(TEST_JOB_ID, "Testjob", new JobVertex[]{upstream, downstream});
        streamingGraph.setScheduleMode(ScheduleMode.EAGER);
        return streamingGraph;
    }

    /**
     * Builds the job graph used by the LAZY_FROM_SOURCES tests: two vertices with parallelism 2,
     * connected ALL_TO_ALL through a BLOCKING result.
     *
     * @return the configured job graph
     */
    private JobGraph createBatchJobGraph() {
        final JobVertex upstream = new JobVertex("vertex1");
        upstream.setParallelism(2);
        upstream.setInvokableClass(AbstractInvokable.class);

        final JobVertex downstream = new JobVertex("vertex2");
        downstream.setParallelism(2);
        downstream.setInvokableClass(AbstractInvokable.class);

        // "vertex2" consumes the blocking output of "vertex1".
        downstream.connectNewDataSetAsInput(upstream, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        final JobGraph batchGraph = new JobGraph(new JobVertex[]{upstream, downstream});
        batchGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
        return batchGraph;
    }

    /**
     * Creates, starts and schedules an execution graph for the given job graph, using a
     * {@code FixedDelayRestartStrategy(10, 0L)} as the restart strategy.
     *
     * @param jobGraph the job graph to build the execution graph from
     * @return the started and scheduled execution graph
     * @throws Exception if the two-argument overload throws
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
        final RestartStrategy defaultRestartStrategy = new FixedDelayRestartStrategy(10, 0L);
        return createExecutionGraph(jobGraph, defaultRestartStrategy);
    }

    /**
     * Builds an execution graph with the given restart strategy, the test failover strategy and
     * a partition tracker whose lookup always yields an empty {@link Optional}. The graph is then
     * started on the main-thread executor and scheduled, and all pending executor tasks are run.
     *
     * @param jobGraph the job graph to build the execution graph from
     * @param restartStrategy the restart strategy to install
     * @return the started and scheduled execution graph
     * @throws Exception if building or scheduling the graph fails
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, RestartStrategy restartStrategy) throws Exception {
        final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
            .setRestartStrategy(restartStrategy)
            .setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
            .setPartitionTracker(
                new PartitionTrackerImpl(
                    jobGraph.getJobID(),
                    NettyShuffleMaster.INSTANCE,
                    resourceID -> Optional.empty()))
            .build();

        executionGraph.start(this.componentMainThreadExecutor);
        executionGraph.scheduleForExecution();

        // Run everything that scheduling enqueued on the manually triggered executor.
        this.manualMainThreadExecutor.triggerAll();
        return executionGraph;
    }

    /**
     * Asserts that the given vertex currently reports the expected execution state.
     *
     * @param executionState the expected state
     * @param executionVertex the vertex to inspect
     */
    private static void assertVertexInState(ExecutionState executionState, ExecutionVertex executionVertex) {
        final ExecutionState actualState = executionVertex.getExecutionState();
        Assert.assertEquals(executionState, actualState);
    }

    /**
     * Calls {@code completeCancelling()} on the current execution attempt of each given vertex,
     * in argument order.
     *
     * @param executionVertexArr the vertices whose current attempts are completed
     */
    private static void completeCancelling(ExecutionVertex... executionVertexArr) {
        Arrays.stream(executionVertexArr)
            .forEach(vertex -> vertex.getCurrentExecutionAttempt().completeCancelling());
    }
}
