package org.apache.flink.runtime.executiongraph;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.Timeout;

/**
 * Tests for the scheduling behavior of the {@link ExecutionGraph} in {@link ScheduleMode#EAGER}.
 *
 * <p>The tests drive slot allocation through hand-completed futures (or a mocked
 * {@link SlotProvider}) and then check, via Mockito verifications on the
 * {@link TaskManagerGateway} and {@link SlotOwner}, when tasks are submitted and when slots
 * are handed back.
 */
public class ExecutionGraphSchedulingTest extends TestLogger {

    /** Executor handed to the execution graph builder; shut down after every test. */
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /**
     * Marker exception thrown by the test fixtures so that tests can tell their own injected
     * failure apart from any other runtime exception.
     */
    public static class TestRuntimeException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private TestRuntimeException() {
        }
    }

    /** Stops the executor created for the test instance. */
    @After
    public void shutdown() {
        executor.shutdownNow();
    }

    // ------------------------------------------------------------------------
    //  Tests
    // ------------------------------------------------------------------------

    /**
     * Completes the target's slot future before the source's and checks that no task is
     * submitted to the target's task manager until the source's slot future completes too.
     */
    @Test
    public void testScheduleSourceBeforeTarget() throws Exception {
        final JobVertex sourceVertex = createVertex("source", 1);
        final JobVertex targetVertex = createVertex("target", 1);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobID jobId = new JobID();
        final JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});

        // Typed as FlinkCompletableFuture (not Future) because the test completes them by hand.
        final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
        final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();

        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
        slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
        slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);

        final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);

        final TaskManagerGateway sourceGateway = createTaskManager();
        final TaskManagerGateway targetGateway = createTaskManager();
        final SimpleSlot sourceSlot = createSlot(sourceGateway, jobId);
        final SimpleSlot targetSlot = createSlot(targetGateway, jobId);

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Only the target has a slot: nothing may be submitted to it yet.
        targetFuture.complete(targetSlot);
        Mockito.verify(targetGateway, new Timeout(50L, Mockito.times(0)))
            .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Once the source has its slot as well, both tasks must be submitted.
        sourceFuture.complete(sourceSlot);
        Mockito.verify(sourceGateway, Mockito.timeout(1000L))
            .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));
        Mockito.verify(targetGateway, Mockito.timeout(1000L))
            .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
    }

    /**
     * Completes the slot futures of two pipelined-connected vertices in several batches and
     * checks that no task is submitted until the very last slot future is completed, after
     * which every task manager receives a submission.
     */
    @Test
    public void testDeployPipelinedConnectedComponentsTogether() throws Exception {
        final int parallelism = 8;

        final JobVertex sourceVertex = createVertex("source", parallelism);
        final JobVertex targetVertex = createVertex("target", parallelism);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobID jobId = new JobID();
        final JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});

        final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = createSlotFutures(parallelism);
        final FlinkCompletableFuture<SimpleSlot>[] targetFutures = createSlotFutures(parallelism);

        final TaskManagerGateway[] sourceGateways = new TaskManagerGateway[parallelism];
        final TaskManagerGateway[] targetGateways = new TaskManagerGateway[parallelism];
        final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
        final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
        for (int i = 0; i < parallelism; i++) {
            sourceGateways[i] = createTaskManager();
            targetGateways[i] = createTaskManager();
            sourceSlots[i] = createSlot(sourceGateways[i], jobId);
            targetSlots[i] = createSlot(targetGateways[i], jobId);
        }

        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
        slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
        slotProvider.addSlots(targetVertex.getID(), targetFutures);

        final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);

        // Even-indexed source slots are already available before scheduling starts.
        for (int i = 0; i < parallelism; i += 2) {
            sourceFutures[i].complete(sourceSlots[i]);
        }

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();
        verifyNothingDeployed(eg, sourceGateways);

        // Remaining (odd-indexed) source slots.
        for (int i = 1; i < parallelism; i += 2) {
            sourceFutures[i].complete(sourceSlots[i]);
        }
        verifyNothingDeployed(eg, sourceGateways);

        // All target slots except the first one.
        for (int i = 1; i < parallelism; i++) {
            targetFutures[i].complete(targetSlots[i]);
        }
        verifyNothingDeployed(eg, targetGateways);

        // The last missing slot: from here on everything has to be submitted.
        targetFutures[0].complete(targetSlots[0]);

        for (TaskManagerGateway gateway : sourceGateways) {
            Mockito.verify(gateway, Mockito.timeout(500L))
                .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));
        }
        for (TaskManagerGateway gateway : targetGateways) {
            Mockito.verify(gateway, Mockito.timeout(500L))
                .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));
        }
    }

    /**
     * Completes six slot futures successfully, fails one exceptionally, and checks that the
     * graph terminates, the six slots are returned to their owner and canceled, and no task
     * is ever submitted.
     */
    @Test
    public void testOneSlotFailureAbortsDeploy() throws Exception {
        final int parallelism = 6;

        final JobVertex sourceVertex = createVertex("source", parallelism);
        final JobVertex targetVertex = createVertex("target", parallelism);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        final JobID jobId = new JobID();
        final JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});

        final TaskManagerGateway taskManager = Mockito.mock(TaskManagerGateway.class);
        final SlotOwner slotOwner = Mockito.mock(SlotOwner.class);

        final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
        final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
        for (int i = 0; i < parallelism; i++) {
            sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
            targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
        }
        final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = createSlotFutures(parallelism);
        final FlinkCompletableFuture<SimpleSlot>[] targetFutures = createSlotFutures(parallelism);

        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
        slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
        slotProvider.addSlots(targetVertex.getID(), targetFutures);

        final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);

        // Hand out three source slots (even indices) and three target slots (odd indices).
        for (int i = 0; i < parallelism; i += 2) {
            sourceFutures[i].complete(sourceSlots[i]);
            targetFutures[i + 1].complete(targetSlots[i + 1]);
        }

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();

        // A single failed allocation.
        sourceFutures[1].completeExceptionally(new TestRuntimeException());

        eg.getTerminationFuture().get(2000L, TimeUnit.MILLISECONDS);

        // Exactly the six slots handed out above must come back; nothing may be submitted.
        Mockito.verify(slotOwner, new Timeout(2000L, Mockito.times(6)))
            .returnAllocatedSlot(Mockito.any(Slot.class));
        Mockito.verify(taskManager, Mockito.times(0))
            .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));

        for (int i = 0; i < parallelism; i += 2) {
            Assert.assertTrue(sourceSlots[i].isCanceled());
            Assert.assertTrue(targetSlots[i + 1].isCanceled());
        }
    }

    /**
     * Builds the graph with a 20 ms timeout, leaves one of three slot futures uncompleted,
     * and checks that the graph terminates, the two provided slots are returned and canceled,
     * and no task is submitted.
     */
    @Test
    public void testTimeoutForSlotAllocation() throws Exception {
        final int parallelism = 3;

        final JobVertex vertex = createVertex("task", parallelism);

        final JobID jobId = new JobID();
        final JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{vertex});

        final SlotOwner slotOwner = Mockito.mock(SlotOwner.class);
        final TaskManagerGateway taskManager = Mockito.mock(TaskManagerGateway.class);

        final SimpleSlot[] slots = new SimpleSlot[parallelism];
        for (int i = 0; i < parallelism; i++) {
            slots[i] = createSlot(taskManager, jobId, slotOwner);
        }
        final FlinkCompletableFuture<SimpleSlot>[] slotFutures = createSlotFutures(parallelism);

        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
        slotProvider.addSlots(vertex.getID(), slotFutures);

        final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider, Time.milliseconds(20L));

        // One slot is available up front, one shortly after scheduling; slot 0 never arrives.
        slotFutures[1].complete(slots[1]);

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();

        slotFutures[2].complete(slots[2]);

        eg.getTerminationFuture().get(2000L, TimeUnit.MILLISECONDS);

        Mockito.verify(slotOwner, new Timeout(2000L, Mockito.times(2)))
            .returnAllocatedSlot(Mockito.any(Slot.class));
        Mockito.verify(taskManager, Mockito.times(0))
            .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));

        for (Future<SimpleSlot> future : slotFutures) {
            if (future.isDone()) {
                Assert.assertTrue(future.get().isCanceled());
            }
        }
    }

    /**
     * Lets the slot provider throw once its three slots are used up (parallelism is eight)
     * and checks that {@code ExecutionJobVertex.allocateResourcesForAll} propagates the
     * exception and that all three handed-out slots are returned to the slot owner.
     */
    @Test
    public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
        final JobVertex vertex = createVertex("vertex", 8);

        final JobID jobId = new JobID();
        final JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{vertex});

        final List<SimpleSlot> returnedSlots = new ArrayList<SimpleSlot>();
        final SlotOwner recycler = createRecordingSlotOwner(returnedSlots);

        final TaskManagerGateway taskManager = Mockito.mock(TaskManagerGateway.class);
        final List<SimpleSlot> availableSlots = new ArrayList<SimpleSlot>(Arrays.asList(
            createSlot(taskManager, jobId, recycler),
            createSlot(taskManager, jobId, recycler),
            createSlot(taskManager, jobId, recycler)));

        final SlotProvider slotProvider = createExhaustibleSlotProvider(availableSlots);

        final ExecutionJobVertex ejv = createExecutionGraph(jobGraph, slotProvider).getJobVertex(vertex.getID());

        // Captured before allocation starts, because the provider drains the list.
        final int numSlotsToExpectBack = availableSlots.size();

        try {
            ejv.allocateResourcesForAll(slotProvider, false);
            Assert.fail("should have failed with an exception");
        } catch (TestRuntimeException expected) {
            // expected: the slot provider ran out of slots
        }

        Assert.assertEquals(numSlotsToExpectBack, returnedSlots.size());
    }

    /**
     * Lets the slot provider throw once its five slots are used up (two vertices with
     * parallelism three each) and checks that {@code ExecutionGraph.scheduleForExecution}
     * propagates the exception and that all five handed-out slots are returned.
     */
    @Test
    public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception {
        final JobVertex sourceVertex = createVertex("source", 3);
        final JobVertex targetVertex = createVertex("target", 3);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobID jobId = new JobID();
        final JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});

        final List<SimpleSlot> returnedSlots = new ArrayList<SimpleSlot>();
        final SlotOwner recycler = createRecordingSlotOwner(returnedSlots);

        final TaskManagerGateway taskManager = Mockito.mock(TaskManagerGateway.class);
        final List<SimpleSlot> availableSlots = new ArrayList<SimpleSlot>(Arrays.asList(
            createSlot(taskManager, jobId, recycler),
            createSlot(taskManager, jobId, recycler),
            createSlot(taskManager, jobId, recycler),
            createSlot(taskManager, jobId, recycler),
            createSlot(taskManager, jobId, recycler)));

        final SlotProvider slotProvider = createExhaustibleSlotProvider(availableSlots);

        final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);

        // Captured before scheduling starts, because the provider drains the list.
        final int numSlotsToExpectBack = availableSlots.size();

        try {
            eg.setScheduleMode(ScheduleMode.EAGER);
            eg.scheduleForExecution();
            Assert.fail("should have failed with an exception");
        } catch (TestRuntimeException expected) {
            // expected: the slot provider ran out of slots
        }

        Assert.assertEquals(numSlotsToExpectBack, returnedSlots.size());
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /** Builds an execution graph with a ten minute timeout. */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider) throws Exception {
        return createExecutionGraph(jobGraph, slotProvider, Time.minutes(10L));
    }

    /**
     * Builds an execution graph for the given job graph through {@link ExecutionGraphBuilder},
     * using this test's executor, no restarts and an unregistered metrics group.
     *
     * @param jobGraph the job to build the graph for
     * @param slotProvider the slot provider handed to the builder
     * @param time the timeout handed to the builder
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider, Time time) throws Exception {
        return ExecutionGraphBuilder.buildGraph(
            (ExecutionGraph) null,
            jobGraph,
            new Configuration(),
            executor,
            executor,
            slotProvider,
            getClass().getClassLoader(),
            new StandaloneCheckpointRecoveryFactory(),
            time,
            new NoRestartStrategy(),
            new UnregisteredMetricsGroup(),
            1,
            log);
    }

    /** Creates a slot on the given gateway whose owner is a fresh Mockito mock. */
    private SimpleSlot createSlot(TaskManagerGateway taskManagerGateway, JobID jobID) {
        return createSlot(taskManagerGateway, jobID, Mockito.mock(SlotOwner.class));
    }

    /**
     * Creates a {@link SimpleSlot} backed by a new {@link AllocatedSlot} on a loopback
     * task manager location (port 12345) that talks to the given gateway.
     */
    private SimpleSlot createSlot(TaskManagerGateway taskManagerGateway, JobID jobID, SlotOwner slotOwner) {
        final TaskManagerLocation location =
            new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
        final AllocatedSlot allocatedSlot =
            new AllocatedSlot(new AllocationID(), jobID, location, 0, ResourceProfile.UNKNOWN, taskManagerGateway);
        return new SimpleSlot(allocatedSlot, slotOwner, 0);
    }

    /** Creates a mocked gateway whose {@code submitTask} returns an already completed acknowledge. */
    private static TaskManagerGateway createTaskManager() {
        final TaskManagerGateway gateway = Mockito.mock(TaskManagerGateway.class);
        Mockito.when(gateway.submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class)))
            .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
        return gateway;
    }

    /**
     * Asserts that the graph is {@link JobStatus#RUNNING} and that, for each gateway,
     * {@code submitTask} has not been called within 50 ms.
     */
    private static void verifyNothingDeployed(ExecutionGraph executionGraph, TaskManagerGateway[] taskManagerGatewayArr) {
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
        for (TaskManagerGateway gateway : taskManagerGatewayArr) {
            Mockito.verify(gateway, new Timeout(50L, Mockito.times(0)))
                .submitTask(Mockito.any(TaskDeploymentDescriptor.class), Mockito.any(Time.class));
        }
    }

    /** Creates a job vertex with the given name and parallelism that runs {@link NoOpInvokable}. */
    private static JobVertex createVertex(String name, int parallelism) {
        final JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(NoOpInvokable.class);
        return vertex;
    }

    /**
     * Creates an array of {@code count} fresh, uncompleted slot futures.
     *
     * <p>Java cannot create arrays of a parameterized type directly, so the unchecked
     * conversion is confined to this helper.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static FlinkCompletableFuture<SimpleSlot>[] createSlotFutures(int count) {
        final FlinkCompletableFuture<SimpleSlot>[] futures = new FlinkCompletableFuture[count];
        for (int i = 0; i < count; i++) {
            futures[i] = new FlinkCompletableFuture<>();
        }
        return futures;
    }

    /** Creates a slot owner that appends every returned slot to {@code returnedSlots}. */
    private static SlotOwner createRecordingSlotOwner(final List<SimpleSlot> returnedSlots) {
        return new SlotOwner() {
            @Override
            public boolean returnAllocatedSlot(Slot slot) {
                returnedSlots.add((SimpleSlot) slot);
                return true;
            }
        };
    }

    /**
     * Creates a mocked slot provider that hands out the given slots in order, removing each
     * from the list, and throws a {@link TestRuntimeException} once the list is empty.
     */
    private static SlotProvider createExhaustibleSlotProvider(final List<SimpleSlot> availableSlots) {
        final SlotProvider slotProvider = Mockito.mock(SlotProvider.class);
        Mockito.when(slotProvider.allocateSlot(Mockito.any(ScheduledUnit.class), Mockito.anyBoolean()))
            .then(new Answer<Future<SimpleSlot>>() {
                @Override
                public Future<SimpleSlot> answer(InvocationOnMock invocation) {
                    if (availableSlots.isEmpty()) {
                        throw new TestRuntimeException();
                    }
                    return FlinkCompletableFuture.completed(availableSlots.remove(0));
                }
            });
        return slotProvider;
    }
}
