package org.apache.flink.runtime.executiongraph;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.class */
public class ExecutionGraphPartitionReleaseTest extends TestLogger {
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private static final TestingComponentMainThreadExecutor mainThreadExecutor = new TestingComponentMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));

    @Test
    public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex createNoOpVertex3 = ExecutionGraphTestUtils.createNoOpVertex(1);
        createNoOpVertex2.connectNewDataSetAsInput(createNoOpVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        createNoOpVertex3.connectNewDataSetAsInput(createNoOpVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        TestingPartitionTracker testingPartitionTracker = new TestingPartitionTracker();
        ArrayDeque arrayDeque = new ArrayDeque();
        testingPartitionTracker.setStopTrackingAndReleasePartitionsConsumer(collection -> {
            arrayDeque.add(collection.iterator().next());
        });
        ExecutionGraph createExecutionGraph = createExecutionGraph(testingPartitionTracker, createNoOpVertex, createNoOpVertex2, createNoOpVertex3);
        mainThreadExecutor.execute(() -> {
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), getCurrentExecution(createNoOpVertex, createExecutionGraph).getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.empty());
        });
        mainThreadExecutor.execute(() -> {
            Execution currentExecution = getCurrentExecution(createNoOpVertex, createExecutionGraph);
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), getCurrentExecution(createNoOpVertex2, createExecutionGraph).getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.hasSize(1));
            MatcherAssert.assertThat(arrayDeque.remove(), IsEqual.equalTo(new ResultPartitionID((IntermediateResultPartitionID) currentExecution.getVertex().getProducedPartitions().keySet().iterator().next(), currentExecution.getAttemptId())));
        });
        mainThreadExecutor.execute(() -> {
            Execution currentExecution = getCurrentExecution(createNoOpVertex2, createExecutionGraph);
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), getCurrentExecution(createNoOpVertex3, createExecutionGraph).getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.hasSize(1));
            MatcherAssert.assertThat(arrayDeque.remove(), IsEqual.equalTo(new ResultPartitionID((IntermediateResultPartitionID) currentExecution.getVertex().getProducedPartitions().keySet().iterator().next(), currentExecution.getAttemptId())));
        });
    }

    @Test
    public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
        JobVertex createNoOpVertex3 = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
        JobVertex createNoOpVertex4 = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);
        createNoOpVertex2.connectNewDataSetAsInput(createNoOpVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        createNoOpVertex3.connectNewDataSetAsInput(createNoOpVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        createNoOpVertex4.connectNewDataSetAsInput(createNoOpVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        TestingPartitionTracker testingPartitionTracker = new TestingPartitionTracker();
        ArrayDeque arrayDeque = new ArrayDeque();
        testingPartitionTracker.setStopTrackingAndReleasePartitionsConsumer(collection -> {
            arrayDeque.add(collection.iterator().next());
        });
        ExecutionGraph createExecutionGraph = createExecutionGraph(testingPartitionTracker, createNoOpVertex, createNoOpVertex2, createNoOpVertex3, createNoOpVertex4);
        mainThreadExecutor.execute(() -> {
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), getCurrentExecution(createNoOpVertex, createExecutionGraph).getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.empty());
        });
        mainThreadExecutor.execute(() -> {
            Execution currentExecution = getCurrentExecution(createNoOpVertex2, createExecutionGraph);
            Iterator it = currentExecution.getVertex().getProducedPartitions().keySet().iterator();
            while (it.hasNext()) {
                createExecutionGraph.scheduleOrUpdateConsumers(new ResultPartitionID((IntermediateResultPartitionID) it.next(), currentExecution.getAttemptId()));
            }
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), currentExecution.getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.empty());
        });
        mainThreadExecutor.execute(() -> {
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), getCurrentExecution(createNoOpVertex3, createExecutionGraph).getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.empty());
        });
        mainThreadExecutor.execute(() -> {
            getCurrentExecution(createNoOpVertex3, createExecutionGraph).getVertex().resetForNewExecution(0L, 1L);
            MatcherAssert.assertThat(arrayDeque, Matchers.empty());
        });
        mainThreadExecutor.execute(() -> {
            createExecutionGraph.updateState(new TaskExecutionState(createExecutionGraph.getJobID(), getCurrentExecution(createNoOpVertex4, createExecutionGraph).getAttemptId(), ExecutionState.FINISHED));
            MatcherAssert.assertThat(arrayDeque, Matchers.empty());
        });
    }

    private static Execution getCurrentExecution(JobVertex jobVertex, ExecutionGraph executionGraph) {
        return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
    }

    private ExecutionGraph createExecutionGraph(PartitionTracker partitionTracker, JobVertex... jobVertexArr) throws Exception {
        ExecutionGraph buildGraph = ExecutionGraphBuilder.buildGraph((ExecutionGraph) null, new JobGraph(new JobID(), "test job", jobVertexArr), new Configuration(), scheduledExecutorService, mainThreadExecutor.getMainThreadExecutor(), new TestingSlotProvider(slotRequestId -> {
            return CompletableFuture.completedFuture(new TestingLogicalSlot());
        }), ExecutionGraphPartitionReleaseTest.class.getClassLoader(), new StandaloneCheckpointRecoveryFactory(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy(), new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), this.log, NettyShuffleMaster.INSTANCE, partitionTracker);
        buildGraph.start(mainThreadExecutor.getMainThreadExecutor());
        TestingComponentMainThreadExecutor testingComponentMainThreadExecutor = mainThreadExecutor;
        buildGraph.getClass();
        testingComponentMainThreadExecutor.execute(buildGraph::scheduleForExecution);
        return buildGraph;
    }
}
