package org.apache.flink.runtime.executiongraph.failover;

import java.util.Iterator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.class */
public class PipelinedFailoverRegionBuildingTest extends TestLogger {
    @Test
    public void testIndividualVertices() throws Exception {
        JobVertex jobVertex = new JobVertex("source1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(2);
        JobVertex jobVertex2 = new JobVertex("source2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(2);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[1]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion4 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1]);
        Assert.assertTrue(failoverRegion != failoverRegion2);
        Assert.assertTrue(failoverRegion2 != failoverRegion3);
        Assert.assertTrue(failoverRegion3 != failoverRegion4);
    }

    @Test
    public void testEmbarrassinglyParallelCase() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(10000);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(10000);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(10000);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[0]);
        Assert.assertTrue(failoverRegion == failoverRegion2);
        Assert.assertTrue(failoverRegion2 == failoverRegion3);
        for (int i = 1; i < 10000; i++) {
            FailoverRegion failoverRegion4 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[i]);
            FailoverRegion failoverRegion5 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[i]);
            FailoverRegion failoverRegion6 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[i]);
            Assert.assertTrue(failoverRegion4 == failoverRegion5);
            Assert.assertTrue(failoverRegion5 == failoverRegion6);
            Assert.assertTrue(failoverRegion != failoverRegion4);
        }
    }

    @Test
    public void testOneComponentViaTwoExchanges() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(3);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(5);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[1]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[4]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[0]);
        Assert.assertTrue(failoverRegion == failoverRegion2);
        Assert.assertTrue(failoverRegion2 == failoverRegion3);
    }

    @Test
    public void testOneComponentViaCascadeOfJoins() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(8);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(8);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(8);
        JobVertex jobVertex4 = new JobVertex("vertex4");
        jobVertex4.setInvokableClass(NoOpInvokable.class);
        jobVertex4.setParallelism(8);
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex5.setInvokableClass(NoOpInvokable.class);
        jobVertex5.setParallelism(4);
        JobVertex jobVertex6 = new JobVertex("vertex6");
        jobVertex6.setInvokableClass(NoOpInvokable.class);
        jobVertex6.setParallelism(4);
        JobVertex jobVertex7 = new JobVertex("vertex7");
        jobVertex7.setInvokableClass(NoOpInvokable.class);
        jobVertex7.setParallelism(2);
        jobVertex5.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex5.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex6.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex6.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex7.connectNewDataSetAsInput(jobVertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex7.connectNewDataSetAsInput(jobVertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5, jobVertex6, jobVertex7}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        Iterator it = createExecutionGraph.getAllExecutionVertices().iterator();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion((ExecutionVertex) it.next());
        while (it.hasNext()) {
            Assert.assertTrue(failoverRegion == failoverStrategy.getFailoverRegion((ExecutionVertex) it.next()));
        }
    }

    @Test
    public void testOneComponentInstanceFromOneSource() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(8);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(8);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(8);
        JobVertex jobVertex4 = new JobVertex("vertex4");
        jobVertex4.setInvokableClass(NoOpInvokable.class);
        jobVertex4.setParallelism(8);
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex5.setInvokableClass(NoOpInvokable.class);
        jobVertex5.setParallelism(4);
        JobVertex jobVertex6 = new JobVertex("vertex6");
        jobVertex6.setInvokableClass(NoOpInvokable.class);
        jobVertex6.setParallelism(4);
        JobVertex jobVertex7 = new JobVertex("vertex7");
        jobVertex7.setInvokableClass(NoOpInvokable.class);
        jobVertex7.setParallelism(2);
        jobVertex.connectNewDataSetAsInput(jobVertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex2.connectNewDataSetAsInput(jobVertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex4.connectNewDataSetAsInput(jobVertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex5.connectNewDataSetAsInput(jobVertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex6.connectNewDataSetAsInput(jobVertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex7, jobVertex5, jobVertex6, jobVertex, jobVertex2, jobVertex3, jobVertex4}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        Iterator it = createExecutionGraph.getAllExecutionVertices().iterator();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion((ExecutionVertex) it.next());
        while (it.hasNext()) {
            Assert.assertTrue(failoverRegion == failoverStrategy.getFailoverRegion((ExecutionVertex) it.next()));
        }
    }

    @Test
    public void testTwoComponentsViaBlockingExchange() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(3);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(2);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[1]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion4 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[1]);
        Assert.assertTrue(failoverRegion == failoverRegion2);
        Assert.assertTrue(failoverRegion2 != failoverRegion3);
        Assert.assertTrue(failoverRegion4 != failoverRegion3);
    }

    @Test
    public void testTwoComponentsViaBlockingExchange2() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(3);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(2);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[1]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion4 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[1]);
        Assert.assertTrue(failoverRegion == failoverRegion2);
        Assert.assertTrue(failoverRegion2 != failoverRegion3);
        Assert.assertTrue(failoverRegion4 != failoverRegion3);
    }

    @Test
    public void testMultipleComponentsViaCascadeOfJoins() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(8);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(8);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(8);
        JobVertex jobVertex4 = new JobVertex("vertex4");
        jobVertex4.setInvokableClass(NoOpInvokable.class);
        jobVertex4.setParallelism(8);
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex5.setInvokableClass(NoOpInvokable.class);
        jobVertex5.setParallelism(4);
        JobVertex jobVertex6 = new JobVertex("vertex6");
        jobVertex6.setInvokableClass(NoOpInvokable.class);
        jobVertex6.setParallelism(4);
        JobVertex jobVertex7 = new JobVertex("vertex7");
        jobVertex7.setInvokableClass(NoOpInvokable.class);
        jobVertex7.setParallelism(2);
        jobVertex5.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex5.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex6.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex6.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex7.connectNewDataSetAsInput(jobVertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        jobVertex7.connectNewDataSetAsInput(jobVertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5, jobVertex6, jobVertex7}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[5]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex5.getID()).getTaskVertices()[2]);
        Assert.assertTrue(failoverRegion == failoverRegion2);
        Assert.assertTrue(failoverRegion == failoverRegion3);
        FailoverRegion failoverRegion4 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion5 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex4.getID()).getTaskVertices()[5]);
        FailoverRegion failoverRegion6 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex6.getID()).getTaskVertices()[2]);
        Assert.assertTrue(failoverRegion4 == failoverRegion5);
        Assert.assertTrue(failoverRegion4 == failoverRegion6);
        FailoverRegion failoverRegion7 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex7.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion8 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex7.getID()).getTaskVertices()[1]);
        Assert.assertTrue(failoverRegion7 != failoverRegion8);
        Assert.assertTrue(failoverRegion != failoverRegion7);
        Assert.assertTrue(failoverRegion != failoverRegion8);
        Assert.assertTrue(failoverRegion4 != failoverRegion7);
        Assert.assertTrue(failoverRegion4 != failoverRegion8);
    }

    @Test
    public void testDiamondWithMixedPipelinedAndBlockingExchanges() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(8);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(8);
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(8);
        JobVertex jobVertex4 = new JobVertex("vertex4");
        jobVertex4.setInvokableClass(NoOpInvokable.class);
        jobVertex4.setParallelism(8);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex4.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex4.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3, jobVertex4}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        Iterator it = createExecutionGraph.getAllExecutionVertices().iterator();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion((ExecutionVertex) it.next());
        while (it.hasNext()) {
            Assert.assertTrue(failoverRegion == failoverStrategy.getFailoverRegion((ExecutionVertex) it.next()));
        }
    }

    @Test
    public void testBlockingAllToAllTopologyWithCoLocation() throws Exception {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(10);
        JobVertex jobVertex2 = new JobVertex("target");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(13);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        jobVertex.setStrictlyCoLocatedWith(jobVertex2);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        Assert.assertTrue(failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0]) == failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0]));
    }

    @Test
    public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(10);
        JobVertex jobVertex2 = new JobVertex("target");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(10);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        jobVertex.setStrictlyCoLocatedWith(jobVertex2);
        ExecutionGraph createExecutionGraph = createExecutionGraph(new JobGraph("test job", new JobVertex[]{jobVertex, jobVertex2}));
        RestartPipelinedRegionStrategy failoverStrategy = createExecutionGraph.getFailoverStrategy();
        FailoverRegion failoverRegion = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion2 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[1]);
        FailoverRegion failoverRegion3 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0]);
        FailoverRegion failoverRegion4 = failoverStrategy.getFailoverRegion(createExecutionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1]);
        Assert.assertTrue(failoverRegion == failoverRegion2);
        Assert.assertTrue(failoverRegion2 == failoverRegion3);
        Assert.assertTrue(failoverRegion3 == failoverRegion4);
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobException, JobExecutionException {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        Time seconds = Time.seconds(10L);
        return ExecutionGraphBuilder.buildGraph((ExecutionGraph) null, jobGraph, configuration, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), (SlotProvider) Mockito.mock(SlotProvider.class), PipelinedFailoverRegionBuildingTest.class.getClassLoader(), new StandaloneCheckpointRecoveryFactory(), seconds, new NoRestartStrategy(), new UnregisteredMetricsGroup(), 1000, VoidBlobWriter.getInstance(), seconds, this.log);
    }
}
