package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Tests the failover-region state transitions driven by {@link RestartPipelinedRegionStrategy}.
 *
 * <p>Decompiled from org/apache/flink/runtime/executiongraph/FailoverRegionTest.class.
 */
public class FailoverRegionTest extends TestLogger {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/FailoverRegionTest$FailoverPipelinedRegionWithDirectExecutor.class */
    public static class FailoverPipelinedRegionWithDirectExecutor implements FailoverStrategy.Factory {
        private FailoverPipelinedRegionWithDirectExecutor() {
        }

        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph, Executors.directExecutor());
        }
    }

    /**
     * Fails one task of the single-region job and checks that its failover region moves from
     * RUNNING to CANCELLING, and back to RUNNING once every task has completed cancelling.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    public void testSingleRegionFailover() throws Exception {
        ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy(10));
        // The graph exposes its strategy through the FailoverStrategy interface; the helper installs
        // a factory that creates a RestartPipelinedRegionStrategy, so the narrowing cast is safe.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
        ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();

        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(firstVertex).getState());

        firstVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(firstVertex).getState());

        // Acknowledge the cancellation of every task; the test expects the region to restart after that.
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(firstVertex).getState());
    }

    /**
     * Builds a four-vertex job (v1 -POINTWISE/PIPELINED-> v2 -ALL_TO_ALL/BLOCKING-> v3
     * -ALL_TO_ALL/PIPELINED-> v4) and checks that a failure in one failover region does not change
     * the state of the other regions.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    public void testMultiRegionsFailover() throws Exception {
        JobID jobId = new JobID();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);

        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");

        v1.setParallelism(2);
        v2.setParallelism(2);
        v3.setParallelism(2);
        v4.setParallelism(1);

        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);
        v4.setInvokableClass(AbstractInvokable.class);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);

        ExecutionGraph graph = new ExecutionGraph(
                new DummyJobInformation(jobId, "Test Job Sample Name"),
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                AkkaUtils.getDefaultTimeout(),
                new InfiniteDelayRestartStrategy(10),
                new FailoverPipelinedRegionWithDirectExecutor(),
                slotProvider);
        graph.attachJobGraph(ordered);

        // The graph exposes its strategy through the FailoverStrategy interface; the factory above
        // creates a RestartPipelinedRegionStrategy, so the narrowing cast is safe.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();

        // First subtasks of v1 and v2: the assertions below expect them to share a failover region.
        ExecutionVertex ev11 = graph.getJobVertex(v1.getID()).getTaskVertices()[0];
        ExecutionVertex ev21 = graph.getJobVertex(v2.getID()).getTaskVertices()[0];
        // Second subtask of v2: expected to be unaffected by a failure of ev21.
        ExecutionVertex ev22 = graph.getJobVertex(v2.getID()).getTaskVertices()[1];
        // Both subtasks of v3: the assertions below expect them to share a failover region.
        ExecutionVertex ev31 = graph.getJobVertex(v3.getID()).getTaskVertices()[0];
        ExecutionVertex ev32 = graph.getJobVertex(v3.getID()).getTaskVertices()[1];

        graph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());

        // Fail ev21: only the region containing ev11 is expected to start cancelling.
        ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
        ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());

        // Completing the cancellation of ev11 is expected to bring its region back to RUNNING.
        ev11.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());

        // Finish the upstream tasks, then wait below until both v3 subtasks reach DEPLOYING.
        ev11.getCurrentExecutionAttempt().markFinished();
        ev21.getCurrentExecutionAttempt().markFinished();
        ev22.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
        ev22.getCurrentExecutionAttempt().markFinished();
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());

        ExecutionGraphTestUtils.waitUntilExecutionState(
                ev31.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000L);
        ExecutionGraphTestUtils.waitUntilExecutionState(
                ev32.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000L);

        // Fail ev31: only its own region is expected to start cancelling.
        ev31.getCurrentExecutionAttempt().fail(new Exception("New fail"));
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());

        ev32.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
    }

    /**
     * Fails one vertex of a job configured with {@link NoRestartStrategy}, completes the
     * cancellation of every task and checks that the whole job ends up FAILED.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    public void testNoManualRestart() throws Exception {
        ExecutionGraph graph = createSingleRegionExecutionGraph(new NoRestartStrategy());

        ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));

        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }

        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Fails tasks of two different failover regions back to back and checks that both regions go
     * to CANCELLING and each returns to RUNNING after its remaining task completes cancelling.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    public void testMultiRegionFailoverAtSameTime() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
                16);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobID jobId = new JobID();

        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");

        v1.setParallelism(2);
        v2.setParallelism(2);
        v3.setParallelism(2);
        v4.setParallelism(2);

        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);
        v4.setInvokableClass(AbstractInvokable.class);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);

        ExecutionGraph graph = new ExecutionGraph(
                new DummyJobInformation(jobId, "Test Job Sample Name"),
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                AkkaUtils.getDefaultTimeout(),
                new InfiniteDelayRestartStrategy(10),
                new RestartPipelinedRegionStrategy.Factory(),
                scheduler);

        // Let a JobException propagate: the test runner then reports it with its full stack trace
        // instead of a bare message.
        graph.attachJobGraph(ordered);
        graph.scheduleForExecution();

        // The graph exposes its strategy through the FailoverStrategy interface, so narrow it
        // explicitly to the region strategy configured above.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();

        ExecutionVertex ev11 = graph.getJobVertex(v1.getID()).getTaskVertices()[0];
        ExecutionVertex ev12 = graph.getJobVertex(v1.getID()).getTaskVertices()[1];
        ExecutionVertex ev31 = graph.getJobVertex(v3.getID()).getTaskVertices()[0];
        ExecutionVertex ev32 = graph.getJobVertex(v3.getID()).getTaskVertices()[1];

        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());

        // Fail one task in each of the two regions before either has finished cancelling.
        ev11.getCurrentExecutionAttempt().fail(new Exception("new fail"));
        ev31.getCurrentExecutionAttempt().fail(new Exception("new fail"));
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());

        // This graph uses the stock strategy factory, so the region state is polled with a timeout
        // rather than asserted immediately.
        ev32.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilFailoverRegionState(
                strategy.getFailoverRegion(ev31), JobStatus.RUNNING, 1000L);

        ev12.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilFailoverRegionState(
                strategy.getFailoverRegion(ev11), JobStatus.RUNNING, 1000L);
    }

    /**
     * Currently ignored. Connects two single-task vertices through a BLOCKING edge, fails the task
     * of the succeeding vertex and expects both its own region and the region of the preceding
     * vertex to switch to CANCELLING.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    @Ignore
    public void testSucceedingNoticePreceding() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
                14);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobID jobId = new JobID();

        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");

        v1.setParallelism(1);
        v2.setParallelism(1);

        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));

        ExecutionGraph graph = new ExecutionGraph(
                new DummyJobInformation(jobId, "Test Job Sample Name"),
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                AkkaUtils.getDefaultTimeout(),
                new InfiniteDelayRestartStrategy(10),
                new FailoverPipelinedRegionWithDirectExecutor(),
                scheduler);

        // Let a JobException propagate: the test runner then reports it with its full stack trace
        // instead of a bare message.
        graph.attachJobGraph(ordered);
        graph.setScheduleMode(ScheduleMode.EAGER);
        graph.scheduleForExecution();

        // The graph exposes its strategy through the FailoverStrategy interface; the factory above
        // creates a RestartPipelinedRegionStrategy, so the narrowing cast is safe.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();

        // NOTE(review): both vertices used to be looked up from vertex2, which made the two
        // assertions below check the same region. The preceding task is now taken from vertex1,
        // which is what the test name suggests - confirm before re-enabling the test.
        ExecutionVertex precedingTask = graph.getJobVertex(v1.getID()).getTaskVertices()[0];
        ExecutionVertex succeedingTask = graph.getJobVertex(v2.getID()).getTaskVertices()[0];

        succeedingTask.getCurrentExecutionAttempt().fail(new Exception("Fail with v1"));

        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(succeedingTask).getState());
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(precedingTask).getState());
    }

    /**
     * Fails a second task while the region is already CANCELLING and checks that the region stays
     * CANCELLING and the job as a whole stays RUNNING.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    public void testFailWhileCancelling() throws Exception {
        ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy());
        // The graph exposes its strategy through the FailoverStrategy interface; the helper installs
        // a factory that creates a RestartPipelinedRegionStrategy, so the narrowing cast is safe.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();

        Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();

        ExecutionVertex firstVertex = vertices.next();
        firstVertex.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(firstVertex).getState());

        firstVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(firstVertex).getState());

        // A second failure arrives while the region is still cancelling.
        ExecutionVertex secondVertex = vertices.next();
        secondVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(firstVertex).getState());
    }

    /**
     * Runs one complete failover cycle (fail, cancel every task, region back to RUNNING) and then
     * fails the same vertex again, expecting the region to switch to CANCELLING once more.
     *
     * @throws Exception propagated from building the graph or from the execution calls
     */
    @Test
    public void testFailWhileRestarting() throws Exception {
        ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy());
        // The graph exposes its strategy through the FailoverStrategy interface; the helper installs
        // a factory that creates a RestartPipelinedRegionStrategy, so the narrowing cast is safe.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();

        ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(firstVertex).getState());

        firstVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(firstVertex).getState());

        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(firstVertex).getState());

        // Fail the vertex's current attempt again after the region has returned to RUNNING.
        firstVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
        Assert.assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(firstVertex).getState());
    }

    /**
     * Builds and schedules an execution graph of three job vertices (parallelism 3, 2 and 2) that
     * are all connected to each other through ALL_TO_ALL PIPELINED edges; the tests in this class
     * treat it as a single failover region.
     *
     * @param restartStrategy the restart strategy handed to the execution graph
     * @return the execution graph after {@code scheduleForExecution()} has been called on it
     * @throws Exception if attaching the job graph or scheduling it fails
     */
    private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
                14);
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobID jobId = new JobID();

        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");

        v1.setParallelism(3);
        v2.setParallelism(2);
        v3.setParallelism(2);

        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));

        ExecutionGraph graph = new ExecutionGraph(
                new DummyJobInformation(jobId, "Test Job Sample Name"),
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                AkkaUtils.getDefaultTimeout(),
                restartStrategy,
                new FailoverPipelinedRegionWithDirectExecutor(),
                scheduler);

        // Let a JobException propagate to the calling test: the test runner then reports it with
        // its full stack trace instead of a bare message.
        graph.attachJobGraph(ordered);
        graph.scheduleForExecution();
        return graph;
    }
}
