package org.apache.flink.runtime.executiongraph;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphNotEnoughResourceTest.class */
/**
 * Tests how an {@link ExecutionGraph} behaves when the slot pool cannot supply enough slots
 * for the job.
 *
 * <p>All interaction with the execution graph and the slot pool is dispatched through a shared
 * {@link ComponentMainThreadExecutor} that is created once for the whole class.
 */
public class ExecutionGraphNotEnoughResourceTest extends TestLogger {
    private static TestingComponentMainThreadExecutor.Resource mainThreadExecutorResource;
    private static ComponentMainThreadExecutor mainThreadExecutor;
    private static final JobID TEST_JOB_ID = new JobID();

    /** Creates the main-thread executor shared by all tests of this class. */
    @BeforeClass
    public static void setupClass() {
        mainThreadExecutorResource = new TestingComponentMainThreadExecutor.Resource();
        mainThreadExecutorResource.before();
        mainThreadExecutor = mainThreadExecutorResource.getComponentMainThreadTestExecutor().getMainThreadExecutor();
    }

    /** Releases the shared main-thread executor resource. */
    @AfterClass
    public static void teardownClass() {
        mainThreadExecutorResource.after();
    }

    /**
     * Schedules a two-vertex job whose vertices share one slot sharing group, while offering one
     * slot fewer than the parallelism. Expects the job to reach {@link JobStatus#FAILED} with a
     * {@link NoResourceAvailableException} as failure cause.
     *
     * @throws Exception if setup fails, the job does not fail within the deadline, or the failure
     *     cause is of an unexpected type (in which case that cause is rethrown)
     */
    @Test
    public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
        final int numRestarts = 10;
        final int parallelism = 20;

        SlotPool slotPool = null;
        try {
            slotPool = new TestingSlotPoolImpl(TEST_JOB_ID);
            // Offer one slot fewer than the parallelism so the job cannot be fully scheduled.
            final SlotProvider scheduler =
                    createSchedulerWithSlots(parallelism - 1, slotPool, new LocalTaskManagerLocation());

            final SlotSharingGroup sharingGroup = new SlotSharingGroup();

            final JobVertex source = new JobVertex("source");
            source.setInvokableClass(NoOpInvokable.class);
            source.setParallelism(parallelism);
            source.setSlotSharingGroup(sharingGroup);

            final JobVertex sink = new JobVertex("sink");
            sink.setInvokableClass(NoOpInvokable.class);
            sink.setParallelism(parallelism);
            sink.setSlotSharingGroup(sharingGroup);
            sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);

            final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job", new JobVertex[]{source, sink});
            jobGraph.setScheduleMode(ScheduleMode.EAGER);

            final ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
                    .setJobGraph(jobGraph)
                    .setSlotProvider(scheduler)
                    .setRestartStrategy(new TestRestartStrategy(numRestarts, false))
                    .setAllocationTimeout(Time.milliseconds(1L))
                    .build();

            eg.start(mainThreadExecutor);
            mainThreadExecutor.execute(ThrowingRunnable.unchecked(eg::scheduleForExecution));

            // Poll the job status via the main thread executor until the job has failed.
            CommonTestUtils.waitUntilCondition(
                    () -> CompletableFuture.supplyAsync(eg::getState, mainThreadExecutor).join() == JobStatus.FAILED,
                    Deadline.fromNow(Duration.ofSeconds(10L)));

            // One more restart than configured is expected
            // (presumably the final, suppressed restart attempt is counted as well - TODO confirm).
            Assert.assertEquals(
                    numRestarts + 1,
                    CompletableFuture.supplyAsync(eg::getNumberOfRestarts, mainThreadExecutor).join().longValue());

            final Throwable failureCause =
                    CompletableFuture.supplyAsync(eg::getFailureCause, mainThreadExecutor).join();
            // Fail with a clear message instead of a NullPointerException further down.
            Assert.assertNotNull("A FAILED job is expected to report a failure cause.", failureCause);
            if (!(failureCause instanceof NoResourceAvailableException)) {
                ExceptionUtils.rethrowException(failureCause, failureCause.getMessage());
            }
        } finally {
            // Single cleanup path: close the pool exactly once, whether the test passed or failed.
            if (slotPool != null) {
                CompletableFuture.runAsync(slotPool::close, mainThreadExecutor).join();
            }
        }
    }

    /**
     * Starts the given slot pool and a {@link SchedulerImpl} on top of it, registers the task
     * manager and offers {@code numSlots} slots with {@link ResourceProfile#ANY} to the pool.
     *
     * @param numSlots number of slot offers to create
     * @param slotPool slot pool to start and fill; the caller remains responsible for closing it
     * @param taskManagerLocation location of the task manager the slots are offered from
     * @return the started scheduler backed by {@code slotPool}
     * @throws Exception if starting the slot pool fails
     */
    private static Scheduler createSchedulerWithSlots(int numSlots, SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception {
        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

        slotPool.start(JobMasterId.generate(), "foobar", mainThreadExecutor);
        slotPool.connectToResourceManager(resourceManagerGateway);

        final SchedulerImpl scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        scheduler.start(mainThreadExecutor);

        // Slot pool mutations are dispatched to the main thread executor and awaited.
        CompletableFuture.runAsync(
                () -> slotPool.registerTaskManager(taskManagerLocation.getResourceID()),
                mainThreadExecutor).join();

        final List<SlotOffer> slotOffers = new ArrayList<>(numSlots);
        for (int i = 0; i < numSlots; i++) {
            slotOffers.add(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
        }

        CompletableFuture.runAsync(
                () -> slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers),
                mainThreadExecutor).join();

        return scheduler;
    }
}
