package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.class */
public class ExecutionGraphRestartTest extends TestLogger {
    private static final int NUM_TASKS = 31;
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest$ControllableRestartStrategy.class */
    private static class ControllableRestartStrategy implements RestartStrategy {
        private final OneShotLatch reachedCanRestart = new OneShotLatch();
        private final OneShotLatch doRestart = new OneShotLatch();
        private final OneShotLatch restartDone = new OneShotLatch();
        private final Time timeout;
        private volatile Exception exception;

        public ControllableRestartStrategy(Time time) {
            this.timeout = time;
        }

        public void unlockRestart() {
            this.doRestart.trigger();
        }

        public Exception getException() {
            return this.exception;
        }

        public OneShotLatch getReachedCanRestart() {
            return this.reachedCanRestart;
        }

        public OneShotLatch getRestartDone() {
            return this.restartDone;
        }

        public boolean canRestart() {
            this.reachedCanRestart.trigger();
            return true;
        }

        public void restart(final RestartCallback restartCallback, ScheduledExecutor scheduledExecutor) {
            scheduledExecutor.execute(new Runnable() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.ControllableRestartStrategy.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ControllableRestartStrategy.this.doRestart.await(ControllableRestartStrategy.this.timeout.getSize(), ControllableRestartStrategy.this.timeout.getUnit());
                        restartCallback.triggerFullRecovery();
                    } catch (Exception e) {
                        ControllableRestartStrategy.this.exception = e;
                    }
                    ControllableRestartStrategy.this.restartDone.trigger();
                }
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest$TriggeredRestartStrategy.class */
    private static final class TriggeredRestartStrategy implements RestartStrategy {
        private final OneShotLatch latch;

        TriggeredRestartStrategy(OneShotLatch oneShotLatch) {
            this.latch = oneShotLatch;
        }

        public boolean canRestart() {
            return true;
        }

