/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import akka.dispatch.Futures;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class ExecutionGraphRestartTest
extends TestLogger {
    private static final int NUM_TASKS = 31;

    /**
     * Builds a graph with a {@link NoRestartStrategy}, fails one vertex, and checks that the job
     * reaches FAILED and stays FAILED when {@code restart()} is invoked by hand.
     */
    @Test
    public void testNoManualRestart() throws Exception {
        ExecutionGraph graph = createExecutionGraph(new NoRestartStrategy()).f0;

        // Fail the first vertex, then acknowledge cancellation on every current attempt.
        graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        // A manual restart must not move the job out of FAILED.
        graph.restart();
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Schedules two strictly co-located vertices that share a slot sharing group, and checks the
     * co-location constraints (see {@code validateConstraints}) both before and after a
     * failure-triggered restart.
     */
    @Test
    public void testConstraintsAfterRestart() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobVertex firstVertex = newJobVertex("Task1", 31, Tasks.NoOpInvokable.class);
        JobVertex secondVertex = newJobVertex("Task2", 31, Tasks.NoOpInvokable.class);

        // Both vertices share one slot sharing group and are strictly co-located.
        SlotSharingGroup sharedGroup = new SlotSharingGroup();
        firstVertex.setSlotSharingGroup(sharedGroup);
        secondVertex.setSlotSharingGroup(sharedGroup);
        firstVertex.setStrictlyCoLocatedWith(secondVertex);

        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{firstVertex, secondVertex});
        ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
        this.validateConstraints(graph);

        // Fail, wait for the restart, and validate the constraints again.
        FiniteDuration restartTimeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        restartAfterFailure(graph, restartTimeout, false);
        this.validateConstraints(graph);

        haltExecution(graph);
    }

    /**
     * Asserts, for each of the 31 subtask indices, that the first job vertex's co-location
     * constraint has a shared slot, is assigned, and equals the constraint of the second job
     * vertex at the same index.
     *
     * @param eg graph whose first two job vertices are compared
     */
    private void validateConstraints(ExecutionGraph eg) {
        ExecutionJobVertex[] jobVertices = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
        int subtask = 0;
        while (subtask < 31) {
            CoLocationConstraint first = jobVertices[0].getTaskVertices()[subtask].getLocationConstraint();
            CoLocationConstraint second = jobVertices[1].getTaskVertices()[subtask].getLocationConstraint();
            Assert.assertNotNull(first.getSharedSlot());
            Assert.assertTrue(first.isAssigned());
            Assert.assertEquals(first, second);
            subtask++;
        }
    }

    /**
     * Fails a graph that uses {@code FixedDelayRestartStrategy(1, 1000L)} and expects it to be
     * RUNNING again within two minutes; the restarted execution is then driven to completion.
     */
    @Test
    public void testRestartAutomatically() throws Exception {
        RestartStrategy strategy = new FixedDelayRestartStrategy(1, 1000L);
        ExecutionGraph graph = createExecutionGraph(strategy).f0;
        FiniteDuration restartTimeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        restartAfterFailure(graph, restartTimeout, true);
    }

    /**
     * Uses {@code FailureRateRestartStrategy(2, 10 s, 0 s)}: the first two failures must each be
     * followed by a restart, and the job must be FAILED after the third failure.
     */
    @Test
    public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
        RestartStrategy strategy = new FailureRateRestartStrategy(
                2, Time.of(10L, TimeUnit.SECONDS), Time.of(0L, TimeUnit.SECONDS));
        ExecutionGraph graph = createExecutionGraph(strategy).f0;

        // Two failures that are each expected to end in a RUNNING graph.
        for (int round = 0; round < 2; round++) {
            restartAfterFailure(graph, timeout, false);
        }

        // The third failure is expected to leave the job FAILED.
        makeAFailureAndWait(graph, timeout);
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Uses {@code FailureRateRestartStrategy(1, 1 ms, 0 s)}: three consecutive failures must each
     * be followed by a restart, leaving the job RUNNING at the end.
     */
    @Test
    public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception {
        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
        RestartStrategy strategy = new FailureRateRestartStrategy(
                1, Time.of(1L, TimeUnit.MILLISECONDS), Time.of(0L, TimeUnit.SECONDS));
        ExecutionGraph graph = createExecutionGraph(strategy).f0;

        for (int round = 0; round < 3; round++) {
            restartAfterFailure(graph, timeout, false);
        }
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
    }

    /**
     * Marks the instance dead, waits for the graph to reach RESTARTING, cancels it, and checks
     * that the job is CANCELED and stays CANCELED after an explicit {@code restart()} call.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        Tuple2<ExecutionGraph, Instance> graphAndInstance =
                createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
        ExecutionGraph graph = graphAndInstance.f0;
        Instance instance = graphAndInstance.f1;

        instance.markDead();

        // Poll until the graph reports RESTARTING or the testing deadline runs out.
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft()) {
            if (graph.getState() == JobStatus.RESTARTING) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());

        // Restarting a canceled job must not change its state.
        graph.restart();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Marks the instance dead, waits for the graph to reach RESTARTING, fails the graph, and
     * checks that the job is FAILED and stays FAILED after an explicit {@code restart()} call.
     */
    @Test
    public void testFailWhileRestarting() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 31);
        scheduler.newInstanceAvailable(instance);

        RestartStrategy strategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE);
        ExecutionGraph graph = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                "TestJob",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                strategy);

        JobVertex noOpVertex = new JobVertex("NoOpInvokable");
        noOpVertex.setInvokableClass(Tasks.NoOpInvokable.class);
        noOpVertex.setParallelism(31);
        JobGraph jobGraph = new JobGraph("TestJob", new JobVertex[]{noOpVertex});

        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        instance.markDead();

        // Poll until the graph reports RESTARTING or the testing deadline runs out.
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft()) {
            if (graph.getState() == JobStatus.RESTARTING) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.fail(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        // Restarting a failed job must not change its state.
        graph.restart();
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Holds a spied graph in FAILING by stubbing out {@code jobVertexInFinalState()}, cancels it,
     * and checks the CANCELLING state followed by CANCELED once the real method is invoked.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        Tuple2<ExecutionGraph, Instance> graphAndInstance =
                createSpyExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
        ExecutionGraph graph = graphAndInstance.f0;
        Instance instance = graphAndInstance.f1;

        // Turn the final-state notification into a no-op on the spy.
        Mockito.doNothing().when(graph).jobVertexInFinalState();

        instance.markDead();

        // Poll until every vertex is FAILED or CANCELED, or the testing deadline runs out.
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft()) {
            boolean allTerminated = true;
            for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
                ExecutionState vertexState = executionVertex.getExecutionState();
                if (vertexState != ExecutionState.FAILED && vertexState != ExecutionState.CANCELED) {
                    allTerminated = false;
                    break;
                }
            }
            if (allTerminated) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        // Re-enable the real method and trigger it by hand.
        Mockito.doCallRealMethod().when(graph).jobVertexInFinalState();
        graph.jobVertexInFinalState();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Fails a vertex with a {@link SuppressRestartsException} and checks that the job ends up
     * FAILED, that {@code restart()} was never called on the spied graph, and that the
     * strategy's current restart attempt is still 0.
     */
    @Test
    public void testNoRestartOnSuppressException() throws Exception {
        ExecutionGraph graph = createSpyExecutionGraph(new FixedDelayRestartStrategy(1, 1000L)).f0;

        Exception suppressing = new SuppressRestartsException(new Exception("Test Exception"));
        graph.getAllExecutionVertices().iterator().next().fail(suppressing);
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
        }

        // Poll until the graph reports FAILED or two minutes have passed.
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        while (deadline.hasTimeLeft()) {
            if (graph.getState() == JobStatus.FAILED) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        Mockito.verify(graph, Mockito.never()).restart();

        RestartStrategy strategy = graph.getRestartStrategy();
        Assert.assertTrue(strategy instanceof FixedDelayRestartStrategy);
        Assert.assertEquals(0L, (long) ((FixedDelayRestartStrategy) strategy).getCurrentRestartAttempt());
    }

    /**
     * Finishes one execution and fails the other so that the graph restarts, then fails the
     * already-finished execution from before the restart and checks that this has no effect:
     * that execution stays FINISHED and the job reaches FINISHED.
     */
    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 2);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
        JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender, receiver});

        ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        // Capture the pre-restart attempts of the two vertices.
        Iterator<ExecutionVertex> vertexIterator = graph.getAllExecutionVertices().iterator();
        Execution finishedExecution = vertexIterator.next().getCurrentExecutionAttempt();
        Execution failedExecution = vertexIterator.next().getCurrentExecutionAttempt();

        finishedExecution.markFinished();
        failedExecution.fail(new Exception("Test Exception"));
        failedExecution.cancelingComplete();

        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        waitForAsyncRestart(graph, timeout);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        waitForAllResourcesToBeAssignedAfterAsyncRestart(graph, timeout.fromNow());
        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            Assert.assertNotNull("No assigned resource (test instability).", executionVertex.getCurrentAssignedResource());
            executionVertex.getCurrentExecutionAttempt().switchToRunning();
        }

        // Failing the execution captured before the restart must not affect the running graph.
        finishedExecution.fail(new Exception("This should have no effect"));

        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().markFinished();
        }

        Assert.assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
        Assert.assertEquals(JobStatus.FINISHED, graph.getState());
    }

    /**
     * Cancels a running graph, then fails every current execution attempt, and checks that the
     * job is CANCELED and stays CANCELED after {@code cancelingComplete()} on the first attempt.
     */
    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 2);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobVertex testVertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);

        ExecutionConfig jobConfig = new ExecutionConfig();
        jobConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, (long) Integer.MAX_VALUE));

        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{testVertex});
        jobGraph.setExecutionConfig(jobConfig);

        ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        graph.cancel();

        // Failures reported after the cancel call must still end in CANCELED.
        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        }
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());

        Execution firstAttempt = graph.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
        firstAttempt.cancelingComplete();
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * Cancels a running graph and then fails the graph itself, checking the state sequence
     * CANCELLING, FAILING and, after {@code cancelingComplete()} on the first attempt,
     * RESTARTING.
     */
    @Test
    public void testFailExecutionGraphAfterCancel() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 2);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobVertex testVertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);

        ExecutionConfig jobConfig = new ExecutionConfig();
        jobConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, (long) Integer.MAX_VALUE));

        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{testVertex});
        jobGraph.setExecutionConfig(jobConfig);

        ExecutionGraph graph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000L));
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        graph.fail(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        Execution firstAttempt = graph.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
        firstAttempt.cancelingComplete();
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());
    }

    /**
     * Uses a {@code ControllableRestartStrategy} to hold the graph in RESTARTING, suspends it,
     * then releases the pending restart and checks that the job stays SUSPENDED.
     */
    @Test
    public void testSuspendWhileRestarting() throws Exception {
        // One deadline, started here, bounds both waits below.
        FiniteDuration timeout = new FiniteDuration(1L, TimeUnit.MINUTES);
        Deadline deadline = timeout.fromNow();

        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobVertex taskVertex = new JobVertex("Task");
        taskVertex.setInvokableClass(Tasks.NoOpInvokable.class);
        taskVertex.setParallelism(31);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{taskVertex});

        ControllableRestartStrategy gatedStrategy = new ControllableRestartStrategy(timeout);
        ExecutionGraph graph = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                "Test job",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                gatedStrategy);
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        instance.markDead();

        // Wait until the strategy has been asked whether it can restart.
        Await.ready(gatedStrategy.getReachedCanRestart(), deadline.timeLeft());
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        graph.suspend(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());

        // Release the pending restart and wait for it to complete.
        gatedStrategy.unlockRestart();
        Await.ready(gatedStrategy.getRestartDone(), deadline.timeLeft());
        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());
    }

    /**
     * Creates a scheduled, non-spied execution graph together with its instance.
     *
     * @param restartStrategy restart strategy handed to the new graph
     * @return the graph ({@code f0}) and the instance it was scheduled on ({@code f1})
     */
    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        final boolean spyOnGraph = false;
        return createExecutionGraph(restartStrategy, spyOnGraph);
    }

    /**
     * Creates a scheduled execution graph wrapped in a Mockito spy, together with its instance.
     *
     * @param restartStrategy restart strategy handed to the new graph
     * @return the spied graph ({@code f0}) and the instance it was scheduled on ({@code f1})
     */
    private static Tuple2<ExecutionGraph, Instance> createSpyExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        final boolean spyOnGraph = true;
        return createExecutionGraph(restartStrategy, spyOnGraph);
    }

    /**
     * Builds a single-vertex job with parallelism 31, attaches it to a new execution graph, and
     * schedules it on a fresh instance with 31 slots. Asserts CREATED before and RUNNING after
     * scheduling.
     *
     * @param restartStrategy restart strategy handed to the new graph
     * @param isSpy whether the graph is wrapped in a Mockito spy before the job is attached
     * @return the graph ({@code f0}) and the instance it was scheduled on ({@code f1})
     */
    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy, boolean isSpy) throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobVertex taskVertex = newJobVertex("Task", 31, Tasks.NoOpInvokable.class);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{taskVertex});

        ExecutionGraph plainGraph = newExecutionGraph(restartStrategy);
        ExecutionGraph graph = isSpy ? Mockito.spy(plainGraph) : plainGraph;

        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        return new Tuple2<ExecutionGraph, Instance>(graph, instance);
    }

    /**
     * Creates a job vertex with the given name, invokable class and parallelism.
     *
     * @param task1 name of the vertex
     * @param numTasks parallelism to set on the vertex
     * @param invokable invokable class to set on the vertex
     * @return the configured vertex
     */
    private static JobVertex newJobVertex(String task1, int numTasks, Class<Tasks.NoOpInvokable> invokable) {
        final JobVertex vertex = new JobVertex(task1);
        vertex.setInvokableClass(invokable);
        vertex.setParallelism(numTasks);
        return vertex;
    }

    /**
     * Creates an execution graph named "Test job" with a fresh job ID, an empty configuration,
     * a default {@link ExecutionConfig}, the default Akka timeout, and the given strategy.
     *
     * @param restartStrategy restart strategy for the graph
     * @return the new execution graph
     * @throws IOException declared by this helper; presumably raised when the execution config
     *     cannot be serialized
     */
    private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
        SerializedValue<ExecutionConfig> serializedConfig = new SerializedValue<ExecutionConfig>(new ExecutionConfig());
        return new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                "Test job",
                new Configuration(),
                serializedConfig,
                AkkaUtils.getDefaultTimeout(),
                restartStrategy);
    }

    /**
     * Triggers a failure, asserts that the graph is RUNNING again, and waits for every current
     * execution attempt to have an assigned resource. Optionally finishes all executions
     * afterwards.
     *
     * @param eg graph to fail and restart
     * @param timeout upper bound applied separately to the restart wait and the resource wait
     * @param haltAfterRestart if true, finish all executions once resources are assigned; the
     *     test fails if the resource wait used up its whole deadline
     */
    private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
        makeAFailureAndWait(eg, timeout);
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        Deadline assignmentDeadline = timeout.fromNow();
        waitForAllResourcesToBeAssignedAfterAsyncRestart(eg, assignmentDeadline);

        if (!haltAfterRestart) {
            return;
        }
        if (assignmentDeadline.hasTimeLeft()) {
            haltExecution(eg);
        } else {
            Assert.fail("Failed to wait until all execution attempts left the state DEPLOYING.");
        }
    }

    /**
     * Polls every 100 ms until each vertex's current execution attempt has a non-null assigned
     * resource, or until the deadline has no time left. Returns silently in both cases.
     *
     * @param eg graph whose vertices are inspected
     * @param deadline point in time after which polling stops
     */
    private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGraph eg, Deadline deadline) throws InterruptedException {
        while (deadline.hasTimeLeft()) {
            boolean allAssigned = true;
            for (ExecutionVertex executionVertex : eg.getAllExecutionVertices()) {
                if (executionVertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
                    allAssigned = false;
                    break;
                }
            }
            if (allAssigned) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Fails the first execution vertex, asserts that the graph is FAILING, completes cancellation
     * on every current attempt, and then waits for the graph to report RUNNING.
     *
     * @param eg graph to fail
     * @param timeout upper bound for the wait that follows the failure
     */
    private static void makeAFailureAndWait(ExecutionGraph eg, FiniteDuration timeout) throws InterruptedException {
        ExecutionVertex firstVertex = eg.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());

        for (ExecutionVertex executionVertex : eg.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
        }

        waitForAsyncRestart(eg, timeout);
    }

    /**
     * Polls every 100 ms until the graph reports RUNNING or the timeout has elapsed. Returns
     * silently in both cases; callers assert on the resulting state.
     *
     * @param eg graph to observe
     * @param timeout maximum time to keep polling
     */
    private static void waitForAsyncRestart(ExecutionGraph eg, FiniteDuration timeout) throws InterruptedException {
        Deadline restartDeadline = timeout.fromNow();
        while (restartDeadline.hasTimeLeft()) {
            if (eg.getState() == JobStatus.RUNNING) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Marks the current execution attempt of every vertex as finished and asserts that the job
     * is FINISHED afterwards.
     *
     * @param eg graph to finish
     */
    private static void haltExecution(ExecutionGraph eg) {
        for (ExecutionVertex executionVertex : eg.getAllExecutionVertices()) {
            Execution currentAttempt = executionVertex.getCurrentExecutionAttempt();
            currentAttempt.markFinished();
        }
        Assert.assertEquals(JobStatus.FINISHED, eg.getState());
    }

    /**
     * Restart strategy whose restart is gated by the test: {@link #canRestart()} always answers
     * {@code true} and signals that it was reached, while {@link #restart(ExecutionGraph)} waits
     * asynchronously for {@link #unlockRestart()} before calling {@code restart()} on the graph.
     */
    private static class ControllableRestartStrategy implements RestartStrategy {
        /** Completed when {@link #canRestart()} is called. */
        private final Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<Boolean>();
        /** Completed by the test via {@link #unlockRestart()} to let the pending restart proceed. */
        private final Promise<Boolean> doRestart = new Promise.DefaultPromise<Boolean>();
        /** Completed once the asynchronous restart task has finished, successfully or not. */
        private final Promise<Boolean> restartDone = new Promise.DefaultPromise<Boolean>();
        /** Exception caught in the restart task, if any; volatile because it is written by that task. */
        private volatile Exception exception = null;
        /** Maximum time the restart task waits for {@link #unlockRestart()}. */
        private final FiniteDuration timeout;

        /**
         * @param timeout maximum time the restart task waits to be unlocked
         */
        public ControllableRestartStrategy(FiniteDuration timeout) {
            this.timeout = timeout;
        }

        /** Allows a pending (or future) restart task to proceed. */
        public void unlockRestart() {
            this.doRestart.success(true);
        }

        /**
         * @return the exception caught by the restart task, or {@code null} if none was caught
         */
        public Exception getException() {
            return this.exception;
        }

        /**
         * @return future completed when {@link #canRestart()} has been called
         */
        public Future<Boolean> getReachedCanRestart() {
            return this.reachedCanRestart.future();
        }

        /**
         * @return future completed when the restart task has finished
         */
        public Future<Boolean> getRestartDone() {
            return this.restartDone.future();
        }

        /**
         * Signals that the restart decision point was reached and always permits a restart.
         *
         * @return always {@code true}
         */
        public boolean canRestart() {
            this.reachedCanRestart.success(true);
            return true;
        }

        /**
         * Starts an asynchronous task on the default testing execution context that waits for
         * {@link #unlockRestart()} (bounded by the configured timeout) and then restarts the graph.
         *
         * @param executionGraph graph to restart once unlocked
         */
        public void restart(final ExecutionGraph executionGraph) {
            Futures.future(new Callable<Object>() {

                @Override
                public Object call() throws Exception {
                    try {
                        Await.ready(ControllableRestartStrategy.this.doRestart.future(), ControllableRestartStrategy.this.timeout);
                        executionGraph.restart();
                    } catch (Exception e) {
                        // Keep the failure so that the test can inspect it via getException().
                        ControllableRestartStrategy.this.exception = e;
                    } finally {
                        // Complete in finally so that waiters on getRestartDone() are released even
                        // when a non-Exception Throwable escapes the restart.
                        ControllableRestartStrategy.this.restartDone.success(true);
                    }
                    return null;
                }
            }, TestingUtils.defaultExecutionContext());
        }
    }
}

