package org.apache.flink.runtime.executiongraph.failover.adapter;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverEdge;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopologyTest.class */
public class DefaultFailoverTopologyTest extends TestLogger {
    private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
    private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();

    @Test
    public void testTopology() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph();
        assertGraphEquals(createExecutionGraph, new DefaultFailoverTopology(createExecutionGraph));
    }

    @Test
    public void testWithCollocationConstraints() throws Exception {
        Assert.assertTrue(new DefaultFailoverTopology(createExecutionGraph(true)).containsCoLocationConstraints());
    }

    @Test
    public void testWithoutCollocationConstraints() throws Exception {
        Assert.assertFalse(new DefaultFailoverTopology(createExecutionGraph(false)).containsCoLocationConstraints());
    }

    private ExecutionGraph createExecutionGraph() throws Exception {
        return createExecutionGraph(false);
    }

    private ExecutionGraph createExecutionGraph(boolean z) throws Exception {
        JobVertex[] jobVertexArr = {ExecutionGraphTestUtils.createNoOpVertex("v1", 3), ExecutionGraphTestUtils.createNoOpVertex("v2", 3), ExecutionGraphTestUtils.createNoOpVertex("v3", 3)};
        jobVertexArr[1].connectNewDataSetAsInput(jobVertexArr[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        jobVertexArr[2].connectNewDataSetAsInput(jobVertexArr[1], DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        if (z) {
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            jobVertexArr[1].setSlotSharingGroup(slotSharingGroup);
            jobVertexArr[2].setSlotSharingGroup(slotSharingGroup);
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            coLocationGroup.addVertex(jobVertexArr[1]);
            coLocationGroup.addVertex(jobVertexArr[2]);
            jobVertexArr[1].updateCoLocationGroup(coLocationGroup);
            jobVertexArr[2].updateCoLocationGroup(coLocationGroup);
        }
        return ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), this.taskManagerGateway, this.triggeredRestartStrategy, jobVertexArr);
    }

    private static void assertGraphEquals(ExecutionGraph executionGraph, FailoverTopology failoverTopology) {
        List list = (List) StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), false).collect(Collectors.toList());
        List list2 = (List) StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false).collect(Collectors.toList());
        Assert.assertEquals(list.size(), list2.size());
        for (int i = 0; i < list.size(); i++) {
            assertVertexEquals((ExecutionVertex) list.get(i), (FailoverVertex) list2.get(i));
        }
    }

    private static void assertVertexEquals(ExecutionVertex executionVertex, FailoverVertex failoverVertex) {
        Assert.assertTrue(compareVertexInternalProperties(executionVertex, failoverVertex));
        IntStream range = IntStream.range(0, executionVertex.getNumberOfInputs());
        executionVertex.getClass();
        assertEdgesEquals((List) range.mapToObj(executionVertex::getInputEdges).flatMap((v0) -> {
            return Arrays.stream(v0);
        }).collect(Collectors.toList()), (List) StreamSupport.stream(failoverVertex.getInputEdges().spliterator(), false).collect(Collectors.toList()));
        assertEdgesEquals((List) executionVertex.getProducedPartitions().values().stream().map((v0) -> {
            return v0.getConsumers();
        }).flatMap((v0) -> {
            return v0.stream();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList()), (List) StreamSupport.stream(failoverVertex.getOutputEdges().spliterator(), false).collect(Collectors.toList()));
    }

    private static boolean compareVertexInternalProperties(ExecutionVertex executionVertex, FailoverVertex failoverVertex) {
        return executionVertex.getJobvertexId().equals(failoverVertex.getExecutionVertexID().getJobVertexId()) && executionVertex.getParallelSubtaskIndex() == failoverVertex.getExecutionVertexID().getSubtaskIndex() && executionVertex.getTaskNameWithSubtaskIndex().equals(failoverVertex.getExecutionVertexName());
    }

    private static void assertEdgesEquals(Collection<ExecutionEdge> collection, Collection<FailoverEdge> collection2) {
        Assert.assertEquals(collection.size(), collection2.size());
        for (ExecutionEdge executionEdge : collection) {
            Assert.assertEquals(1L, ((List) collection2.stream().filter(failoverEdge -> {
                return compareEdge(executionEdge, failoverEdge);
            }).collect(Collectors.toList())).size());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean compareEdge(ExecutionEdge executionEdge, FailoverEdge failoverEdge) {
        return executionEdge.getSource().getPartitionId().equals(failoverEdge.getResultPartitionID()) && executionEdge.getSource().getResultType().equals(failoverEdge.getResultPartitionType()) && compareVertexInternalProperties(executionEdge.getSource().getProducer(), failoverEdge.getSourceVertex()) && compareVertexInternalProperties(executionEdge.getTarget(), failoverEdge.getTargetVertex());
    }
}
