package org.apache.flink.runtime.executiongraph;

import akka.dispatch.Futures;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.class */
public class ExecutionGraphRestartTest extends TestLogger {
    private static final int NUM_TASKS = 31;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest$ControllableRestartStrategy.class */
    private static class ControllableRestartStrategy implements RestartStrategy {
        private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise();
        private Promise<Boolean> doRestart = new Promise.DefaultPromise();
        private Promise<Boolean> restartDone = new Promise.DefaultPromise();
        private volatile Exception exception = null;
        private FiniteDuration timeout;

        public ControllableRestartStrategy(FiniteDuration finiteDuration) {
            this.timeout = finiteDuration;
        }

        public void unlockRestart() {
            this.doRestart.success(true);
        }

        public Exception getException() {
            return this.exception;
        }

        public Future<Boolean> getReachedCanRestart() {
            return this.reachedCanRestart.future();
        }

        public Future<Boolean> getRestartDone() {
            return this.restartDone.future();
        }

        public boolean canRestart() {
            this.reachedCanRestart.success(true);
            return true;
        }

        public void restart(final ExecutionGraph executionGraph) {
            Futures.future(new Callable<Object>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.ControllableRestartStrategy.1
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    try {
                        Await.ready(ControllableRestartStrategy.this.doRestart.future(), ControllableRestartStrategy.this.timeout);
                        executionGraph.restart();
                    } catch (Exception e) {
                        ControllableRestartStrategy.this.exception = e;
                    }
                    ControllableRestartStrategy.this.restartDone.success(true);
                    return null;
                }
            }, TestingUtils.defaultExecutionContext());
        }
    }

    /**
     * Fails one vertex of a graph that uses {@link NoRestartStrategy}, completes the cancellation
     * of all vertices and checks that the job is FAILED both before and after a manual
     * {@code restart()} call.
     */
    @Test
    public void testNoManualRestart() throws Exception {
        final ExecutionGraph graph = createExecutionGraph(new NoRestartStrategy()).f0;

        final ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));

        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        // A manual restart must not move the job out of FAILED.
        graph.restart();
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Schedules two strictly co-located vertices that share a slot sharing group, validates their
     * co-location constraints, triggers a failure followed by a restart, validates the constraints
     * again and finally finishes all executions.
     */
    @Test
    public void testConstraintsAfterRestart() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                NUM_TASKS);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex firstTask = newJobVertex("Task1", NUM_TASKS, Tasks.NoOpInvokable.class);
        final JobVertex secondTask = newJobVertex("Task2", NUM_TASKS, Tasks.NoOpInvokable.class);
        final SlotSharingGroup sharingGroup = new SlotSharingGroup();
        firstTask.setSlotSharingGroup(sharingGroup);
        secondTask.setSlotSharingGroup(sharingGroup);
        firstTask.setStrictlyCoLocatedWith(secondTask);

        final JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{firstTask, secondTask});
        final ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        // The constraints must hold before as well as after the restart.
        validateConstraints(graph);
        final FiniteDuration restartTimeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        restartAfterFailure(graph, restartTimeout, false);
        validateConstraints(graph);

        haltExecution(graph);
    }

    /**
     * Asserts, for each of the {@code NUM_TASKS} subtask indices, that the location constraint of
     * the first job vertex has a shared slot, is assigned, and equals the constraint of the second
     * job vertex at the same index.
     *
     * @param executionGraph graph whose first two job vertices are compared
     */
    private void validateConstraints(ExecutionGraph executionGraph) {
        final ExecutionJobVertex[] jobVertices =
                (ExecutionJobVertex[]) executionGraph.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
        final ExecutionJobVertex first = jobVertices[0];
        final ExecutionJobVertex second = jobVertices[1];

        for (int subtask = 0; subtask < NUM_TASKS; subtask++) {
            final CoLocationConstraint firstConstraint = first.getTaskVertices()[subtask].getLocationConstraint();
            final CoLocationConstraint secondConstraint = second.getTaskVertices()[subtask].getLocationConstraint();

            Assert.assertNotNull(firstConstraint.getSharedSlot());
            Assert.assertTrue(firstConstraint.isAssigned());
            Assert.assertEquals(firstConstraint, secondConstraint);
        }
    }

    /**
     * Runs a failure/restart cycle on a graph with a fixed-delay strategy (1 attempt, 1000 delay)
     * and afterwards finishes all executions.
     */
    @Test
    public void testRestartAutomatically() throws Exception {
        final RestartStrategy strategy = new FixedDelayRestartStrategy(1, 1000L);
        final ExecutionGraph graph = createExecutionGraph(strategy).f0;
        final FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);

        restartAfterFailure(graph, timeout, true);
    }

    /**
     * With a failure-rate strategy constructed as (2, 10 seconds, 0 seconds), two failures are each
     * followed by a restart; after a third failure the job is expected to be FAILED.
     */
    @Test
    public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
        final FailureRateRestartStrategy strategy = new FailureRateRestartStrategy(
                2, Time.of(10L, TimeUnit.SECONDS), Time.of(0L, TimeUnit.SECONDS));
        final FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
        final ExecutionGraph graph = createExecutionGraph(strategy).f0;

        restartAfterFailure(graph, timeout, false);
        restartAfterFailure(graph, timeout, false);

        // Third failure: the job is expected to end up FAILED instead of restarting.
        makeAFailureAndWait(graph, timeout);
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * With a failure-rate strategy constructed as (1, 1 millisecond, 0 seconds), three consecutive
     * failure/restart cycles are expected to leave the job RUNNING.
     */
    @Test
    public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception {
        final FailureRateRestartStrategy strategy = new FailureRateRestartStrategy(
                1, Time.of(1L, TimeUnit.MILLISECONDS), Time.of(0L, TimeUnit.SECONDS));
        final FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
        final ExecutionGraph graph = createExecutionGraph(strategy).f0;

        for (int cycle = 0; cycle < 3; cycle++) {
            restartAfterFailure(graph, timeout, false);
        }

        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
    }

    /**
     * Marks the instance dead, waits for the job to reach RESTARTING, cancels it and checks that
     * the job is CANCELED and stays CANCELED after an explicit {@code restart()} call.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        final Tuple2<ExecutionGraph, Instance> graphAndInstance =
                createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
        final ExecutionGraph graph = graphAndInstance.f0;
        graphAndInstance.f1.markDead();

        // Poll until the job reports RESTARTING or the testing deadline expires.
        final Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft()) {
            if (graph.getState() == JobStatus.RESTARTING) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());

        graph.restart();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Marks the instance dead, waits for the job to reach RESTARTING, fails the graph and checks
     * that the job is FAILED and stays FAILED after an explicit {@code restart()} call.
     */
    @Test
    public void testFailWhileRestarting() throws Exception {
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                NUM_TASKS);
        scheduler.newInstanceAvailable(instance);

        final ExecutionGraph graph = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                "TestJob",
                new Configuration(),
                new SerializedValue<>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));

        final JobVertex vertex = new JobVertex("NoOpInvokable");
        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
        vertex.setParallelism(NUM_TASKS);

        final JobGraph jobGraph = new JobGraph("TestJob", new JobVertex[]{vertex});
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        instance.markDead();

        // Poll until the job reports RESTARTING or the testing deadline expires.
        final Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft()) {
            if (graph.getState() == JobStatus.RESTARTING) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.fail(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        graph.restart();
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Stubs out {@code jobVertexInFinalState()} on a spied graph, marks the instance dead and waits
     * until every vertex is FAILED or CANCELED, expecting the job to be FAILING. It then cancels the
     * job (expecting CANCELLING), restores the real {@code jobVertexInFinalState()} and calls it,
     * expecting CANCELED.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        final Tuple2<ExecutionGraph, Instance> graphAndInstance =
                createSpyExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
        final ExecutionGraph graph = graphAndInstance.f0;
        final Instance instance = graphAndInstance.f1;

        // Stub the final-state callback on the spy so that it does nothing for now.
        Mockito.doNothing().when(graph).jobVertexInFinalState();

        instance.markDead();

        // Poll until every vertex is FAILED or CANCELED, or the testing deadline expires.
        final Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        boolean allTerminal = false;
        while (deadline.hasTimeLeft() && !allTerminal) {
            allTerminal = true;
            for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
                final ExecutionState state = vertex.getExecutionState();
                final boolean terminal = state == ExecutionState.FAILED || state == ExecutionState.CANCELED;
                if (!terminal) {
                    allTerminal = false;
                    Thread.sleep(100L);
                    break;
                }
            }
        }
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        // Restore the real callback and invoke it.
        Mockito.doCallRealMethod().when(graph).jobVertexInFinalState();
        graph.jobVertexInFinalState();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Fails a vertex with a {@link SuppressRestartsException} and checks that the job goes to
     * FAILED without {@code restart()} ever being called on the (spied) graph, and that the
     * fixed-delay restart strategy still reports zero restart attempts.
     */
    @Test
    public void testNoRestartOnSuppressException() throws Exception {
        final ExecutionGraph graph = createSpyExecutionGraph(new FixedDelayRestartStrategy(1, 1000L)).f0;

        final ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new SuppressRestartsException(new Exception("Test Exception")));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }

        // Poll until the job reports FAILED or the two-minute deadline expires.
        final Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        while (deadline.hasTimeLeft() && graph.getState() != JobStatus.FAILED) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        Mockito.verify(graph, Mockito.never()).restart();

        // Hold the strategy in a local so the attempt counter can be read after the type check.
        final RestartStrategy restartStrategy = graph.getRestartStrategy();
        Assert.assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
        Assert.assertEquals(0L, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
    }

    /**
     * Finishes one execution and fails the other, waits for the restart, switches the new attempts
     * to running, then fails the finished execution attempt from before the restart. That attempt
     * is expected to stay FINISHED, and the job is expected to reach FINISHED once the current
     * attempts are marked finished.
     */
    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                2);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
        final JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
        final JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender, receiver});

        final ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
        final Execution finishedExecution = vertices.next().getCurrentExecutionAttempt();
        final Execution failedExecution = vertices.next().getCurrentExecutionAttempt();

        finishedExecution.markFinished();
        failedExecution.fail(new Exception("Test Exception"));
        failedExecution.cancelingComplete();

        final FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        waitForAsyncRestart(graph, timeout);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        waitForAllResourcesToBeAssignedAfterAsyncRestart(graph, timeout.fromNow());
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            Assert.assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
            vertex.getCurrentExecutionAttempt().switchToRunning();
        }

        // Failing the attempt from before the restart must not affect the restarted job.
        finishedExecution.fail(new Exception("This should have no effect"));

        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }

        Assert.assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
        Assert.assertEquals(JobStatus.FINISHED, graph.getState());
    }

    /**
     * Cancels a running job, then fails every current execution attempt and checks that the job is
     * CANCELED, both before and after {@code cancelingComplete()} is called on the first vertex's
     * attempt.
     */
    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                2);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);

        final ExecutionConfig config = new ExecutionConfig();
        config.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2147483647L));

        final JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{vertex});
        jobGraph.setExecutionConfig(config);

        final ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        graph.cancel();

        // Fail every attempt after the cancel request has been issued.
        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        }
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());

        final Execution firstAttempt =
                graph.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
        firstAttempt.cancelingComplete();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Cancels a running job (expecting CANCELLING), then fails the whole graph (expecting FAILING),
     * and checks that the job is RESTARTING after {@code cancelingComplete()} is called on the
     * first vertex's attempt.
     */
    @Test
    public void testFailExecutionGraphAfterCancel() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                2);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);

        final ExecutionConfig config = new ExecutionConfig();
        config.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2147483647L));

        final JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{vertex});
        jobGraph.setExecutionConfig(config);

        final ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        graph.fail(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        final Execution firstAttempt =
                graph.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
        firstAttempt.cancelingComplete();
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());
    }

    /**
     * Uses a {@code ControllableRestartStrategy} to hold the job in RESTARTING, suspends it
     * (expecting SUSPENDED), then unlocks the pending restart and checks that the job is still
     * SUSPENDED once the restart task has finished.
     */
    @Test
    public void testSuspendWhileRestarting() throws Exception {
        final FiniteDuration timeout = new FiniteDuration(1L, TimeUnit.MINUTES);
        final Deadline deadline = timeout.fromNow();

        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                NUM_TASKS);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex sender = new JobVertex("Task");
        sender.setInvokableClass(Tasks.NoOpInvokable.class);
        sender.setParallelism(NUM_TASKS);
        final JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender});

        final ControllableRestartStrategy restartStrategy = new ControllableRestartStrategy(timeout);
        final ExecutionGraph graph = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                "Test job",
                new Configuration(),
                new SerializedValue<>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                restartStrategy);
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        instance.markDead();

        // Wait until the strategy has been asked whether a restart is allowed.
        Await.ready(restartStrategy.getReachedCanRestart(), deadline.timeLeft());
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.suspend(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());

        // Let the pending restart run; it must not move the job out of SUSPENDED.
        restartStrategy.unlockRestart();
        Await.ready(restartStrategy.getRestartDone(), deadline.timeLeft());
        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());
    }

    /**
     * Creates and schedules an execution graph that is not wrapped in a Mockito spy.
     *
     * @param restartStrategy restart strategy for the new graph
     * @return the scheduled graph together with the instance it was scheduled on
     */
    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        final boolean wrapInSpy = false;
        return createExecutionGraph(restartStrategy, wrapInSpy);
    }

    /**
     * Creates and schedules an execution graph that is wrapped in a Mockito spy.
     *
     * @param restartStrategy restart strategy for the new graph
     * @return the scheduled (spied) graph together with the instance it was scheduled on
     */
    private static Tuple2<ExecutionGraph, Instance> createSpyExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        final boolean wrapInSpy = true;
        return createExecutionGraph(restartStrategy, wrapInSpy);
    }

    /**
     * Builds a single-vertex job with {@code NUM_TASKS} parallel subtasks, attaches it to a new
     * execution graph and schedules it, asserting CREATED before and RUNNING after scheduling.
     *
     * @param restartStrategy restart strategy for the new graph
     * @param z               whether to wrap the graph in a Mockito spy before attaching the job
     * @return the scheduled graph together with the instance registered with the scheduler
     */
    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy, boolean z) throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                NUM_TASKS);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex sender = newJobVertex("Task", NUM_TASKS, Tasks.NoOpInvokable.class);
        final JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender});

        final ExecutionGraph plainGraph = newExecutionGraph(restartStrategy);
        final ExecutionGraph graph = z ? Mockito.spy(plainGraph) : plainGraph;

        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        return new Tuple2<>(graph, instance);
    }

    /**
     * Creates a job vertex with the given name, invokable class and parallelism.
     *
     * @param str name of the vertex
     * @param i   parallelism of the vertex
     * @param cls invokable class of the vertex
     * @return the configured job vertex
     */
    private static JobVertex newJobVertex(String str, int i, Class<Tasks.NoOpInvokable> cls) {
        final JobVertex vertex = new JobVertex(str);
        vertex.setInvokableClass(cls);
        vertex.setParallelism(i);
        return vertex;
    }

    /**
     * Creates an execution graph named "Test job" with a fresh job ID, an empty configuration, a
     * serialized default {@link ExecutionConfig} and the default Akka timeout.
     *
     * @param restartStrategy restart strategy for the new graph
     * @return the new execution graph
     * @throws IOException declared by this method; presumably raised when the execution config
     *                     cannot be serialized (TODO confirm)
     */
    private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
        return new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                "Test job",
                new Configuration(),
                new SerializedValue<>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                restartStrategy);
    }

    /**
     * Triggers a failure, waits for the restart, asserts that the job is RUNNING and waits for
     * every current execution attempt to have an assigned resource.
     *
     * @param executionGraph graph to fail and restart
     * @param finiteDuration timeout applied separately to the restart wait and the resource wait
     * @param z              if {@code true}, finish all executions afterwards, or fail the test
     *                       when the resource wait used up its deadline
     */
    private static void restartAfterFailure(ExecutionGraph executionGraph, FiniteDuration finiteDuration, boolean z) throws InterruptedException {
        makeAFailureAndWait(executionGraph, finiteDuration);
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());

        final Deadline resourceDeadline = finiteDuration.fromNow();
        waitForAllResourcesToBeAssignedAfterAsyncRestart(executionGraph, resourceDeadline);

        if (!z) {
            return;
        }
        if (!resourceDeadline.hasTimeLeft()) {
            Assert.fail("Failed to wait until all execution attempts left the state DEPLOYING.");
        } else {
            haltExecution(executionGraph);
        }
    }

    /**
     * Polls until every current execution attempt has an assigned resource or the deadline has
     * passed. Returns silently in both cases.
     *
     * @param executionGraph graph whose execution vertices are inspected
     * @param deadline       point in time after which polling stops
     */
    private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGraph executionGraph, Deadline deadline) throws InterruptedException {
        boolean allAssigned = false;
        while (deadline.hasTimeLeft() && !allAssigned) {
            allAssigned = true;
            for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
                if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
                    // Found an attempt without a resource: back off and rescan from the start.
                    allAssigned = false;
                    Thread.sleep(100L);
                    break;
                }
            }
        }
    }

    /**
     * Fails the first execution vertex, asserts that the job is FAILING, completes the
     * cancellation of every current attempt and then waits for the job to report RUNNING.
     *
     * @param executionGraph graph to fail
     * @param finiteDuration maximum time to wait for the job to report RUNNING
     */
    private static void makeAFailureAndWait(ExecutionGraph executionGraph, FiniteDuration finiteDuration) throws InterruptedException {
        final ExecutionVertex firstVertex = executionGraph.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());

        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }

        waitForAsyncRestart(executionGraph, finiteDuration);
    }

    /**
     * Polls the job status every 100 ms until it is RUNNING or the timeout has elapsed. Returns
     * silently in both cases.
     *
     * @param executionGraph graph whose status is polled
     * @param finiteDuration maximum time to wait
     */
    private static void waitForAsyncRestart(ExecutionGraph executionGraph, FiniteDuration finiteDuration) throws InterruptedException {
        final Deadline deadline = finiteDuration.fromNow();
        while (deadline.hasTimeLeft()) {
            if (executionGraph.getState() == JobStatus.RUNNING) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Marks every current execution attempt as finished and asserts that the job is FINISHED.
     *
     * @param executionGraph graph whose executions are finished
     */
    private static void haltExecution(ExecutionGraph executionGraph) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }
        Assert.assertEquals(JobStatus.FINISHED, executionGraph.getState());
    }
}
