package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Tests for the result-partition lifecycle of an {@link Execution}: when its produced partition
 * starts being tracked, stops being tracked, and is released as the execution changes state.
 */
public class ExecutionPartitionLifecycleTest extends TestLogger {

    // JUnit class rule providing a TestingComponentMainThreadExecutor resource.
    // NOTE(review): none of the code visible in this class references it directly.
    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Resource();
    // Current execution attempt of the first task vertex of the producer job vertex;
    // assigned by setupExecutionGraphAndStartRunningJob.
    private Execution execution;
    // Deployment descriptor of the first partition produced by that execution;
    // assigned by setupExecutionGraphAndStartRunningJob.
    private ResultPartitionDeploymentDescriptor descriptor;
    // Resource ID of the local task manager location the execution is deployed to;
    // assigned by setupExecutionGraphAndStartRunningJob.
    private ResourceID taskExecutorResourceId;
    // ID of the job owning the execution graph; assigned by setupExecutionGraphAndStartRunningJob.
    private JobID jobId;

    /**
     * Expected partition-tracker interaction after a state transition, as asserted by
     * {@code testPartitionTrackingForStateTransition}.
     */
    private enum PartitionReleaseResult {
        /** Neither the stop-tracking nor the stop-tracking-and-release callback is expected. */
        NONE,
        /** Only the stop-tracking callback is expected. */
        STOP_TRACKING,
        /** Only the stop-tracking-and-release callback is expected. */
        STOP_TRACKING_AND_RELEASE
    }

    /**
     * {@link SlotOwner} test double that remembers the slot handed back to it.
     *
     * <p>The first slot passed to {@code returnLogicalSlot} completes {@link #returnedSlot};
     * any later call finds the future already completed and therefore has no effect on it.
     */
    private static final class SingleSlotTestingSlotOwner implements SlotOwner {
        /** Completed with the first slot that is returned to this owner. */
        final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();

        private SingleSlotTestingSlotOwner() {
            // Nothing to do: the future is created by the field initializer.
        }

        public void returnLogicalSlot(LogicalSlot logicalSlot) {
            returnedSlot.complete(logicalSlot);
        }
    }

    /**
     * {@link ShuffleMaster} test double that hands out synthetic shuffle descriptors and records
     * every descriptor passed to {@code releasePartitionExternally}.
     *
     * <p>NOTE(review): decompiler metadata states that the access modifier of this class was
     * changed from {@code private}; the wider modifier is kept so existing references still
     * compile.
     */
    public static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
        /** Descriptors released externally, in call order. Bounded to four entries. */
        final Queue<ShuffleDescriptor> externallyReleasedPartitions;

        private TestingShuffleMaster() {
            // Diamond instead of the raw type, so the element type is checked by the compiler.
            this.externallyReleasedPartitions = new ArrayBlockingQueue<>(4);
        }

