package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.class */
public class PipelinedRegionFailoverConcurrencyTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest$FailoverPipelinedRegionWithCustomExecutor.class */
    private static class FailoverPipelinedRegionWithCustomExecutor implements FailoverStrategy.Factory {
        private final Executor executor;

        FailoverPipelinedRegionWithCustomExecutor(Executor executor) {
            this.executor = executor;
        }

        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph, this.executor);
        }
    }

    @Test
    public void testCancelWhileInLocalFailover() throws Exception {
        JobID jobID = new JobID();
        ManuallyTriggeredDirectExecutor manuallyTriggeredDirectExecutor = new ManuallyTriggeredDirectExecutor();
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new FailoverPipelinedRegionWithCustomExecutor(manuallyTriggeredDirectExecutor), new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L), new SimpleSlotProvider(jobID, 2), 2);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        createSampleGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.CANCELING, executionVertex2.getCurrentExecutionAttempt().getState());
        manuallyTriggeredDirectExecutor.trigger();
        executionVertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.CANCELED, createSampleGraph.getTerminationFuture().get());
        Assert.assertTrue(executionVertex.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue(executionVertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals(2L, r0.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
        JobID jobID = new JobID();
        ManuallyTriggeredDirectExecutor manuallyTriggeredDirectExecutor = new ManuallyTriggeredDirectExecutor();
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new FailoverPipelinedRegionWithCustomExecutor(manuallyTriggeredDirectExecutor), new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L), new SimpleSlotProvider(jobID, 2), 2);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        createSampleGraph.failGlobal(new SuppressRestartsException(new Exception("test exception")));
        Assert.assertEquals(JobStatus.FAILING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.CANCELING, executionVertex2.getCurrentExecutionAttempt().getState());
        manuallyTriggeredDirectExecutor.trigger();
        executionVertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.FAILED, createSampleGraph.getState());
        Assert.assertTrue(executionVertex.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue(executionVertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals(2L, r0.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
        JobID jobID = new JobID();
        ManuallyTriggeredDirectExecutor manuallyTriggeredDirectExecutor = new ManuallyTriggeredDirectExecutor();
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new FailoverPipelinedRegionWithCustomExecutor(manuallyTriggeredDirectExecutor), new FixedDelayRestartStrategy(2, 0L), new SimpleSlotProvider(jobID, 2), 2);
        RestartPipelinedRegionStrategy failoverStrategy = createSampleGraph.getFailoverStrategy();
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(executionVertex).getState());
        executionVertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(executionVertex2).getState());
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        createSampleGraph.failGlobal(new Exception("test exception"));
        Assert.assertEquals(JobStatus.FAILING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getCurrentExecutionAttempt().getState());
        executionVertex.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilJobStatus(createSampleGraph, JobStatus.RUNNING, 1000L);
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        executionVertex.getCurrentExecutionAttempt().switchToRunning();
        executionVertex2.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex2.getCurrentExecutionAttempt().getState());
        manuallyTriggeredDirectExecutor.trigger();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(executionVertex).getState());
        Assert.assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(executionVertex2).getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(1L, executionVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals(1L, executionVertex2.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals(0L, r0.getNumberOfAvailableSlots());
        executionVertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        manuallyTriggeredDirectExecutor.trigger();
        ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        executionVertex2.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(executionVertex).getState());
        Assert.assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(executionVertex2).getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(1L, executionVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(2L, executionVertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals(2L, executionVertex2.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals(0L, r0.getNumberOfAvailableSlots());
    }

    private ExecutionGraph createSampleGraph(JobID jobID, FailoverStrategy.Factory factory, RestartStrategy restartStrategy, SlotProvider slotProvider, int i) throws Exception {
        DummyJobInformation dummyJobInformation = new DummyJobInformation(jobID, "test job");
        Time seconds = Time.seconds(10L);
        ExecutionGraph executionGraph = new ExecutionGraph(dummyJobInformation, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), seconds, restartStrategy, factory, slotProvider, getClass().getClassLoader(), VoidBlobWriter.getInstance(), seconds);
        JobVertex jobVertex = new JobVertex("test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        executionGraph.attachJobGraph(new JobGraph(jobID, "testjob", new JobVertex[]{jobVertex}).getVerticesSortedTopologicallyFromSources());
        return executionGraph;
    }
}
