package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.class */
/**
 * Tests how an {@link ExecutionGraph} reacts to task failures with respect to restarting:
 * with no retries left, with one retry left, and when the job is cancelled while it is
 * restarting or failing.
 *
 * <p>Every test runs a single {@link JobVertex} with parallelism {@code NUM_TASKS} on one
 * {@link Instance} obtained from {@code ExecutionGraphTestUtils.getInstance(gateway, NUM_TASKS)}
 * (presumably the second argument is the slot count -- confirm against the test utils).
 */
public class ExecutionGraphRestartTest {
    /** Parallelism of the test vertex; also passed to {@code ExecutionGraphTestUtils.getInstance}. */
    private static final int NUM_TASKS = 31;

    /** Pause between two polls of an asynchronously changing condition, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /**
     * With zero retries left, a task failure must drive the job to {@code FAILED}, and an explicit
     * call to {@code restart()} must leave it in {@code FAILED}.
     */
    @Test
    public void testNotRestartManually() throws Exception {
        Instance instance = createInstance();
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobGraph jobGraph = createSingleVertexJobGraph("Pointwise job", "Task");
        ExecutionGraph graph = createExecutionGraph("test job");
        graph.setNumberOfRetriesLeft(0);
        attachAndSchedule(graph, jobGraph, scheduler);

        failFirstVertex(graph);
        completeCancelingOfAllVertices(graph);
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        // A manual restart must not change the state once the job has failed.
        graph.restart();
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * With one retry left, a task failure must first move the job to {@code FAILING}; once all
     * attempts completed canceling the job must get back to {@code RUNNING}, and finishing every
     * attempt must then yield {@code FINISHED}.
     */
    @Test
    public void testRestartAutomatically() throws Exception {
        Instance instance = createInstance();
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobGraph jobGraph = createSingleVertexJobGraph("Pointwise job", "Task");
        ExecutionGraph graph = createExecutionGraph("Test job");
        graph.setNumberOfRetriesLeft(1);
        attachAndSchedule(graph, jobGraph, scheduler);

        failFirstVertex(graph);
        Assert.assertEquals(JobStatus.FAILING, graph.getState());
        completeCancelingOfAllVertices(graph);

        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);

        // The job state is observed by polling, so wait until it reports RUNNING again.
        waitForJobStatus(graph, JobStatus.RUNNING, timeout.fromNow());
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        // Wait until every current execution attempt has a resource assigned.
        Deadline deadline = timeout.fromNow();
        boolean allAssigned = allVerticesHaveAssignedResource(graph);
        while (!allAssigned && deadline.hasTimeLeft()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            allAssigned = allVerticesHaveAssignedResource(graph);
        }
        // Decide on the observed condition, not on the clock: a poll that succeeds right as the
        // deadline expires is still a success.
        Assert.assertTrue(
                "Failed to wait until all execution attempts left the state DEPLOYING.",
                allAssigned);

        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }
        Assert.assertEquals(JobStatus.FINISHED, graph.getState());
    }

    /**
     * Cancelling a job that is in {@code RESTARTING} must move it to {@code CANCELED}, and a
     * subsequent {@code restart()} call must not change that.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Instance instance = createInstance();
        scheduler.newInstanceAvailable(instance);

        ExecutionGraph graph = createExecutionGraph("TestJob");
        JobGraph jobGraph = createSingleVertexJobGraph("TestJob", "NoOpInvokable");
        // Maximum retries and maximum delay, so the test can act while the job is RESTARTING.
        graph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
        graph.setDelayBeforeRetrying(Integer.MAX_VALUE);
        attachAndSchedule(graph, jobGraph, scheduler);

        // Marking the only instance dead is what triggers the failure in this test.
        instance.markDead();

        waitForJobStatus(graph, JobStatus.RESTARTING, TestingUtils.TESTING_DURATION().fromNow());
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());

        graph.restart();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Cancelling a job that is in {@code FAILING} must move it to {@code CANCELLING}, and to
     * {@code CANCELED} once {@code jobVertexInFinalState()} is actually executed.
     *
     * <p>The graph is a Mockito spy whose {@code jobVertexInFinalState()} is stubbed out at first,
     * so that the test controls when that call takes effect.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Instance instance = createInstance();
        scheduler.newInstanceAvailable(instance);

        ExecutionGraph graph = Mockito.spy(createExecutionGraph("TestJob"));
        Mockito.doNothing().when(graph).jobVertexInFinalState();

        JobGraph jobGraph = createSingleVertexJobGraph("TestJob", "NoOpInvokable");
        graph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
        graph.setDelayBeforeRetrying(Integer.MAX_VALUE);
        attachAndSchedule(graph, jobGraph, scheduler);

        instance.markDead();

        // Wait until every execution vertex reports FAILED.
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        boolean allFailed = allVerticesInState(graph, ExecutionState.FAILED);
        while (!allFailed && deadline.hasTimeLeft()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            allFailed = allVerticesInState(graph, ExecutionState.FAILED);
        }
        // Surface a timeout explicitly instead of continuing with vertices in an unknown state.
        Assert.assertTrue("Not all execution vertices reached the state FAILED in time.", allFailed);

        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        // Re-enable the real method and invoke it by hand.
        Mockito.doCallRealMethod().when(graph).jobVertexInFinalState();
        graph.jobVertexInFinalState();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /** Creates the instance used by all tests, backed by a {@code SimpleActorGateway}. */
    private static Instance createInstance() throws Exception {
        return ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                NUM_TASKS);
    }

    /** Builds a job graph with one no-op vertex of parallelism {@code NUM_TASKS}. */
    private static JobGraph createSingleVertexJobGraph(String jobName, String vertexName) {
        JobVertex vertex = new JobVertex(vertexName);
        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
        vertex.setParallelism(NUM_TASKS);
        return new JobGraph(jobName, new JobVertex[]{vertex});
    }

    /** Creates an execution graph with a fresh job id, empty configuration and default timeout. */
    private static ExecutionGraph createExecutionGraph(String jobName) {
        return new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                jobName,
                new Configuration(),
                AkkaUtils.getDefaultTimeout());
    }

    /**
     * Attaches the job graph and schedules the execution graph, asserting the job state
     * {@code CREATED} after attaching and {@code RUNNING} after scheduling.
     */
    private static void attachAndSchedule(ExecutionGraph graph, JobGraph jobGraph, Scheduler scheduler)
            throws Exception {
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
    }

    /** Fails the first execution vertex returned by the graph with a test exception. */
    private static void failFirstVertex(ExecutionGraph graph) {
        graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
    }

    /** Calls {@code cancelingComplete()} on the current execution attempt of every vertex. */
    private static void completeCancelingOfAllVertices(ExecutionGraph graph) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
    }

    /**
     * Polls the job state until it equals {@code expected} or the deadline has passed. Callers
     * assert the state afterwards, so a timeout shows up as a failed assertion.
     */
    private static void waitForJobStatus(ExecutionGraph graph, JobStatus expected, Deadline deadline)
            throws InterruptedException {
        while (deadline.hasTimeLeft() && graph.getState() != expected) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns whether the current execution attempt of every vertex has a non-null resource. */
    private static boolean allVerticesHaveAssignedResource(ExecutionGraph graph) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether every execution vertex currently reports the given execution state. */
    private static boolean allVerticesInState(ExecutionGraph graph, ExecutionState state) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            if (vertex.getExecutionState() != state) {
                return false;
            }
        }
        return true;
    }
}