        /**
         * Registers a partition by returning an already-completed future with a descriptor
         * derived from the given partition and producer.
         *
         * @param partitionDescriptor supplies the partition ID of the returned descriptor
         * @param producerDescriptor supplies the producer execution ID and the producer location
         * @return a completed future holding the synthetic descriptor
         */
        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(final PartitionDescriptor partitionDescriptor, final ProducerDescriptor producerDescriptor) {
            return CompletableFuture.completedFuture(new ShuffleDescriptor() {
                public ResultPartitionID getResultPartitionID() {
                    // A new ID object is built on every call from the captured descriptors.
                    return new ResultPartitionID(partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
                }

                public Optional<ResourceID> storesLocalResourcesOn() {
                    return Optional.of(producerDescriptor.getProducerLocation());
                }
            });
        }

        /**
         * Records the released descriptor.
         *
         * @param shuffleDescriptor descriptor of the partition being released
         * @throws IllegalStateException if the bounded queue is already full, which makes an
         *     unexpectedly high number of releases fail loudly
         */
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor);
        }
    }

    /** Cancels a running execution first and then marks it finished; the partition must be released. */
    @Test
    public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
        final Consumer<Execution> cancelFirst = Execution::cancel;
        final Consumer<Execution> thenFinish = Execution::markFinished;
        testPartitionReleaseOnStateTransitionsAfterRunning(cancelFirst, thenFinish);
    }

    /** Marks a running execution finished first and then cancels it; the partition must be released. */
    @Test
    public void testPartitionReleaseOnCancelWhileFinished() throws Exception {
        final Consumer<Execution> finishFirst = Execution::markFinished;
        final Consumer<Execution> thenCancel = Execution::cancel;
        testPartitionReleaseOnStateTransitionsAfterRunning(finishFirst, thenCancel);
    }

    /** Marks a running execution finished first and then suspends it; the partition must be released. */
    @Test
    public void testPartitionReleaseOnSuspendWhileFinished() throws Exception {
        final Consumer<Execution> finishFirst = Execution::markFinished;
        final Consumer<Execution> thenSuspend = Execution::suspend;
        testPartitionReleaseOnStateTransitionsAfterRunning(finishFirst, thenSuspend);
    }

    /**
     * Applies two consecutive state transitions to a running execution that produces a
     * {@code PIPELINED} partition and checks when the partition is released.
     *
     * <p>After the first transition no release call may have reached the task manager gateway.
     * After the second one the gateway must have been asked to release exactly the produced
     * partition for this job, and the shuffle master must have recorded exactly one external
     * release for the same shuffle descriptor.
     *
     * @param consumer first state transition applied to the execution
     * @param consumer2 second state transition applied to the execution
     * @throws Exception if the setup fails or the recorded release call cannot be retrieved
     */
    private void testPartitionReleaseOnStateTransitionsAfterRunning(Consumer<Execution> consumer, Consumer<Execution> consumer2) throws Exception {
        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        // Fully parameterized instead of raw, so the assertions below need no unchecked cast.
        final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsCallFuture = new CompletableFuture<>();
        taskManagerGateway.setReleasePartitionsConsumer(
            (jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds)));

        final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);

        consumer.accept(this.execution);
        Assert.assertFalse(releasePartitionsCallFuture.isDone());

        consumer2.accept(this.execution);
        Assert.assertTrue(releasePartitionsCallFuture.isDone());

        final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall = releasePartitionsCallFuture.get();
        Assert.assertEquals(this.jobId, releasePartitionsCall.f0);
        Assert.assertEquals(Collections.singletonList(this.descriptor.getShuffleDescriptor().getResultPartitionID()), releasePartitionsCall.f1);

        Assert.assertEquals(1L, testingShuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(this.descriptor.getShuffleDescriptor(), testingShuffleMaster.externallyReleasedPartitions.poll());
    }

    /** Finishing the execution must neither stop tracking nor release its partition. */
    @Test
    public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
        final Consumer<Execution> finish = Execution::markFinished;
        testPartitionTrackingForStateTransition(finish, PartitionReleaseResult.NONE);
    }

    /**
     * Cancels the execution and completes the cancellation through the overload that takes
     * accumulators, IO metrics and a {@code false} flag; only stop-tracking is expected.
     */
    @Test
    public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
        final Consumer<Execution> cancelAndComplete = attempt -> {
            attempt.cancel();
            // NOTE(review): the trailing false presumably disables partition release - confirm against Execution.
            attempt.completeCancelling(Collections.emptyMap(), new IOMetrics(0L, 0L, 0L, 0L), false);
        };
        testPartitionTrackingForStateTransition(cancelAndComplete, PartitionReleaseResult.STOP_TRACKING);
    }

    /**
     * Cancels the execution and completes the cancellation through the no-argument overload;
     * stop-tracking-and-release is expected.
     */
    @Test
    public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
        final Consumer<Execution> cancelAndComplete = attempt -> {
            attempt.cancel();
            attempt.completeCancelling();
        };
        testPartitionTrackingForStateTransition(cancelAndComplete, PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
    }

    /**
     * Fails the execution through the overload that also takes accumulators and IO metrics;
     * only stop-tracking is expected.
     */
    @Test
    public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
        final Consumer<Execution> failWithMetrics = attempt ->
            attempt.markFailed(new Exception("Test exception"), Collections.emptyMap(), new IOMetrics(0L, 0L, 0L, 0L));
        testPartitionTrackingForStateTransition(failWithMetrics, PartitionReleaseResult.STOP_TRACKING);
    }

    /**
     * Fails the execution through the single-argument overload; stop-tracking-and-release is
     * expected.
     */
    @Test
    public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
        final Consumer<Execution> failWithCauseOnly = attempt ->
            attempt.markFailed(new Exception("Test exception"));
        testPartitionTrackingForStateTransition(failWithCauseOnly, PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
    }

    /**
     * Starts a running execution that produces a {@code BLOCKING} partition, applies the given
     * state transition and verifies which partition-tracker callbacks were invoked.
     *
     * <p>Before the transition, the tracker must have been told to start tracking the produced
     * partition on the task executor the execution was deployed to. After the transition the
     * stop-tracking and stop-tracking-and-release callbacks are checked against
     * {@code partitionReleaseResult}.
     *
     * @param consumer state transition applied to the running execution
     * @param partitionReleaseResult which tracker callbacks are expected after the transition
     * @throws Exception if the setup fails or a recorded tracker call cannot be retrieved
     */
    private void testPartitionTrackingForStateTransition(Consumer<Execution> consumer, PartitionReleaseResult partitionReleaseResult) throws Exception {
        final CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> startTrackingFuture = new CompletableFuture<>();
        final CompletableFuture<Collection<ResultPartitionID>> stopTrackingFuture = new CompletableFuture<>();
        final CompletableFuture<Collection<ResultPartitionID>> stopTrackingAndReleaseFuture = new CompletableFuture<>();

        final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
        partitionTracker.setStartTrackingPartitionsConsumer(
            (resourceID, resultPartitionDeploymentDescriptor) ->
                startTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor)));
        // Bound method references replace the decompiler residue that called complete(...) on an
        // undefined receiver.
        partitionTracker.setStopTrackingPartitionsConsumer(stopTrackingFuture::complete);
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(stopTrackingAndReleaseFuture::complete);

        setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), NettyShuffleMaster.INSTANCE);

        final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = startTrackingFuture.get();
        Assert.assertThat(startTrackingCall.f0, Matchers.equalTo(this.taskExecutorResourceId));
        Assert.assertThat(startTrackingCall.f1, Matchers.equalTo(this.descriptor));

        consumer.accept(this.execution);

        switch (partitionReleaseResult) {
            case NONE:
                Assert.assertFalse(stopTrackingFuture.isDone());
                Assert.assertFalse(stopTrackingAndReleaseFuture.isDone());
                break;
            case STOP_TRACKING:
                Assert.assertTrue(stopTrackingFuture.isDone());
                Assert.assertFalse(stopTrackingAndReleaseFuture.isDone());
                Assert.assertEquals(Collections.singletonList(this.descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingFuture.get());
                break;
            case STOP_TRACKING_AND_RELEASE:
                Assert.assertFalse(stopTrackingFuture.isDone());
                Assert.assertTrue(stopTrackingAndReleaseFuture.isDone());
                Assert.assertEquals(Collections.singletonList(this.descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingAndReleaseFuture.get());
                break;
            default:
                // No further expectations for any other value.
                break;
        }
    }

    /**
     * Builds a two-vertex job (producer connected all-to-all to a consumer), starts its execution
     * graph on the calling thread and drives the first producer execution into the running state.
     *
     * <p>On return, {@code execution}, {@code descriptor}, {@code taskExecutorResourceId} and
     * {@code jobId} describe that running execution and its first produced partition.
     *
     * @param resultPartitionType type of the data set connecting producer and consumer
     * @param partitionTracker partition tracker handed to the execution graph
     * @param taskManagerGateway gateway backing every slot handed out by the slot provider
     * @param shuffleMaster shuffle master handed to the execution graph
     * @throws JobException declared by the graph construction and deployment calls used here
     * @throws JobExecutionException declared by the graph construction call used here
     */
    private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, PartitionTracker partitionTracker, final TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException {
        final JobVertex producerVertex = createNoOpJobVertex();
        final JobVertex consumerVertex = createNoOpJobVertex();
        consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final JobGraph jobGraph = new JobGraph(new JobID(), "test job", new JobVertex[]{producerVertex, consumerVertex});

        // Answers every request immediately with a new slot on the local task manager location.
        final SlotProvider slotProvider = new SlotProvider() {
            public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean z, Time time) {
                return CompletableFuture.completedFuture(new SimpleSlot(new SingleSlotTestingSlotOwner(), taskManagerLocation, 0, taskManagerGateway));
            }

            public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
                // Requests complete immediately, so there is nothing to cancel.
            }
        };

        final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
            (ExecutionGraph) null,
            jobGraph,
            new Configuration(),
            TestingUtils.defaultExecutor(),
            TestingUtils.defaultExecutor(),
            slotProvider,
            ExecutionPartitionLifecycleTest.class.getClassLoader(),
            new StandaloneCheckpointRecoveryFactory(),
            Time.seconds(10L),
            new NoRestartStrategy(),
            new UnregisteredMetricsGroup(),
            VoidBlobWriter.getInstance(),
            Time.seconds(10L),
            this.log,
            shuffleMaster,
            partitionTracker);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        final ExecutionJobVertex producerJobVertex = executionGraph.getJobVertex(producerVertex.getID());
        this.execution = producerJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();

        // Allocate, deploy and switch to running, in exactly this order.
        this.execution.allocateResourcesForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        this.execution.deploy();
        this.execution.switchToRunning();

        this.descriptor = (ResultPartitionDeploymentDescriptor) this.execution
            .getResultPartitionDeploymentDescriptor(producerJobVertex.getProducedDataSets()[0].getPartitions()[0].getPartitionId())
            .get();
        this.taskExecutorResourceId = taskManagerLocation.getResourceID();
        this.jobId = executionGraph.getJobID();
    }

    /**
     * Creates a job vertex named "Test vertex" with a new vertex ID whose invokable is
     * {@link NoOpInvokable}.
     *
     * @return the newly created vertex, never {@code null}
     */
    @Nonnull
    private JobVertex createNoOpJobVertex() {
        final JobVertexID vertexId = new JobVertexID();
        final JobVertex noOpVertex = new JobVertex("Test vertex", vertexId);
        noOpVertex.setInvokableClass(NoOpInvokable.class);
        return noOpVertex;
    }
}
