package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.HashSet;
import java.util.Iterator;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestFailoverTopology;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.class */
public class RestartPipelinedRegionStrategyTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest$TestResultPartitionAvailabilityChecker.class */
    private static class TestResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
        private final HashSet<IntermediateResultPartitionID> failedPartitions = new HashSet<>();

        public boolean isAvailable(IntermediateResultPartitionID intermediateResultPartitionID) {
            return !this.failedPartitions.contains(intermediateResultPartitionID);
        }

        public void markResultPartitionFailed(IntermediateResultPartitionID intermediateResultPartitionID) {
            this.failedPartitions.add(intermediateResultPartitionID);
        }

        public void removeResultPartitionFromFailedState(IntermediateResultPartitionID intermediateResultPartitionID) {
            this.failedPartitions.remove(intermediateResultPartitionID);
        }
    }

    @Test
    public void testRegionFailoverForRegionInternalErrors() throws Exception {
        TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex newVertex = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex2 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex3 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex4 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex5 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex6 = builder.newVertex();
        builder.connect(newVertex, newVertex4, ResultPartitionType.BLOCKING);
        builder.connect(newVertex, newVertex5, ResultPartitionType.BLOCKING);
        builder.connect(newVertex2, newVertex4, ResultPartitionType.BLOCKING);
        builder.connect(newVertex2, newVertex5, ResultPartitionType.BLOCKING);
        builder.connect(newVertex3, newVertex6, ResultPartitionType.BLOCKING);
        RestartPipelinedRegionStrategy restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(builder.build());
        HashSet hashSet = new HashSet();
        hashSet.add(newVertex.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex.getExecutionVertexID(), new Exception("Test failure")));
        hashSet.clear();
        hashSet.add(newVertex2.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex2.getExecutionVertexID(), new Exception("Test failure")));
        hashSet.clear();
        hashSet.add(newVertex3.getExecutionVertexID());
        hashSet.add(newVertex6.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new Exception("Test failure")));
        hashSet.clear();
        hashSet.add(newVertex4.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex4.getExecutionVertexID(), new Exception("Test failure")));
        hashSet.clear();
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex5.getExecutionVertexID(), new Exception("Test failure")));
        hashSet.clear();
        hashSet.add(newVertex6.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex6.getExecutionVertexID(), new Exception("Test failure")));
    }

    @Test
    public void testRegionFailoverForDataConsumptionErrors() throws Exception {
        TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex newVertex = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex2 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex3 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex4 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex5 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex6 = builder.newVertex();
        builder.connect(newVertex, newVertex4, ResultPartitionType.BLOCKING);
        builder.connect(newVertex, newVertex5, ResultPartitionType.BLOCKING);
        builder.connect(newVertex2, newVertex4, ResultPartitionType.BLOCKING);
        builder.connect(newVertex2, newVertex5, ResultPartitionType.BLOCKING);
        builder.connect(newVertex3, newVertex6, ResultPartitionType.BLOCKING);
        RestartPipelinedRegionStrategy restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(builder.build());
        HashSet hashSet = new HashSet();
        Iterator<? extends FailoverEdge> it = newVertex4.getInputEdges().iterator();
        hashSet.add(newVertex.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex4.getExecutionVertexID(), new PartitionConnectionException(new ResultPartitionID(it.next().getResultPartitionID(), new ExecutionAttemptID()), new Exception("Test failure"))));
        hashSet.clear();
        hashSet.add(newVertex2.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex4.getExecutionVertexID(), new PartitionNotFoundException(new ResultPartitionID(it.next().getResultPartitionID(), new ExecutionAttemptID()))));
        hashSet.clear();
        Iterator<? extends FailoverEdge> it2 = newVertex5.getInputEdges().iterator();
        hashSet.add(newVertex.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex5.getExecutionVertexID(), new PartitionConnectionException(new ResultPartitionID(it2.next().getResultPartitionID(), new ExecutionAttemptID()), new Exception("Test failure"))));
        hashSet.clear();
        hashSet.add(newVertex2.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex5.getExecutionVertexID(), new PartitionNotFoundException(new ResultPartitionID(it2.next().getResultPartitionID(), new ExecutionAttemptID()))));
        hashSet.clear();
        Iterator<? extends FailoverEdge> it3 = newVertex6.getInputEdges().iterator();
        hashSet.add(newVertex3.getExecutionVertexID());
        hashSet.add(newVertex6.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex6.getExecutionVertexID(), new PartitionConnectionException(new ResultPartitionID(it3.next().getResultPartitionID(), new ExecutionAttemptID()), new Exception("Test failure"))));
    }

    @Test
    public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
        TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex newVertex = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex2 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex3 = builder.newVertex();
        builder.connect(newVertex, newVertex3, ResultPartitionType.BLOCKING);
        builder.connect(newVertex2, newVertex3, ResultPartitionType.BLOCKING);
        FailoverTopology build = builder.build();
        TestResultPartitionAvailabilityChecker testResultPartitionAvailabilityChecker = new TestResultPartitionAvailabilityChecker();
        RestartPipelinedRegionStrategy restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(build, testResultPartitionAvailabilityChecker);
        IntermediateResultPartitionID resultPartitionID = newVertex.getOutputEdges().iterator().next().getResultPartitionID();
        IntermediateResultPartitionID resultPartitionID2 = newVertex2.getOutputEdges().iterator().next().getResultPartitionID();
        testResultPartitionAvailabilityChecker.failedPartitions.clear();
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex2.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex3.getExecutionVertexID()}));
        testResultPartitionAvailabilityChecker.failedPartitions.clear();
        testResultPartitionAvailabilityChecker.markResultPartitionFailed(resultPartitionID);
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex2.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        testResultPartitionAvailabilityChecker.failedPartitions.clear();
        testResultPartitionAvailabilityChecker.markResultPartitionFailed(resultPartitionID2);
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex2.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        testResultPartitionAvailabilityChecker.failedPartitions.clear();
        testResultPartitionAvailabilityChecker.markResultPartitionFailed(resultPartitionID);
        testResultPartitionAvailabilityChecker.markResultPartitionFailed(resultPartitionID2);
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex2.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
        MatcherAssert.assertThat(restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new Exception("Test failure")), Matchers.containsInAnyOrder(new ExecutionVertexID[]{newVertex.getExecutionVertexID(), newVertex2.getExecutionVertexID(), newVertex3.getExecutionVertexID()}));
    }

    @Test
    public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
        TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex newVertex = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex2 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex3 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex4 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex5 = builder.newVertex();
        TestFailoverTopology.TestFailoverVertex newVertex6 = builder.newVertex();
        builder.connect(newVertex, newVertex2, ResultPartitionType.PIPELINED);
        builder.connect(newVertex2, newVertex3, ResultPartitionType.BLOCKING);
        builder.connect(newVertex3, newVertex4, ResultPartitionType.PIPELINED);
        builder.connect(newVertex4, newVertex5, ResultPartitionType.BLOCKING);
        builder.connect(newVertex5, newVertex6, ResultPartitionType.PIPELINED);
        RestartPipelinedRegionStrategy restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(builder.build());
        HashSet hashSet = new HashSet();
        hashSet.add(newVertex3.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        hashSet.add(newVertex6.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new Exception("Test failure")));
        hashSet.clear();
        hashSet.add(newVertex.getExecutionVertexID());
        hashSet.add(newVertex2.getExecutionVertexID());
        hashSet.add(newVertex3.getExecutionVertexID());
        hashSet.add(newVertex4.getExecutionVertexID());
        hashSet.add(newVertex5.getExecutionVertexID());
        hashSet.add(newVertex6.getExecutionVertexID());
        Assert.assertEquals(hashSet, restartPipelinedRegionStrategy.getTasksNeedingRestart(newVertex3.getExecutionVertexID(), new PartitionConnectionException(new ResultPartitionID(newVertex3.getInputEdges().iterator().next().getResultPartitionID(), new ExecutionAttemptID()), new Exception("Test failure"))));
    }
}
