package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionTest.class */
public class ExecutionTest extends TestLogger {

    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Resource();
    private final TestingComponentMainThreadExecutor testMainThreadUtil = EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionTest$SingleSlotTestingSlotOwner.class */
    private static final class SingleSlotTestingSlotOwner implements SlotOwner {
        final CompletableFuture<LogicalSlot> returnedSlot;

        private SingleSlotTestingSlotOwner() {
            this.returnedSlot = new CompletableFuture<>();
        }

        public CompletableFuture<LogicalSlot> getReturnedSlotFuture() {
            return this.returnedSlot;
        }

        public void returnLogicalSlot(LogicalSlot logicalSlot) {
            this.returnedSlot.complete(logicalSlot);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionTest$TestingShuffleMaster.class */
    private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
        private TestingShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return new CompletableFuture<>();
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        }
    }

    @Test
    public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        JobVertexID id = createNoOpJobVertex.getID();
        CompletableFuture<LogicalSlot> completableFuture = new CompletableFuture<>();
        ProgrammedSlotProvider programmedSlotProvider = new ProgrammedSlotProvider(1);
        programmedSlotProvider.addSlot(id, 0, completableFuture);
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) programmedSlotProvider, (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        createSimpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        Execution currentExecutionAttempt = createSimpleTestGraph.getJobVertex(id).getTaskVertices()[0].getCurrentExecutionAttempt();
        SingleSlotTestingSlotOwner singleSlotTestingSlotOwner = new SingleSlotTestingSlotOwner();
        TestingLogicalSlot createTestingLogicalSlot = createTestingLogicalSlot(singleSlotTestingSlotOwner);
        TestingLogicalSlot createTestingLogicalSlot2 = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
        Assert.assertFalse(currentExecutionAttempt.allocateResourcesForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet()).isDone());
        Assert.assertEquals(ExecutionState.SCHEDULED, currentExecutionAttempt.getState());
        Assert.assertTrue(currentExecutionAttempt.tryAssignResource(createTestingLogicalSlot2));
        completableFuture.complete(createTestingLogicalSlot);
        Assert.assertEquals(createTestingLogicalSlot, singleSlotTestingSlotOwner.getReturnedSlotFuture().get());
    }

    private TestingLogicalSlot createTestingLogicalSlot(SlotOwner slotOwner) {
        return new TestingLogicalSlotBuilder().setSlotOwner(slotOwner).createTestingLogicalSlot();
    }

    @Test
    public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        JobVertexID id = createNoOpJobVertex.getID();
        SingleSlotTestingSlotOwner singleSlotTestingSlotOwner = new SingleSlotTestingSlotOwner();
        TestingLogicalSlot createTestingLogicalSlot = createTestingLogicalSlot(singleSlotTestingSlotOwner);
        ProgrammedSlotProvider programmedSlotProvider = new ProgrammedSlotProvider(1);
        programmedSlotProvider.addSlot(id, 0, CompletableFuture.completedFuture(createTestingLogicalSlot));
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) programmedSlotProvider, (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        createSimpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        Execution currentExecutionAttempt = createSimpleTestGraph.getJobVertex(id).getTaskVertices()[0].getCurrentExecutionAttempt();
        Assert.assertTrue(currentExecutionAttempt.allocateResourcesForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet()).isDone());
        Assert.assertEquals(ExecutionState.SCHEDULED, currentExecutionAttempt.getState());
        Assert.assertEquals(createTestingLogicalSlot, currentExecutionAttempt.getAssignedResource());
        currentExecutionAttempt.cancel();
        Assert.assertEquals(ExecutionState.CANCELED, currentExecutionAttempt.getState());
        Assert.assertEquals(createTestingLogicalSlot, singleSlotTestingSlotOwner.getReturnedSlotFuture().get());
    }

    @Test
    public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        JobVertexID id = createNoOpJobVertex.getID();
        SingleSlotTestingSlotOwner singleSlotTestingSlotOwner = new SingleSlotTestingSlotOwner();
        TestingLogicalSlot createTestingLogicalSlot = createTestingLogicalSlot(singleSlotTestingSlotOwner);
        ProgrammedSlotProvider programmedSlotProvider = new ProgrammedSlotProvider(1);
        programmedSlotProvider.addSlot(id, 0, CompletableFuture.completedFuture(createTestingLogicalSlot));
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) programmedSlotProvider, (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        Execution currentExecutionAttempt = createSimpleTestGraph.getJobVertex(id).getTaskVertices()[0].getCurrentExecutionAttempt();
        Assert.assertTrue(currentExecutionAttempt.allocateResourcesForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet()).isDone());
        Assert.assertEquals(ExecutionState.SCHEDULED, currentExecutionAttempt.getState());
        Assert.assertEquals(createTestingLogicalSlot, currentExecutionAttempt.getAssignedResource());
        currentExecutionAttempt.deploy();
        currentExecutionAttempt.switchToRunning();
        currentExecutionAttempt.cancel();
        Assert.assertEquals(ExecutionState.CANCELING, currentExecutionAttempt.getState());
        currentExecutionAttempt.completeCancelling();
        Assert.assertEquals(createTestingLogicalSlot, singleSlotTestingSlotOwner.getReturnedSlotFuture().get());
    }

    @Test
    public void testSlotAllocationCancellationWhenExecutionCancelled() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertex jobVertex = new JobVertex("test vertex", jobVertexID);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        ProgrammedSlotProvider programmedSlotProvider = new ProgrammedSlotProvider(1);
        programmedSlotProvider.addSlot(jobVertexID, 0, new CompletableFuture<>());
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) programmedSlotProvider, (RestartStrategy) new NoRestartStrategy(), jobVertex);
        createSimpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        Execution currentExecutionAttempt = createSimpleTestGraph.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt();
        CompletableFuture allocateResourcesForExecution = currentExecutionAttempt.allocateResourcesForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        Assert.assertThat(Boolean.valueOf(allocateResourcesForExecution.isDone()), Matchers.is(false));
        Assert.assertThat(programmedSlotProvider.getSlotRequestedFuture(jobVertexID, 0).get(), Matchers.is(true));
        Set<SlotRequestId> slotRequests = programmedSlotProvider.getSlotRequests();
        Assert.assertThat(slotRequests, Matchers.hasSize(1));
        Assert.assertThat(currentExecutionAttempt.getState(), Matchers.is(ExecutionState.SCHEDULED));
        currentExecutionAttempt.cancel();
        Assert.assertThat(currentExecutionAttempt.getState(), Matchers.is(ExecutionState.CANCELED));
        Assert.assertThat(Boolean.valueOf(allocateResourcesForExecution.isCompletedExceptionally()), Matchers.is(true));
        Assert.assertThat(programmedSlotProvider.getCanceledSlotRequests(), Matchers.equalTo(slotRequests));
    }

    @Test
    public void testAllPreferredLocationCalculation() throws ExecutionException, InterruptedException {
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        LocalTaskManagerLocation localTaskManagerLocation2 = new LocalTaskManagerLocation();
        LocalTaskManagerLocation localTaskManagerLocation3 = new LocalTaskManagerLocation();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(localTaskManagerLocation);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture calculatePreferredLocations = SchedulerTestUtils.getTestVertex((Collection<CompletableFuture<TaskManagerLocation>>) Arrays.asList(completedFuture, completableFuture, completableFuture2)).calculatePreferredLocations(LocationPreferenceConstraint.ALL);
        Assert.assertFalse(calculatePreferredLocations.isDone());
        completableFuture2.complete(localTaskManagerLocation3);
        Assert.assertFalse(calculatePreferredLocations.isDone());
        completableFuture.complete(localTaskManagerLocation2);
        Assert.assertTrue(calculatePreferredLocations.isDone());
        Assert.assertThat((Collection) calculatePreferredLocations.get(), Matchers.containsInAnyOrder(new TaskManagerLocation[]{localTaskManagerLocation, localTaskManagerLocation2, localTaskManagerLocation3}));
    }

    @Test
    public void testAnyPreferredLocationCalculation() throws ExecutionException, InterruptedException {
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        LocalTaskManagerLocation localTaskManagerLocation2 = new LocalTaskManagerLocation();
        CompletableFuture calculatePreferredLocations = SchedulerTestUtils.getTestVertex((Collection<CompletableFuture<TaskManagerLocation>>) Arrays.asList(CompletableFuture.completedFuture(localTaskManagerLocation), new CompletableFuture(), CompletableFuture.completedFuture(localTaskManagerLocation2))).calculatePreferredLocations(LocationPreferenceConstraint.ANY);
        Assert.assertTrue(calculatePreferredLocations.isDone());
        Assert.assertThat((Collection) calculatePreferredLocations.get(), Matchers.containsInAnyOrder(new TaskManagerLocation[]{localTaskManagerLocation, localTaskManagerLocation2}));
    }

    @Test
    public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        JobVertexID id = createNoOpJobVertex.getID();
        SingleSlotTestingSlotOwner singleSlotTestingSlotOwner = new SingleSlotTestingSlotOwner();
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) createProgrammedSlotProvider(1, Collections.singleton(id), singleSlotTestingSlotOwner), (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        createSimpleTestGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        ExecutionVertex executionVertex = createSimpleTestGraph.getJobVertex(id).getTaskVertices()[0];
        executionVertex.scheduleForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        CompletableFuture<LogicalSlot> returnedSlotFuture = singleSlotTestingSlotOwner.getReturnedSlotFuture();
        CompletableFuture cancel = executionVertex.cancel();
        currentExecutionAttempt.completeCancelling();
        cancel.thenApply(obj -> {
            Assert.assertTrue(returnedSlotFuture.isDone());
            return true;
        }).get();
    }

    @Test
    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        JobVertexID id = createNoOpJobVertex.getID();
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) createProgrammedSlotProvider(1, Collections.singleton(id), new SingleSlotTestingSlotOwner()), (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        ExecutionVertex executionVertex = createSimpleTestGraph.getJobVertex(id).getTaskVertices()[0];
        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        currentExecutionAttempt.setInitialState(new JobManagerTaskRestore(1L, new TaskStateSnapshot()));
        Assert.assertThat(currentExecutionAttempt.getTaskRestore(), Matchers.is(Matchers.notNullValue()));
        executionVertex.scheduleForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
        Assert.assertThat(currentExecutionAttempt.getTaskRestore(), Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void testEagerSchedulingFailureReturnsSlot() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        JobVertexID id = createNoOpJobVertex.getID();
        SimpleAckingTaskManagerGateway simpleAckingTaskManagerGateway = new SimpleAckingTaskManagerGateway();
        SingleSlotTestingSlotOwner singleSlotTestingSlotOwner = new SingleSlotTestingSlotOwner();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingSlotProvider testingSlotProvider = new TestingSlotProvider(slotRequestId -> {
            completableFuture.complete(slotRequestId);
            return new CompletableFuture();
        });
        completableFuture2.getClass();
        testingSlotProvider.setSlotCanceller((v1) -> {
            r1.complete(v1);
        });
        singleSlotTestingSlotOwner.getReturnedSlotFuture().thenAccept(logicalSlot -> {
            completableFuture2.complete(logicalSlot.getSlotRequestId());
        });
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) testingSlotProvider, (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        createSimpleTestGraph.start(this.testMainThreadUtil.getMainThreadExecutor());
        Execution currentExecutionAttempt = createSimpleTestGraph.getJobVertex(id).getTaskVertices()[0].getCurrentExecutionAttempt();
        simpleAckingTaskManagerGateway.setCancelConsumer(executionAttemptID -> {
            if (currentExecutionAttempt.getAttemptId().equals(executionAttemptID)) {
                currentExecutionAttempt.completeCancelling();
            }
        });
        completableFuture.thenAcceptAsync(slotRequestId2 -> {
            testingSlotProvider.complete(slotRequestId2, ExecutionGraphSchedulingTest.createSingleLogicalSlot(singleSlotTestingSlotOwner, simpleAckingTaskManagerGateway, slotRequestId2));
        }, (Executor) this.testMainThreadUtil.getMainThreadExecutor());
        try {
            ((CompletableFuture) this.testMainThreadUtil.execute(() -> {
                return currentExecutionAttempt.scheduleForExecution(createSimpleTestGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, Collections.emptySet());
            })).get();
            TestingComponentMainThreadExecutor testingComponentMainThreadExecutor = this.testMainThreadUtil;
            currentExecutionAttempt.getClass();
            testingComponentMainThreadExecutor.execute(currentExecutionAttempt::cancel);
        } catch (ExecutionException e) {
        }
        Assert.assertThat(completableFuture2.get(), Matchers.is(Matchers.equalTo(completableFuture.get())));
    }

    @Test
    public void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
        JobVertex createNoOpJobVertex = createNoOpJobVertex();
        SingleLogicalSlot createSingleLogicalSlot = ExecutionGraphSchedulingTest.createSingleLogicalSlot(new SingleSlotTestingSlotOwner(), new SimpleAckingTaskManagerGateway(), new SlotRequestId());
        CompletableFuture completedFuture = CompletableFuture.completedFuture(createSingleLogicalSlot);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), (SlotProvider) new TestingSlotProvider(slotRequestId -> {
            countDownLatch.countDown();
            return completedFuture;
        }), (RestartStrategy) new NoRestartStrategy(), createNoOpJobVertex);
        Execution currentExecutionAttempt = createSimpleTestGraph.getJobVertex(createNoOpJobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
        createSimpleTestGraph.start(this.testMainThreadUtil.getMainThreadExecutor());
        TestingComponentMainThreadExecutor testingComponentMainThreadExecutor = this.testMainThreadUtil;
        createSimpleTestGraph.getClass();
        testingComponentMainThreadExecutor.execute(createSimpleTestGraph::scheduleForExecution);
        countDownLatch.await();
        this.testMainThreadUtil.execute(() -> {
            Assert.assertThat(currentExecutionAttempt.getAssignedResource(), Matchers.is(Matchers.sameInstance(createSingleLogicalSlot)));
            createSingleLogicalSlot.release(new FlinkException("Test exception"));
            Assert.assertThat(Boolean.valueOf(currentExecutionAttempt.getReleaseFuture().isDone()), Matchers.is(true));
        });
    }

    @Test
    public void testIncompletePartitionRegistrationFutureIsRejected() throws Exception {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        JobGraph jobGraph = new JobGraph("job graph");
        JobVertex jobVertex = new JobVertex("source");
        JobVertex jobVertex2 = new JobVertex("target");
        jobVertex.setInvokableClass(AbstractInvokable.class);
        jobVertex2.setInvokableClass(AbstractInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        jobGraph.addVertex(jobVertex);
        jobGraph.addVertex(jobVertex2);
        boolean z = false;
        try {
            Execution.registerProducedPartitions(((ExecutionJobVertex) TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setShuffleMaster(testingShuffleMaster).build().getAllVertices().get(jobVertex.getID())).getTaskVertices()[0], new LocalTaskManagerLocation(), new ExecutionAttemptID(), false);
        } catch (IllegalStateException e) {
            z = true;
        }
        Assert.assertTrue(z);
    }

    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }

    @Nonnull
    private ProgrammedSlotProvider createProgrammedSlotProvider(int i, Collection<JobVertexID> collection, SlotOwner slotOwner) {
        ProgrammedSlotProvider programmedSlotProvider = new ProgrammedSlotProvider(i);
        for (JobVertexID jobVertexID : collection) {
            for (int i2 = 0; i2 < i; i2++) {
                programmedSlotProvider.addSlot(jobVertexID, 0, CompletableFuture.completedFuture(createTestingLogicalSlot(slotOwner)));
            }
        }
        return programmedSlotProvider;
    }
}
