package org.apache.flink.runtime.executiongraph;

import java.net.InetAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

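/**
 * Tests for the scheduling behavior of the {@link ExecutionGraph} under {@code ScheduleMode.EAGER}:
 * deployment ordering, slot failure and timeout handling, and cancellation while slot requests
 * are still pending.
 *
 * <p>Decompiled from org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.class.
 */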
public class ExecutionGraphSchedulingTest extends TestLogger {
    private final ScheduledExecutorService executor = new DirectScheduledExecutorService();

    /**
     * Unchecked exception type private to this test class; the tests use it to fail a slot
     * future with an exception that cannot be confused with one thrown by production code.
     */
    private static class TestRuntimeException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private TestRuntimeException() {
            super();
        }
    }

    /** Shuts down the executor of this test instance after each test method. */
    @After
    public void shutdown() {
        executor.shutdownNow();
    }

    /**
     * Verifies that the target task is not submitted while only its own slot is available,
     * and that both source and target are submitted exactly once after the source slot has
     * been provided as well. The job is expected to stay in {@code RUNNING} throughout.
     */
    @Test
    public void testScheduleSourceBeforeTarget() throws Exception {
        // --- job: source -> target, parallelism 1, pipelined all-to-all ---
        final JobVertex source = new JobVertex("source");
        source.setParallelism(1);
        source.setInvokableClass(NoOpInvokable.class);

        final JobVertex target = new JobVertex("target");
        target.setParallelism(1);
        target.setInvokableClass(NoOpInvokable.class);
        target.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobGraph graph = new JobGraph(new JobID(), "test", new JobVertex[]{source, target});
        graph.setScheduleMode(ScheduleMode.EAGER);

        // --- slot futures that the test completes by hand ---
        final CompletableFuture<LogicalSlot> sourceSlotFuture = new CompletableFuture<>();
        final CompletableFuture<LogicalSlot> targetSlotFuture = new CompletableFuture<>();

        final ProgrammedSlotProvider slots = new ProgrammedSlotProvider(1);
        slots.addSlot(source.getID(), 0, sourceSlotFuture);
        slots.addSlot(target.getID(), 0, targetSlotFuture);

        final ExecutionGraph eg = createExecutionGraph(graph, slots);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        final InteractionsCountingTaskManagerGateway sourceGateway = createTaskManager();
        final InteractionsCountingTaskManagerGateway targetGateway = createTaskManager();
        final TestingLogicalSlot sourceSlot = createTestingLogicalSlot(sourceGateway);
        final TestingLogicalSlot targetSlot = createTestingLogicalSlot(targetGateway);

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Only the target's slot is there: the target must not have been submitted yet.
        targetSlotFuture.complete(targetSlot);
        Assert.assertThat(targetGateway.getSubmitTaskCount(), Matchers.is(0));
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // With the source's slot in place, both tasks must have been submitted once.
        sourceSlotFuture.complete(sourceSlot);
        Assert.assertThat(sourceGateway.getSubmitTaskCount(), Matchers.is(1));
        Assert.assertThat(targetGateway.getSubmitTaskCount(), Matchers.is(1));
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
    }

    /**
     * Creates a {@link TestingLogicalSlot} via {@link TestingLogicalSlotBuilder}, configured
     * with the given task manager gateway.
     */
    private TestingLogicalSlot createTestingLogicalSlot(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        return new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(interactionsCountingTaskManagerGateway)
                .createTestingLogicalSlot();
    }

    /**
     * Verifies that for two pipelined-connected vertices (parallelism 8 each) no task is
     * submitted while any slot future is still pending, and that every gateway has received
     * exactly one submission once the last slot future has been completed.
     */
    @Test
    public void testDeployPipelinedConnectedComponentsTogether() throws Exception {
        final int parallelism = 8;

        final JobVertex source = new JobVertex("source");
        source.setParallelism(parallelism);
        source.setInvokableClass(NoOpInvokable.class);

        final JobVertex target = new JobVertex("target");
        target.setParallelism(parallelism);
        target.setInvokableClass(NoOpInvokable.class);
        target.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobGraph graph = new JobGraph(new JobID(), "test", new JobVertex[]{source, target});
        graph.setScheduleMode(ScheduleMode.EAGER);

        @SuppressWarnings("unchecked")
        final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
        @SuppressWarnings("unchecked")
        final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
        final InteractionsCountingTaskManagerGateway[] sourceGateways = new InteractionsCountingTaskManagerGateway[parallelism];
        final InteractionsCountingTaskManagerGateway[] targetGateways = new InteractionsCountingTaskManagerGateway[parallelism];
        final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
        final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];

        for (int subtask = 0; subtask < parallelism; subtask++) {
            sourceGateways[subtask] = createTaskManager();
            targetGateways[subtask] = createTaskManager();
            sourceSlots[subtask] = createTestingLogicalSlot(sourceGateways[subtask]);
            targetSlots[subtask] = createTestingLogicalSlot(targetGateways[subtask]);
            sourceFutures[subtask] = new CompletableFuture<>();
            targetFutures[subtask] = new CompletableFuture<>();
        }

        final ProgrammedSlotProvider slots = new ProgrammedSlotProvider(parallelism);
        slots.addSlots(source.getID(), sourceFutures);
        slots.addSlots(target.getID(), targetFutures);

        final ExecutionGraph eg = createExecutionGraph(graph, slots);

        // Even-indexed source slots are already available before scheduling starts.
        for (int even = 0; even < parallelism; even += 2) {
            sourceFutures[even].complete(sourceSlots[even]);
        }

        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        eg.scheduleForExecution();
        verifyNothingDeployed(eg, sourceGateways);

        // Odd-indexed source slots arrive after scheduling.
        for (int odd = 1; odd < parallelism; odd += 2) {
            sourceFutures[odd].complete(sourceSlots[odd]);
        }
        verifyNothingDeployed(eg, sourceGateways);

        // All target slots except index 0.
        for (int subtask = 1; subtask < parallelism; subtask++) {
            targetFutures[subtask].complete(targetSlots[subtask]);
        }
        verifyNothingDeployed(eg, targetGateways);

        // The last missing slot: now everything is expected to be submitted.
        targetFutures[0].complete(targetSlots[0]);

        for (int subtask = 0; subtask < sourceGateways.length; subtask++) {
            Assert.assertThat(sourceGateways[subtask].getSubmitTaskCount(), Matchers.is(1));
        }
        for (int subtask = 0; subtask < targetGateways.length; subtask++) {
            Assert.assertThat(targetGateways[subtask].getSubmitTaskCount(), Matchers.is(1));
        }
    }

    /**
     * Verifies that a single exceptionally-completed slot future terminates the job without
     * any task having been submitted, and that the slots which had already been provided are
     * no longer alive afterwards.
     */
    @Test
    public void testOneSlotFailureAbortsDeploy() throws Exception {
        final int parallelism = 6;

        final JobVertex source = new JobVertex("source");
        source.setParallelism(parallelism);
        source.setInvokableClass(NoOpInvokable.class);

        final JobVertex target = new JobVertex("target");
        target.setParallelism(parallelism);
        target.setInvokableClass(NoOpInvokable.class);
        target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        final JobGraph graph = new JobGraph(new JobID(), "test", new JobVertex[]{source, target});
        graph.setScheduleMode(ScheduleMode.EAGER);

        final InteractionsCountingTaskManagerGateway gateway = createTaskManager();

        // Records the allocation id of every slot handed back to the owner.
        final ArrayBlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
        final TestingSlotOwner owner = new TestingSlotOwner();
        owner.setReturnAllocatedSlotConsumer(slot -> returnedSlots.offer(slot.getAllocationId()));

        final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
        final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];
        @SuppressWarnings("unchecked")
        final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
        @SuppressWarnings("unchecked")
        final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];

        for (int subtask = 0; subtask < parallelism; subtask++) {
            sourceSlots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());
            targetSlots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());
            sourceFutures[subtask] = new CompletableFuture<>();
            targetFutures[subtask] = new CompletableFuture<>();
        }

        final ProgrammedSlotProvider slots = new ProgrammedSlotProvider(parallelism);
        slots.addSlots(source.getID(), sourceFutures);
        slots.addSlots(target.getID(), targetFutures);

        final ExecutionGraph eg = createExecutionGraph(graph, slots);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        // Provide the even-indexed slots of both vertices up front.
        for (int even = 0; even < parallelism; even += 2) {
            sourceFutures[even].complete(sourceSlots[even]);
            targetFutures[even].complete(targetSlots[even]);
        }

        eg.scheduleForExecution();

        // One slot request fails.
        sourceFutures[1].completeExceptionally(new TestRuntimeException());

        // Wait (bounded) for the job to terminate.
        eg.getTerminationFuture().get(2000L, TimeUnit.MILLISECONDS);

        // Give returned slots a bounded amount of time to show up; results are not inspected.
        int polls = 0;
        while (polls < parallelism) {
            returnedSlots.poll(2000L, TimeUnit.MILLISECONDS);
            polls++;
        }

        Assert.assertThat(gateway.getSubmitTaskCount(), Matchers.is(0));

        for (int even = 0; even < parallelism; even += 2) {
            Assert.assertFalse(sourceSlots[even].isAlive());
            Assert.assertFalse(targetSlots[even].isAlive());
        }
    }

    /**
     * Verifies that a slot future completed with a {@link TimeoutException} makes the job
     * terminate in {@code FAILED} without any task having been submitted.
     */
    @Test
    public void testEagerSchedulingWithSlotTimeout() throws Exception {
        final int parallelism = 3;

        final JobVertex vertex = new JobVertex("task");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(NoOpInvokable.class);

        final JobGraph graph = new JobGraph(new JobID(), "test", new JobVertex[]{vertex});
        graph.setScheduleMode(ScheduleMode.EAGER);

        // Records the allocation id of every slot handed back to the owner.
        final ArrayBlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
        final TestingSlotOwner owner = new TestingSlotOwner();
        owner.setReturnAllocatedSlotConsumer(slot -> returnedSlots.offer(slot.getAllocationId()));

        final InteractionsCountingTaskManagerGateway gateway = createTaskManager();

        final LogicalSlot[] slots = new LogicalSlot[parallelism];
        @SuppressWarnings("unchecked")
        final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            slots[subtask] = createSingleLogicalSlot(owner, gateway, new SlotRequestId());
            slotFutures[subtask] = new CompletableFuture<>();
        }

        final ProgrammedSlotProvider provider = new ProgrammedSlotProvider(parallelism);
        provider.addSlots(vertex.getID(), slotFutures);

        final ExecutionGraph eg = createExecutionGraph(graph, provider);

        // One slot is available before scheduling, one right after.
        slotFutures[1].complete(slots[1]);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        eg.scheduleForExecution();
        slotFutures[2].complete(slots[2]);

        // The remaining request is still pending, so the job must not have terminated.
        Assert.assertThat(eg.getTerminationFuture().isDone(), Matchers.is(false));

        // Time out the last slot request.
        slotFutures[0].completeExceptionally(new TimeoutException("Test time out"));
        Assert.assertThat(eg.getTerminationFuture().get(), Matchers.is(JobStatus.FAILED));

        // Give the two provided slots a bounded amount of time to come back; results are not inspected.
        int polls = 0;
        while (polls < 2) {
            returnedSlots.poll(2000L, TimeUnit.MILLISECONDS);
            polls++;
        }

        Assert.assertThat(gateway.getSubmitTaskCount(), Matchers.is(0));
    }

    /**
     * Verifies that the job still terminates in {@code CANCELED} when a pending slot future
     * is completed after {@code cancel()} has been called and before the release of the
     * first, already assigned slot has finished.
     */
    @Test
    public void testSchedulingOperationCancellationWhenCancel() throws Exception {
        final JobVertex jobVertex = new JobVertex("NoOp JobVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(2);
        final JobGraph jobGraph = new JobGraph(new JobVertex[]{jobVertex});
        jobGraph.setScheduleMode(ScheduleMode.EAGER);

        final CompletableFuture<LogicalSlot> slotFuture1 = new CompletableFuture<>();
        final CompletableFuture<LogicalSlot> slotFuture2 = new CompletableFuture<>();

        // Must be declared with the concrete type: addSlots(...) is called on
        // ProgrammedSlotProvider throughout this class, not on the SlotProvider interface.
        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(2);
        slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});

        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        executionGraph.scheduleForExecution();

        // The first slot's release future is completed manually further down.
        final TestingLogicalSlot slot = createTestingSlot();
        final CompletableFuture<?> releaseFuture = slot.getReleaseFuture();
        slotFuture1.complete(slot);

        executionGraph.cancel();

        // Provide the second slot only after cancellation has been requested.
        slotFuture2.complete(new TestingLogicalSlotBuilder().createTestingLogicalSlot());

        // Short pause kept from the original test before finishing the first slot's release.
        Thread.sleep(1L);
        releaseFuture.complete(null);

        Assert.assertThat(executionGraph.getTerminationFuture().get(), Matchers.is(JobStatus.CANCELED));
    }

    /**
     * Verifies that releasing an already assigned slot, while the second slot request is
     * still pending and both executions are {@code SCHEDULED}, makes the job terminate in
     * {@code FAILED}.
     */
    @Test
    public void testSlotReleasingFailsSchedulingOperation() throws Exception {
        final int parallelism = 2;

        final JobVertex vertex = new JobVertex("Testing job vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);

        final JobGraph graph = new JobGraph(new JobVertex[]{vertex});
        graph.setScheduleMode(ScheduleMode.EAGER);

        final ProgrammedSlotProvider provider = new ProgrammedSlotProvider(parallelism);

        // Subtask 0 gets its slot immediately; subtask 1's request never completes.
        final SingleLogicalSlot assignedSlot =
                createSingleLogicalSlot(new DummySlotOwner(), new SimpleAckingTaskManagerGateway(), new SlotRequestId());
        provider.addSlot(vertex.getID(), 0, CompletableFuture.completedFuture(assignedSlot));
        provider.addSlot(vertex.getID(), 1, new CompletableFuture<>());

        final ExecutionGraph eg = createExecutionGraph(graph, provider);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        eg.scheduleForExecution();

        Assert.assertThat(eg.getState(), Matchers.is(JobStatus.RUNNING));

        final ExecutionVertex[] subtasks = eg.getJobVertex(vertex.getID()).getTaskVertices();
        Assert.assertThat(subtasks[0].getExecutionState(), Matchers.is(ExecutionState.SCHEDULED));
        Assert.assertThat(subtasks[1].getExecutionState(), Matchers.is(ExecutionState.SCHEDULED));

        // Releasing the one assigned slot is expected to fail the whole job.
        assignedSlot.releaseSlot(new FlinkException("Test failure"));

        Assert.assertThat(eg.getTerminationFuture().get(), Matchers.is(JobStatus.FAILED));
    }

    /**
     * Verifies that cancelling the job while only part of the slot requests have been
     * fulfilled leaves no outstanding slot request: every request recorded during scheduling
     * must either have its slot returned to the slot owner or be cancelled at the slot
     * provider by the time {@code cancel()} returns.
     */
    @Test
    public void testCancellationOfIncompleteScheduling() throws Exception {
        final int parallelism = 10;

        final JobVertex jobVertex = new JobVertex("Test job vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(parallelism);

        final JobGraph jobGraph = new JobGraph(new JobVertex[]{jobVertex});
        jobGraph.setScheduleMode(ScheduleMode.EAGER);

        final TestingSlotOwner slotOwner = new TestingSlotOwner();
        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();

        // Every slot request seen by the provider is recorded here (the value is a dummy).
        final ConcurrentHashMap<SlotRequestId, Integer> slotRequests = new ConcurrentHashMap<>(parallelism);

        // Every second request is answered immediately; the others stay pending forever.
        final TestingSlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> {
            slotRequests.put(slotRequestId, 1);
            if (slotRequests.size() % 2 == 0) {
                return CompletableFuture.<LogicalSlot>completedFuture(
                        createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId));
            }
            return new CompletableFuture<LogicalSlot>();
        });

        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        final ConcurrentHashMap.KeySetView<SlotRequestId, Boolean> outstandingRequests =
                ConcurrentHashMap.newKeySet(parallelism);

        executionGraph.scheduleForExecution();

        // Snapshot of all requests issued by scheduling; entries are removed as they are resolved.
        outstandingRequests.addAll(slotRequests.keySet());

        // A fulfilled request is resolved when its slot is returned to the owner ...
        slotOwner.setReturnAllocatedSlotConsumer(
                logicalSlot -> outstandingRequests.remove(logicalSlot.getSlotRequestId()));
        // ... and a pending request is resolved when the provider is asked to cancel it.
        slotProvider.setSlotCanceller(outstandingRequests::remove);

        // Acknowledge task cancellation right away so that cancel() can run to completion.
        taskManagerGateway.setCancelConsumer(executionAttemptID -> {
            final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptID);
            if (execution != null) {
                execution.completeCancelling();
            }
        });

        executionGraph.cancel();

        Assert.assertThat(outstandingRequests, Matchers.is(Matchers.empty()));
    }

    /**
     * Builds an {@link ExecutionGraph} for the given job and slot provider, using a
     * ten-minute {@link Time} value for the three-argument overload's {@code time} parameter.
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider) throws Exception {
        final Time tenMinutes = Time.minutes(10L);
        return createExecutionGraph(jobGraph, slotProvider, tenMinutes);
    }

    /**
     * Builds an {@link ExecutionGraph} through {@link ExecutionGraphBuilder#buildGraph} with
     * test defaults: this test's executor for both executor arguments, no restart strategy,
     * an unregistered metrics group and no-op blob writer / partition tracker.
     *
     * @param jobGraph the job to build the graph for
     * @param slotProvider the slot provider handed to the builder
     * @param time passed for both {@code Time} arguments of {@code buildGraph}
     *     (NOTE(review): presumably the RPC and slot allocation timeouts; confirm against
     *     ExecutionGraphBuilder)
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider, Time time) throws Exception {
        final ClassLoader classLoader = getClass().getClassLoader();
        return ExecutionGraphBuilder.buildGraph(
                (ExecutionGraph) null,
                jobGraph,
                new Configuration(),
                this.executor,
                this.executor,
                slotProvider,
                classLoader,
                new StandaloneCheckpointRecoveryFactory(),
                time,
                new NoRestartStrategy(),
                new UnregisteredMetricsGroup(),
                VoidBlobWriter.getInstance(),
                time,
                this.log,
                NettyShuffleMaster.INSTANCE,
                NoOpJobMasterPartitionTracker.INSTANCE,
                System.currentTimeMillis());
    }

    /**
     * Creates a {@link SingleLogicalSlot} for the given request id, backed by a fresh
     * {@link AllocationID} and a task manager location on the loopback address (port 12345,
     * physical slot number 0). The slot has no slot sharing group and {@code LOCAL} locality.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     *
     * @param slotOwner owner passed to the created slot
     * @param taskManagerGateway gateway stored in the slot's context
     * @param slotRequestId request id of the created slot
     * @return the newly created slot
     */
    @Nonnull
    public static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
        final AllocationID allocationId = new AllocationID();
        final TaskManagerLocation location =
                new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
        final SimpleSlotContext slotContext = new SimpleSlotContext(allocationId, location, 0, taskManagerGateway);
        return new SingleLogicalSlot(slotRequestId, slotContext, (SlotSharingGroupId) null, Locality.LOCAL, slotOwner);
    }

    /** Creates a fresh task manager gateway that counts the interactions it receives. */
    private static InteractionsCountingTaskManagerGateway createTaskManager() {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        return gateway;
    }

    /**
     * Asserts that the graph is in {@code RUNNING} state and that none of the given gateways
     * has received a task submission so far.
     */
    private static void verifyNothingDeployed(ExecutionGraph executionGraph, InteractionsCountingTaskManagerGateway[] interactionsCountingTaskManagerGatewayArr) {
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        for (int idx = 0; idx < interactionsCountingTaskManagerGatewayArr.length; idx++) {
            final int submitted = interactionsCountingTaskManagerGatewayArr[idx].getSubmitTaskCount();
            Assert.assertThat(submitted, Matchers.is(0));
        }
    }

    /**
     * Creates a {@link TestingLogicalSlot} with automatic completion of the release future
     * switched off, so the caller decides when the release finishes.
     */
    @Nonnull
    private static TestingLogicalSlot createTestingSlot() {
        return new TestingLogicalSlotBuilder()
                .setAutomaticallyCompleteReleaseFuture(false)
                .createTestingLogicalSlot();
    }
}
