package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.class */
public class ExecutionGraphSuspendTest extends TestLogger {
    @Test
    public void testSuspendedOutOfCreated() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway();
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        Assert.assertEquals(JobStatus.CREATED, executionGraph.getState());
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateAllVerticesInState(executionGraph, ExecutionState.CANCELED);
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 0);
        ensureCannotLeaveSuspendedState(createScheduler, interactionsCountingTaskManagerGateway);
    }

    @Test
    public void testSuspendedOutOfDeploying() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        validateAllVerticesInState(executionGraph, ExecutionState.DEPLOYING);
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
        ensureCannotLeaveSuspendedState(createScheduler, interactionsCountingTaskManagerGateway);
    }

    @Test
    public void testSuspendedOutOfRunning() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        validateAllVerticesInState(executionGraph, ExecutionState.RUNNING);
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
        ensureCannotLeaveSuspendedState(createScheduler, interactionsCountingTaskManagerGateway);
    }

    @Test
    public void testSuspendedOutOfFailing() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        createScheduler.handleGlobalFailure(new Exception("fail global"));
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        ensureCannotLeaveSuspendedState(createScheduler, interactionsCountingTaskManagerGateway);
    }

    @Test
    public void testSuspendedOutOfFailed() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway();
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        createScheduler.handleGlobalFailure(new Exception("fail global"));
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(executionGraph);
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
    }

    @Test
    public void testSuspendedOutOfCanceling() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway(10);
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        createScheduler.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        ensureCannotLeaveSuspendedState(createScheduler, interactionsCountingTaskManagerGateway);
    }

    @Test
    public void testSuspendedOutOfCanceled() throws Exception {
        InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway = new InteractionsCountingTaskManagerGateway();
        SchedulerBase createScheduler = createScheduler(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        createScheduler.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(executionGraph);
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getTerminationFuture().get());
        createScheduler.closeAsync();
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
        validateCancelRpcCalls(interactionsCountingTaskManagerGateway, 10);
    }

    @Test
    public void testSuspendWhileRestarting() throws Exception {
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        DefaultScheduler build = SchedulerTestingUtils.newSchedulerBuilder(JobGraphTestUtils.emptyJobGraph(), ComponentMainThreadExecutorServiceAdapter.forMainThread()).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE)).setDelayExecutor(manuallyTriggeredScheduledExecutor).build();
        build.startScheduling();
        ExecutionGraph executionGraph = build.getExecutionGraph();
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        build.handleGlobalFailure(new Exception("test"));
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        ExecutionGraphTestUtils.completeCancellingForAllVertices(executionGraph);
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        build.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
    }

    private static void ensureCannotLeaveSuspendedState(SchedulerBase schedulerBase, InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        ExecutionGraph executionGraph = schedulerBase.getExecutionGraph();
        interactionsCountingTaskManagerGateway.waitUntilAllTasksAreSubmitted();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        interactionsCountingTaskManagerGateway.resetCounts();
        schedulerBase.handleGlobalFailure(new Exception("fail"));
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateNoInteractions(interactionsCountingTaskManagerGateway);
        schedulerBase.cancel();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateNoInteractions(interactionsCountingTaskManagerGateway);
        schedulerBase.closeAsync();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateNoInteractions(interactionsCountingTaskManagerGateway);
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(0L, ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().getAttemptNumber());
        }
    }

    private static void validateNoInteractions(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        Assert.assertThat(Integer.valueOf(interactionsCountingTaskManagerGateway.getInteractionsCount()), Matchers.is(0));
    }

    private static void validateAllVerticesInState(ExecutionGraph executionGraph, ExecutionState executionState) {
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(executionState, ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().getState());
        }
    }

    private static void validateCancelRpcCalls(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway, int i) {
        Assert.assertThat(Integer.valueOf(interactionsCountingTaskManagerGateway.getCancelTaskCount()), Matchers.is(Integer.valueOf(i)));
    }

    private static SchedulerBase createScheduler(TaskManagerGateway taskManagerGateway, int i) throws Exception {
        JobVertex jobVertex = new JobVertex("vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return SchedulerTestingUtils.newSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(jobVertex), ComponentMainThreadExecutorServiceAdapter.forMainThread()).setExecutionSlotAllocatorFactory(SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(i, taskManagerGateway))).build();
    }
}
