package org.apache.flink.runtime.scheduler.adapter;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.class */
public class DefaultExecutionTopologyTest extends TestLogger {
    private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
    private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
    private ExecutionGraph executionGraph;
    private DefaultExecutionTopology adapter;

    @Before
    public void setUp() throws Exception {
        JobVertex[] jobVertexArr = {ExecutionGraphTestUtils.createNoOpVertex(3), ExecutionGraphTestUtils.createNoOpVertex(3)};
        jobVertexArr[1].connectNewDataSetAsInput(jobVertexArr[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertexArr[0].setInputDependencyConstraint(InputDependencyConstraint.ALL);
        jobVertexArr[1].setInputDependencyConstraint(InputDependencyConstraint.ANY);
        this.executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), this.taskManagerGateway, this.triggeredRestartStrategy, jobVertexArr);
        this.adapter = new DefaultExecutionTopology(this.executionGraph);
    }

    @Test
    public void testConstructor() {
        assertGraphEquals(this.executionGraph, this.adapter);
    }

    @Test
    public void testGetResultPartition() {
        Iterator it = this.executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : ((ExecutionVertex) it.next()).getProducedPartitions().entrySet()) {
                assertPartitionEquals((IntermediateResultPartition) entry.getValue(), (DefaultResultPartition) this.adapter.getResultPartition((IntermediateResultPartitionID) entry.getKey()).orElseThrow(() -> {
                    return new IllegalArgumentException("can not find partition " + entry.getKey());
                }));
            }
        }
    }

    @Test
    public void testResultPartitionStateSupplier() {
        IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition) IterableUtils.toStream(this.executionGraph.getAllExecutionVertices()).flatMap(executionVertex -> {
            return executionVertex.getProducedPartitions().values().stream();
        }).findAny().get();
        DefaultResultPartition defaultResultPartition = (DefaultResultPartition) this.adapter.getResultPartition(intermediateResultPartition.getPartitionId()).get();
        Assert.assertEquals(ResultPartitionState.CREATED, defaultResultPartition.getState());
        intermediateResultPartition.markDataProduced();
        Assert.assertEquals(ResultPartitionState.CONSUMABLE, defaultResultPartition.getState());
    }

    @Test
    public void testGetVertexOrThrow() {
        try {
            this.adapter.getVertexOrThrow(new ExecutionVertexID(new JobVertexID(), 0));
            Assert.fail("get not exist vertex");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testResultPartitionOrThrow() {
        try {
            this.adapter.getResultPartitionOrThrow(new IntermediateResultPartitionID());
            Assert.fail("get not exist result partition");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testWithCoLocationConstraints() throws Exception {
        this.adapter = new DefaultExecutionTopology(createExecutionGraphWithCoLocationConstraint());
        TestCase.assertTrue(this.adapter.containsCoLocationConstraints());
    }

    @Test
    public void testWithoutCoLocationConstraints() {
        Assert.assertFalse(this.adapter.containsCoLocationConstraints());
    }

    private ExecutionGraph createExecutionGraphWithCoLocationConstraint() throws Exception {
        JobVertex[] jobVertexArr = {ExecutionGraphTestUtils.createNoOpVertex("v1", 3), ExecutionGraphTestUtils.createNoOpVertex("v2", 3)};
        jobVertexArr[1].connectNewDataSetAsInput(jobVertexArr[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertexArr[0].setSlotSharingGroup(slotSharingGroup);
        jobVertexArr[1].setSlotSharingGroup(slotSharingGroup);
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        coLocationGroup.addVertex(jobVertexArr[0]);
        coLocationGroup.addVertex(jobVertexArr[1]);
        jobVertexArr[0].updateCoLocationGroup(coLocationGroup);
        jobVertexArr[1].updateCoLocationGroup(coLocationGroup);
        return ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), this.taskManagerGateway, this.triggeredRestartStrategy, jobVertexArr);
    }

    private static void assertGraphEquals(ExecutionGraph executionGraph, DefaultExecutionTopology defaultExecutionTopology) {
        Iterator it = defaultExecutionTopology.getVertices().iterator();
        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
            DefaultExecutionVertex defaultExecutionVertex = (DefaultExecutionVertex) it.next();
            assertVertexEquals(executionVertex, defaultExecutionVertex);
            IntStream range = IntStream.range(0, executionVertex.getNumberOfInputs());
            executionVertex.getClass();
            assertPartitionsEquals((List) range.mapToObj(executionVertex::getInputEdges).flatMap((v0) -> {
                return Arrays.stream(v0);
            }).map((v0) -> {
                return v0.getSource();
            }).collect(Collectors.toList()), defaultExecutionVertex.getConsumedResults());
            assertPartitionsEquals(executionVertex.getProducedPartitions().values(), defaultExecutionVertex.getProducedResults());
        }
        Assert.assertFalse("Number of adapted vertices exceeds number of original vertices.", it.hasNext());
    }

    private static void assertPartitionsEquals(Iterable<IntermediateResultPartition> iterable, Iterable<DefaultResultPartition> iterable2) {
        Assert.assertEquals(Iterables.size(iterable), Iterables.size(iterable2));
        for (IntermediateResultPartition intermediateResultPartition : iterable) {
            DefaultResultPartition defaultResultPartition = (DefaultResultPartition) IterableUtils.toStream(iterable2).filter(defaultResultPartition2 -> {
                return defaultResultPartition2.getId().equals(intermediateResultPartition.getPartitionId());
            }).findAny().orElseThrow(() -> {
                return new AssertionError("Could not find matching adapted partition for " + intermediateResultPartition);
            });
            assertPartitionEquals(intermediateResultPartition, defaultResultPartition);
            List list = (List) intermediateResultPartition.getConsumers().stream().flatMap((v0) -> {
                return v0.stream();
            }).map((v0) -> {
                return v0.getTarget();
            }).collect(Collectors.toList());
            Iterable consumers = defaultResultPartition.getConsumers();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ExecutionVertexID id = ((ExecutionVertex) it.next()).getID();
                TestCase.assertTrue(IterableUtils.toStream(consumers).anyMatch(defaultExecutionVertex -> {
                    return defaultExecutionVertex.getId().equals(id);
                }));
            }
        }
    }

    private static void assertPartitionEquals(IntermediateResultPartition intermediateResultPartition, DefaultResultPartition defaultResultPartition) {
        Assert.assertEquals(intermediateResultPartition.getPartitionId(), defaultResultPartition.getId());
        Assert.assertEquals(intermediateResultPartition.getIntermediateResult().getId(), defaultResultPartition.getResultId());
        Assert.assertEquals(intermediateResultPartition.getResultType(), defaultResultPartition.getResultType());
        assertVertexEquals(intermediateResultPartition.getProducer(), defaultResultPartition.getProducer());
    }

    private static void assertVertexEquals(ExecutionVertex executionVertex, DefaultExecutionVertex defaultExecutionVertex) {
        Assert.assertEquals(executionVertex.getID(), defaultExecutionVertex.getId());
        Assert.assertEquals(executionVertex.getInputDependencyConstraint(), defaultExecutionVertex.getInputDependencyConstraint());
    }
}
