package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.class */
public class DefaultSchedulerBatchSchedulingTest extends TestLogger {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private static final JobID jobId = new JobID();
    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest$GloballyTerminalJobStatusListener.class */
    private static class GloballyTerminalJobStatusListener implements JobStatusListener {
        private final CompletableFuture<JobStatus> globallyTerminalJobStatusFuture;

        private GloballyTerminalJobStatusListener() {
            this.globallyTerminalJobStatusFuture = new CompletableFuture<>();
        }

        public void jobStatusChanges(JobID jobID, JobStatus jobStatus, long j, Throwable th) {
            if (jobStatus.isGloballyTerminalState()) {
                this.globallyTerminalJobStatusFuture.complete(jobStatus);
            }
        }

        public CompletableFuture<JobStatus> getTerminationFuture() {
            return this.globallyTerminalJobStatusFuture;
        }
    }

    @BeforeClass
    public static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    @AfterClass
    public static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    @Test
    public void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception {
        Time milliseconds = Time.milliseconds(5L);
        JobGraph createJobGraph = createJobGraph(5);
        createJobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
        SlotPoolImpl createSlotPool = createSlotPool(mainThreadExecutor, milliseconds);
        Throwable th = null;
        try {
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(5);
            SlotPoolUtils.offerSlots(createSlotPool, mainThreadExecutor, Collections.singletonList(ResourceProfile.ANY), new RpcTaskManagerGateway(new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                arrayBlockingQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway(), JobMasterId.generate()));
            SchedulerNG createScheduler = createScheduler(createJobGraph, createSlotProvider(createSlotPool, mainThreadExecutor), milliseconds);
            GloballyTerminalJobStatusListener globallyTerminalJobStatusListener = new GloballyTerminalJobStatusListener();
            createScheduler.registerJobStatusListener(globallyTerminalJobStatusListener);
            startScheduling(createScheduler, mainThreadExecutor);
            Thread.sleep(milliseconds.toMilliseconds());
            CompletableFuture<JobStatus> terminationFuture = globallyTerminalJobStatusListener.getTerminationFuture();
            for (int i = 0; i < 5; i++) {
                arrayBlockingQueue.getClass();
                CompletableFuture supplyAsync = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(arrayBlockingQueue::take));
                CompletableFuture.anyOf(supplyAsync, terminationFuture).join();
                if (supplyAsync.isDone()) {
                    finishExecution((ExecutionAttemptID) supplyAsync.get(), createScheduler, mainThreadExecutor);
                } else {
                    Assert.fail(String.format("Job reached a globally terminal state %s before all executions were finished.", terminationFuture.get()));
                }
            }
            MatcherAssert.assertThat(terminationFuture.get(), Matchers.is(JobStatus.FINISHED));
            if (createSlotPool != null) {
                if (0 == 0) {
                    createSlotPool.close();
                    return;
                }
                try {
                    createSlotPool.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createSlotPool != null) {
                if (0 != 0) {
                    try {
                        createSlotPool.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createSlotPool.close();
                }
            }
            throw th3;
        }
    }

    private void finishExecution(ExecutionAttemptID executionAttemptID, SchedulerNG schedulerNG, ComponentMainThreadExecutor componentMainThreadExecutor) {
        CompletableFuture.runAsync(() -> {
            schedulerNG.updateTaskExecutionState(new TaskExecutionState(jobId, executionAttemptID, ExecutionState.RUNNING));
            schedulerNG.updateTaskExecutionState(new TaskExecutionState(jobId, executionAttemptID, ExecutionState.FINISHED));
        }, componentMainThreadExecutor).join();
    }

    @Nonnull
    private SlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor componentMainThreadExecutor) {
        SchedulerImpl schedulerImpl = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        schedulerImpl.start(componentMainThreadExecutor);
        return schedulerImpl;
    }

    private void startScheduling(SchedulerNG schedulerNG, ComponentMainThreadExecutor componentMainThreadExecutor) {
        schedulerNG.setMainThreadExecutor(componentMainThreadExecutor);
        schedulerNG.getClass();
        CompletableFuture.runAsync(schedulerNG::startScheduling, componentMainThreadExecutor).join();
    }

    private SlotPoolImpl createSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, Time time) throws Exception {
        return new SlotPoolBuilder(componentMainThreadExecutor).setBatchSlotTimeout(time).build();
    }

    private JobGraph createJobGraph(int i) {
        JobVertex jobVertex = new JobVertex("testing task");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return new JobGraph(jobId, "test job", new JobVertex[]{jobVertex});
    }

    private SchedulerNG createScheduler(JobGraph jobGraph, SlotProvider slotProvider, Time time) throws Exception {
        return SchedulerTestingUtils.createScheduler(jobGraph, slotProvider, time);
    }
}