        public void restart(final RestartCallback restartCallback, ScheduledExecutor scheduledExecutor) {
            scheduledExecutor.execute(new Runnable() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.TriggeredRestartStrategy.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        TriggeredRestartStrategy.this.latch.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    restartCallback.triggerFullRecovery();
                }
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest$WaitForTasks.class */
    public static class WaitForTasks implements Consumer<ExecutionAttemptID> {
        private final int tasksToWaitFor;
        private final CompletableFuture<Boolean> allTasksReceived = new CompletableFuture<>();
        private final AtomicInteger counter = new AtomicInteger();

        public WaitForTasks(int i) {
            this.tasksToWaitFor = i;
        }

        public CompletableFuture<Boolean> getFuture() {
            return this.allTasksReceived;
        }

        @Override // java.util.function.Consumer
        public void accept(ExecutionAttemptID executionAttemptID) {
            if (this.counter.incrementAndGet() >= this.tasksToWaitFor) {
                this.allTasksReceived.complete(true);
            }
        }
    }

    @After
    public void shutdown() {
        this.executor.shutdownNow();
    }

    @Test
    public void testNoManualRestart() throws Exception {
        ExecutionGraph executionGraph = (ExecutionGraph) createExecutionGraph(new NoRestartStrategy()).f0;
        ((ExecutionVertex) executionGraph.getAllExecutionVertices().iterator().next()).fail(new Exception("Test Exception"));
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
        executionGraph.restart(executionGraph.getGlobalModVersion());
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    @Test
    public void testRestartAutomatically() throws Exception {
        restartAfterFailure((ExecutionGraph) createExecutionGraph(new FixedDelayRestartStrategy(1, 1000L)).f0, new FiniteDuration(2L, TimeUnit.MINUTES), true);
    }

    @Test
    public void testCancelWhileRestarting() throws Exception {
        Tuple2<ExecutionGraph, Instance> createExecutionGraph = createExecutionGraph(new InfiniteDelayRestartStrategy());
        ExecutionGraph executionGraph = (ExecutionGraph) createExecutionGraph.f0;
        ((Instance) createExecutionGraph.f1).markDead();
        Deadline fromNow = TestingUtils.TESTING_DURATION().fromNow();
        while (fromNow.hasTimeLeft() && executionGraph.getState() != JobStatus.RESTARTING) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        executionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
        executionGraph.restart(executionGraph.getGlobalModVersion());
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
    }

    @Test
    public void testFailWhileRestarting() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Instance executionGraphTestUtils = ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())), NUM_TASKS);
        scheduler.newInstanceAvailable(executionGraphTestUtils);
        ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), new JobID(), "TestJob", new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new InfiniteDelayRestartStrategy(), scheduler);
        JobVertex jobVertex = new JobVertex("NoOpInvokable");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(NUM_TASKS);
        executionGraph.attachJobGraph(new JobGraph("TestJob", new JobVertex[]{jobVertex}).getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, executionGraph.getState());
        executionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        executionGraphTestUtils.markDead();
        Deadline fromNow = TestingUtils.TESTING_DURATION().fromNow();
        while (fromNow.hasTimeLeft() && executionGraph.getState() != JobStatus.RESTARTING) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        executionGraph.failGlobal(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        executionGraph.failGlobal(new SuppressRestartsException(new Exception("Test exception")));
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
        executionGraph.restart(executionGraph.getGlobalModVersion());
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    @Test
    public void testCancelWhileFailing() throws Exception {
        ExecutionGraph executionGraph = (ExecutionGraph) createExecutionGraph(new InfiniteDelayRestartStrategy()).f0;
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        for (ExecutionVertex executionVertex : ((ExecutionJobVertex) executionGraph.getVerticesTopologically().iterator().next()).getTaskVertices()) {
            executionVertex.getCurrentExecutionAttempt().switchToRunning();
        }
        executionGraph.failGlobal(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
        executionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, executionGraph.getState());
        for (ExecutionVertex executionVertex2 : ((ExecutionJobVertex) executionGraph.getVerticesTopologically().iterator().next()).getTaskVertices()) {
            executionVertex2.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
    }

    @Test
    public void testFailWhileCanceling() throws Exception {
        ExecutionGraph executionGraph = (ExecutionGraph) createExecutionGraph(new NoRestartStrategy()).f0;
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        for (ExecutionVertex executionVertex : ((ExecutionJobVertex) executionGraph.getVerticesTopologically().iterator().next()).getTaskVertices()) {
            executionVertex.getCurrentExecutionAttempt().switchToRunning();
        }
        executionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, executionGraph.getState());
        executionGraph.failGlobal(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
        for (ExecutionVertex executionVertex2 : ((ExecutionJobVertex) executionGraph.getVerticesTopologically().iterator().next()).getTaskVertices()) {
            executionVertex2.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    @Test
    public void testNoRestartOnSuppressException() throws Exception {
        ExecutionGraph executionGraph = (ExecutionGraph) createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L)).f0;
        ((ExecutionVertex) executionGraph.getAllExecutionVertices().iterator().next()).fail(new SuppressRestartsException(new Exception("Test Exception")));
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().cancelingComplete();
        }
        executionGraph.waitUntilTerminal();
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
        Assert.assertTrue(executionGraph.getRestartStrategy() instanceof FixedDelayRestartStrategy);
        Assert.assertEquals(0L, r0.getCurrentRestartAttempt());
    }

    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        Instance executionGraphTestUtils = ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())), 2);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(executionGraphTestUtils);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class), ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class)});
        ExecutionGraph newExecutionGraph = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000L), scheduler);
        newExecutionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, newExecutionGraph.getState());
        newExecutionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, newExecutionGraph.getState());
        Iterator it = newExecutionGraph.getAllExecutionVertices().iterator();
        Execution currentExecutionAttempt = ((ExecutionVertex) it.next()).getCurrentExecutionAttempt();
        Execution currentExecutionAttempt2 = ((ExecutionVertex) it.next()).getCurrentExecutionAttempt();
        currentExecutionAttempt.markFinished();
        currentExecutionAttempt2.fail(new Exception("Test Exception"));
        currentExecutionAttempt2.cancelingComplete();
        FiniteDuration finiteDuration = new FiniteDuration(2L, TimeUnit.MINUTES);
        waitForAsyncRestart(newExecutionGraph, finiteDuration);
        Assert.assertEquals(JobStatus.RUNNING, newExecutionGraph.getState());
        waitForAllResourcesToBeAssignedAfterAsyncRestart(newExecutionGraph, finiteDuration.fromNow());
        for (ExecutionVertex executionVertex : newExecutionGraph.getAllExecutionVertices()) {
            Assert.assertNotNull("No assigned resource (test instability).", executionVertex.getCurrentAssignedResource());
            executionVertex.getCurrentExecutionAttempt().switchToRunning();
        }
        currentExecutionAttempt.fail(new Exception("This should have no effect"));
        Iterator it2 = newExecutionGraph.getAllExecutionVertices().iterator();
        while (it2.hasNext()) {
            ((ExecutionVertex) it2.next()).getCurrentExecutionAttempt().markFinished();
        }
        Assert.assertEquals(ExecutionState.FINISHED, currentExecutionAttempt.getState());
        Assert.assertEquals(JobStatus.FINISHED, newExecutionGraph.getState());
    }

    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        Instance executionGraphTestUtils = ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())), 2);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(executionGraphTestUtils);
        JobVertex createJobVertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2147483647L));
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{createJobVertex});
        jobGraph.setExecutionConfig(executionConfig);
        ExecutionGraph newExecutionGraph = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
        newExecutionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, newExecutionGraph.getState());
        newExecutionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, newExecutionGraph.getState());
        newExecutionGraph.cancel();
        Iterator it = newExecutionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        }
        Assert.assertEquals(JobStatus.CANCELED, newExecutionGraph.getTerminationFuture().get());
        ((ExecutionVertex) newExecutionGraph.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.CANCELED, newExecutionGraph.getState());
    }

    @Test
    public void testFailExecutionGraphAfterCancel() throws Exception {
        Instance executionGraphTestUtils = ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())), 2);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(executionGraphTestUtils);
        JobVertex createJobVertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2147483647L));
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{createJobVertex});
        jobGraph.setExecutionConfig(executionConfig);
        ExecutionGraph newExecutionGraph = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
        newExecutionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, newExecutionGraph.getState());
        newExecutionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, newExecutionGraph.getState());
        newExecutionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, newExecutionGraph.getState());
        newExecutionGraph.failGlobal(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, newExecutionGraph.getState());
        ((ExecutionVertex) newExecutionGraph.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.RESTARTING, newExecutionGraph.getState());
    }

    @Test
    public void testSuspendWhileRestarting() throws Exception {
        Time of = Time.of(1L, TimeUnit.MINUTES);
        Instance executionGraphTestUtils = ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())), NUM_TASKS);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(executionGraphTestUtils);
        JobVertex jobVertex = new JobVertex("Task");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(NUM_TASKS);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{jobVertex});
        ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(of);
        ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), new JobID(), "Test job", new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), controllableRestartStrategy, scheduler);
        executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, executionGraph.getState());
        executionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        executionGraphTestUtils.markDead();
        controllableRestartStrategy.getReachedCanRestart().await(of.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        executionGraph.suspend(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        controllableRestartStrategy.unlockRestart();
        controllableRestartStrategy.getRestartDone().await(of.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
    }

    @Test
    public void testConcurrentLocalFailAndRestart() throws Exception {
        SimpleAckingTaskManagerGateway simpleAckingTaskManagerGateway = new SimpleAckingTaskManagerGateway();
        OneShotLatch oneShotLatch = new OneShotLatch();
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), simpleAckingTaskManagerGateway, new TriggeredRestartStrategy(oneShotLatch), ExecutionGraphTestUtils.createNoOpVertex(10));
        WaitForTasks waitForTasks = new WaitForTasks(10);
        WaitForTasks waitForTasks2 = new WaitForTasks(10);
        simpleAckingTaskManagerGateway.setSubmitConsumer(waitForTasks);
        simpleAckingTaskManagerGateway.setCancelConsumer(waitForTasks2);
        createSimpleTestGraph.setScheduleMode(ScheduleMode.EAGER);
        createSimpleTestGraph.scheduleForExecution();
        waitForTasks.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(createSimpleTestGraph);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSimpleTestGraph.getVerticesTopologically().iterator().next();
        final Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        final Execution currentExecutionAttempt2 = executionJobVertex.getTaskVertices()[executionJobVertex.getParallelism() - 1].getCurrentExecutionAttempt();
        final OneShotLatch oneShotLatch2 = new OneShotLatch();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                try {
                    oneShotLatch2.await();
                } catch (InterruptedException e) {
                }
                currentExecutionAttempt.fail(new Exception("intended test failure 1"));
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                try {
                    oneShotLatch2.await();
                } catch (InterruptedException e) {
                }
                currentExecutionAttempt2.fail(new Exception("intended test failure 2"));
            }
        };
        thread.start();
        thread2.start();
        countDownLatch.await();
        oneShotLatch2.trigger();
        ExecutionGraphTestUtils.waitUntilJobStatus(createSimpleTestGraph, JobStatus.FAILING, 1000L);
        WaitForTasks waitForTasks3 = new WaitForTasks(10);
        simpleAckingTaskManagerGateway.setSubmitConsumer(waitForTasks3);
        waitForTasks2.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(createSimpleTestGraph);
        oneShotLatch.trigger();
        ExecutionGraphTestUtils.waitUntilJobStatus(createSimpleTestGraph, JobStatus.RUNNING, 1000L);
        waitForTasks3.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(createSimpleTestGraph);
        ExecutionGraphTestUtils.finishAllVertices(createSimpleTestGraph);
        createSimpleTestGraph.waitUntilTerminal();
        Assert.assertEquals(JobStatus.FINISHED, createSimpleTestGraph.getState());
    }

    @Test
    public void testConcurrentGlobalFailAndRestarts() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        JobID jobID = new JobID();
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(10);
        NotCancelAckingTaskGateway notCancelAckingTaskGateway = new NotCancelAckingTaskGateway();
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(jobID, new SimpleSlotProvider(jobID, 10, notCancelAckingTaskGateway), new TriggeredRestartStrategy(oneShotLatch), createNoOpVertex);
        WaitForTasks waitForTasks = new WaitForTasks(10);
        notCancelAckingTaskGateway.setSubmitConsumer(waitForTasks);
        createSimpleTestGraph.setScheduleMode(ScheduleMode.EAGER);
        createSimpleTestGraph.scheduleForExecution();
        waitForTasks.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(createSimpleTestGraph);
        createSimpleTestGraph.failGlobal(new Exception("intended test failure 1"));
        Assert.assertEquals(JobStatus.FAILING, createSimpleTestGraph.getState());
        WaitForTasks waitForTasks2 = new WaitForTasks(10);
        notCancelAckingTaskGateway.setSubmitConsumer(waitForTasks2);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(createSimpleTestGraph);
        ExecutionGraphTestUtils.waitUntilJobStatus(createSimpleTestGraph, JobStatus.RESTARTING, 1000L);
        createSimpleTestGraph.failGlobal(new Exception("intended test failure 2"));
        Assert.assertEquals(JobStatus.RESTARTING, createSimpleTestGraph.getState());
        oneShotLatch.trigger();
        ExecutionGraphTestUtils.waitUntilJobStatus(createSimpleTestGraph, JobStatus.RUNNING, 1000L);
        waitForTasks2.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(createSimpleTestGraph);
        ExecutionGraphTestUtils.finishAllVertices(createSimpleTestGraph);
        createSimpleTestGraph.waitUntilTerminal();
        Assert.assertEquals(JobStatus.FINISHED, createSimpleTestGraph.getState());
        if (createSimpleTestGraph.getNumberOfFullRestarts() > 2) {
            Assert.fail("Too many restarts: " + createSimpleTestGraph.getNumberOfFullRestarts());
        }
    }

    @Test
    public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
        Assert.assertTrue("test assumptions violated", ((ThreadPoolExecutor) this.executor).getCorePoolSize() > 1);
        SimpleAckingTaskManagerGateway simpleAckingTaskManagerGateway = new SimpleAckingTaskManagerGateway();
        Scheduler createSchedulerWithInstances = createSchedulerWithInstances(20, simpleAckingTaskManagerGateway);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(20);
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(20);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
        ExecutionGraph createExecutionGraph = ExecutionGraphTestUtils.createExecutionGraph(new JobID(), createSchedulerWithInstances, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L), this.executor, jobVertex, jobVertex2);
        WaitForTasks waitForTasks = new WaitForTasks(40);
        simpleAckingTaskManagerGateway.setSubmitConsumer(waitForTasks);
        createExecutionGraph.setScheduleMode(ScheduleMode.EAGER);
        createExecutionGraph.scheduleForExecution();
        waitForTasks.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(createExecutionGraph);
        ((ExecutionVertex) createExecutionGraph.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().fail(new Exception("intended test failure"));
        Assert.assertEquals(JobStatus.FAILING, createExecutionGraph.getState());
        WaitForTasks waitForTasks2 = new WaitForTasks(40);
        simpleAckingTaskManagerGateway.setSubmitConsumer(waitForTasks2);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(createExecutionGraph);
        ExecutionGraphTestUtils.waitUntilJobStatus(createExecutionGraph, JobStatus.RUNNING, 1000L);
        waitForTasks2.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(createExecutionGraph);
        ExecutionGraphTestUtils.finishAllVertices(createExecutionGraph);
        ExecutionGraphTestUtils.waitUntilJobStatus(createExecutionGraph, JobStatus.FINISHED, 1000L);
    }

    @Test
    public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
        Assert.assertTrue("test assumptions violated", ((ThreadPoolExecutor) this.executor).getCorePoolSize() > 1);
        Scheduler createSchedulerWithInstances = createSchedulerWithInstances(19, new SimpleAckingTaskManagerGateway());
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(20);
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(20);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
        ExecutionGraph createExecutionGraph = ExecutionGraphTestUtils.createExecutionGraph(new JobID(), createSchedulerWithInstances, new FixedDelayRestartStrategy(10, 0L), this.executor, jobVertex, jobVertex2);
        createExecutionGraph.setScheduleMode(ScheduleMode.EAGER);
        createExecutionGraph.scheduleForExecution();
        while (createExecutionGraph.getNumberOfFullRestarts() < 10) {
            Thread.sleep(1L);
        }
        ExecutionGraphTestUtils.waitUntilJobStatus(createExecutionGraph, JobStatus.FAILED, 1000L);
        Throwable failureCause = createExecutionGraph.getFailureCause();
        if (failureCause instanceof NoResourceAvailableException) {
            return;
        }
        ExceptionUtils.rethrowException(failureCause, failureCause.getMessage());
    }

    private Scheduler createSchedulerWithInstances(int i, TaskManagerGateway taskManagerGateway) {
        Scheduler scheduler = new Scheduler(this.executor);
        Instance[] instanceArr = new Instance[i];
        for (int i2 = 0; i2 < instanceArr.length; i2++) {
            instanceArr[i2] = createInstance(taskManagerGateway, 55443 + i2);
            scheduler.newInstanceAvailable(instanceArr[i2]);
        }
        return scheduler;
    }

    private static Instance createInstance(TaskManagerGateway taskManagerGateway, int i) {
        return new Instance(taskManagerGateway, new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), i), new InstanceID(), new HardwareDescription(4, 1000000000L, 500000000L, 400000000L), 1);
    }

    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        return createExecutionGraph(restartStrategy, false);
    }

    private static Tuple2<ExecutionGraph, Instance> createSpyExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        return createExecutionGraph(restartStrategy, true);
    }

    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy, boolean z) throws Exception {
        Instance executionGraphTestUtils = ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())), NUM_TASKS);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(executionGraphTestUtils);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class)});
        ExecutionGraph newExecutionGraph = newExecutionGraph(restartStrategy, scheduler);
        if (z) {
            newExecutionGraph = (ExecutionGraph) Mockito.spy(newExecutionGraph);
        }
        newExecutionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, newExecutionGraph.getState());
        newExecutionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, newExecutionGraph.getState());
        return new Tuple2<>(newExecutionGraph, executionGraphTestUtils);
    }

    private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler scheduler) throws IOException {
        return new ExecutionGraph(TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), new JobID(), "Test job", new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), restartStrategy, scheduler);
    }

    private static void restartAfterFailure(ExecutionGraph executionGraph, FiniteDuration finiteDuration, boolean z) throws InterruptedException, TimeoutException {
        ExecutionGraphTestUtils.failExecutionGraph(executionGraph, new Exception("Test Exception"));
        waitForAsyncRestart(executionGraph, finiteDuration);
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        Deadline fromNow = finiteDuration.fromNow();
        waitForAllResourcesToBeAssignedAfterAsyncRestart(executionGraph, fromNow);
        if (z) {
            if (fromNow.hasTimeLeft()) {
                haltExecution(executionGraph);
            } else {
                Assert.fail("Failed to wait until all execution attempts left the state DEPLOYING.");
            }
        }
    }

    private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGraph executionGraph, Deadline deadline) throws TimeoutException {
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(executionGraph, ExecutionGraphTestUtils.hasResourceAssigned, deadline.timeLeft().toMillis());
    }

    private static void waitForAsyncRestart(ExecutionGraph executionGraph, FiniteDuration finiteDuration) throws InterruptedException {
        Deadline fromNow = finiteDuration.fromNow();
        while (fromNow.hasTimeLeft() && executionGraph.getState() != JobStatus.RUNNING) {
            Thread.sleep(100L);
        }
    }

    private static void haltExecution(ExecutionGraph executionGraph) {
        ExecutionGraphTestUtils.finishAllVertices(executionGraph);
        Assert.assertEquals(JobStatus.FINISHED, executionGraph.getState());
    }
}
