package org.apache.flink.runtime.executiongraph;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.class */
public class ExecutionVertexInputConstraintTest extends TestLogger {
    private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    @Test
    public void testInputConsumable() throws Exception {
        List<JobVertex> createOrderedVertices = createOrderedVertices();
        ExecutionGraph createExecutionGraph = createExecutionGraph(createOrderedVertices, InputDependencyConstraint.ALL);
        ExecutionVertex executionVertex = createExecutionGraph.getJobVertex(createOrderedVertices.get(0).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex2 = createExecutionGraph.getJobVertex(createOrderedVertices.get(1).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex3 = createExecutionGraph.getJobVertex(createOrderedVertices.get(1).getID()).getTaskVertices()[1];
        ExecutionVertex executionVertex4 = createExecutionGraph.getJobVertex(createOrderedVertices.get(2).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex5 = createExecutionGraph.getJobVertex(createOrderedVertices.get(2).getID()).getTaskVertices()[1];
        createExecutionGraph.start(this.mainThreadExecutor);
        createExecutionGraph.scheduleForExecution();
        Assert.assertFalse(executionVertex4.isInputConsumable(0));
        Assert.assertFalse(executionVertex4.isInputConsumable(1));
        executionVertex.scheduleOrUpdateConsumers(new ResultPartitionID(((IntermediateResultPartition) executionVertex.getProducedPartitions().values().iterator().next()).getPartitionId(), executionVertex.getCurrentExecutionAttempt().getAttemptId()));
        Assert.assertTrue(executionVertex4.isInputConsumable(0));
        Assert.assertFalse(executionVertex5.isInputConsumable(0));
        executionVertex2.getCurrentExecutionAttempt().markFinished();
        Assert.assertFalse(executionVertex4.isInputConsumable(1));
        executionVertex3.getCurrentExecutionAttempt().markFinished();
        Assert.assertTrue(executionVertex4.isInputConsumable(1));
        executionVertex.fail(new Exception());
        waitUntilJobRestarted(createExecutionGraph);
        Assert.assertFalse(executionVertex4.isInputConsumable(0));
        Assert.assertFalse(executionVertex4.isInputConsumable(1));
    }

    @Test
    public void testInputConstraintANY() throws Exception {
        List<JobVertex> createOrderedVertices = createOrderedVertices();
        ExecutionGraph createExecutionGraph = createExecutionGraph(createOrderedVertices, InputDependencyConstraint.ANY);
        ExecutionVertex executionVertex = createExecutionGraph.getJobVertex(createOrderedVertices.get(0).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex2 = createExecutionGraph.getJobVertex(createOrderedVertices.get(1).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex3 = createExecutionGraph.getJobVertex(createOrderedVertices.get(1).getID()).getTaskVertices()[1];
        ExecutionVertex executionVertex4 = createExecutionGraph.getJobVertex(createOrderedVertices.get(2).getID()).getTaskVertices()[0];
        createExecutionGraph.start(this.mainThreadExecutor);
        createExecutionGraph.scheduleForExecution();
        Assert.assertFalse(executionVertex4.checkInputDependencyConstraints());
        executionVertex.scheduleOrUpdateConsumers(new ResultPartitionID(((IntermediateResultPartition) executionVertex.getProducedPartitions().values().iterator().next()).getPartitionId(), executionVertex.getCurrentExecutionAttempt().getAttemptId()));
        Assert.assertTrue(executionVertex4.checkInputDependencyConstraints());
        executionVertex.fail(new Exception());
        waitUntilJobRestarted(createExecutionGraph);
        Assert.assertFalse(executionVertex4.checkInputDependencyConstraints());
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex2, ExecutionState.DEPLOYING, 2000L);
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex3, ExecutionState.DEPLOYING, 2000L);
        executionVertex2.getCurrentExecutionAttempt().markFinished();
        executionVertex3.getCurrentExecutionAttempt().markFinished();
        Assert.assertTrue(executionVertex4.checkInputDependencyConstraints());
    }

    @Test
    public void testInputConstraintALL() throws Exception {
        List<JobVertex> createOrderedVertices = createOrderedVertices();
        ExecutionGraph createExecutionGraph = createExecutionGraph(createOrderedVertices, InputDependencyConstraint.ALL);
        ExecutionVertex executionVertex = createExecutionGraph.getJobVertex(createOrderedVertices.get(0).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex2 = createExecutionGraph.getJobVertex(createOrderedVertices.get(1).getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex3 = createExecutionGraph.getJobVertex(createOrderedVertices.get(1).getID()).getTaskVertices()[1];
        ExecutionVertex executionVertex4 = createExecutionGraph.getJobVertex(createOrderedVertices.get(2).getID()).getTaskVertices()[0];
        createExecutionGraph.start(this.mainThreadExecutor);
        createExecutionGraph.scheduleForExecution();
        Assert.assertFalse(executionVertex4.checkInputDependencyConstraints());
        executionVertex.scheduleOrUpdateConsumers(new ResultPartitionID(((IntermediateResultPartition) executionVertex.getProducedPartitions().values().iterator().next()).getPartitionId(), executionVertex.getCurrentExecutionAttempt().getAttemptId()));
        Assert.assertFalse(executionVertex4.checkInputDependencyConstraints());
        executionVertex2.getCurrentExecutionAttempt().markFinished();
        executionVertex3.getCurrentExecutionAttempt().markFinished();
        Assert.assertTrue(executionVertex4.checkInputDependencyConstraints());
        executionVertex.fail(new Exception());
        waitUntilJobRestarted(createExecutionGraph);
        Assert.assertFalse(executionVertex4.checkInputDependencyConstraints());
    }

    @Test
    public void testInputConstraintALLPerformance() throws Exception {
        JobVertex createVertexWithAllInputConstraints = createVertexWithAllInputConstraints("vertex1", 1000);
        JobVertex createVertexWithAllInputConstraints2 = createVertexWithAllInputConstraints("vertex2", 1000);
        JobVertex createVertexWithAllInputConstraints3 = createVertexWithAllInputConstraints("vertex3", 1000);
        createVertexWithAllInputConstraints2.connectNewDataSetAsInput(createVertexWithAllInputConstraints, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        createVertexWithAllInputConstraints2.connectNewDataSetAsInput(createVertexWithAllInputConstraints3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ExecutionGraph createExecutionGraph = createExecutionGraph(Arrays.asList(createVertexWithAllInputConstraints, createVertexWithAllInputConstraints2, createVertexWithAllInputConstraints3), InputDependencyConstraint.ALL, 3000);
        createExecutionGraph.start(this.mainThreadExecutor);
        createExecutionGraph.scheduleForExecution();
        for (int i = 0; i < 999; i++) {
            finishSubtask(createExecutionGraph, createVertexWithAllInputConstraints.getID(), i);
        }
        long nanoTime = System.nanoTime();
        finishSubtask(createExecutionGraph, createVertexWithAllInputConstraints.getID(), 999);
        Assert.assertThat(Duration.ofNanos(System.nanoTime() - nanoTime), Matchers.lessThan(Duration.ofSeconds(5L)));
    }

    private static JobVertex createVertexWithAllInputConstraints(String str, int i) {
        JobVertex jobVertex = new JobVertex(str);
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        jobVertex.setInputDependencyConstraint(InputDependencyConstraint.ALL);
        return jobVertex;
    }

    private static void finishSubtask(ExecutionGraph executionGraph, JobVertexID jobVertexID, int i) {
        executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), executionGraph.getJobVertex(jobVertexID).getTaskVertices()[i].getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FINISHED));
    }

    private static List<JobVertex> createOrderedVertices() {
        JobVertex jobVertex = new JobVertex("vertex1");
        JobVertex jobVertex2 = new JobVertex("vertex2");
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex.setParallelism(2);
        jobVertex2.setParallelism(2);
        jobVertex3.setParallelism(2);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        jobVertex2.setInvokableClass(AbstractInvokable.class);
        jobVertex3.setInvokableClass(AbstractInvokable.class);
        jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        return Arrays.asList(jobVertex, jobVertex2, jobVertex3);
    }

    private static ExecutionGraph createExecutionGraph(List<JobVertex> list, InputDependencyConstraint inputDependencyConstraint) throws Exception {
        return createExecutionGraph(list, inputDependencyConstraint, 20);
    }

    private static ExecutionGraph createExecutionGraph(List<JobVertex> list, InputDependencyConstraint inputDependencyConstraint, int i) throws Exception {
        Iterator<JobVertex> it = list.iterator();
        while (it.hasNext()) {
            it.next().setInputDependencyConstraint(inputDependencyConstraint);
        }
        JobGraph jobGraph = new JobGraph((JobVertex[]) list.toArray(new JobVertex[0]));
        return TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setRestartStrategy(TestRestartStrategy.directExecuting()).setSlotProvider(new SimpleSlotProvider(i)).build();
    }

    private void waitUntilJobRestarted(ExecutionGraph executionGraph) throws Exception {
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(executionGraph, ExecutionGraphTestUtils.isInExecutionState(ExecutionState.CANCELING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.CANCELED)).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FAILED)).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED)), 2000L);
        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
            if (executionVertex.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
                executionVertex.getCurrentExecutionAttempt().completeCancelling();
            }
        }
        ExecutionGraphTestUtils.waitUntilJobStatus(executionGraph, JobStatus.RUNNING, 2000L);
    }
}
