package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SchedulerImplTest.class */
/**
 * Tests for {@link SchedulerImpl} backed by a {@link TestingSlotPoolImpl}.
 *
 * <p>Task manager registration, slot offers, allocation requests and the slot pool shutdown are
 * submitted through the shared {@link TestingComponentMainThreadExecutor}.
 */
public class SchedulerImplTest extends TestLogger {
    /** Timeout passed to slot allocation requests and used for bounded waits on futures. */
    private static final Time TIMEOUT = Time.seconds(1);

    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Resource(10);
    private final TestingComponentMainThreadExecutor testMainThreadExecutor = EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
    private TaskManagerLocation taskManagerLocation;
    private SimpleAckingTaskManagerGateway taskManagerGateway;
    private TestingResourceManagerGateway resourceManagerGateway;
    private TestingSlotPoolImpl slotPool;

    /** Creates fresh gateways, a task manager location and a slot pool for every test. */
    @Before
    public void setUp() throws Exception {
        this.taskManagerLocation = new LocalTaskManagerLocation();
        this.taskManagerGateway = new SimpleAckingTaskManagerGateway();
        this.resourceManagerGateway = new TestingResourceManagerGateway();
        this.slotPool = createAndSetUpSlotPool();
    }

    /** Closes the slot pool via the test main thread executor. */
    @After
    public void teardown() throws Exception {
        this.testMainThreadExecutor.execute(() -> this.slotPool.close());
    }

    /**
     * Requests a slot, answers the resulting resource manager request with a matching slot offer
     * and verifies that the returned logical slot is alive and located on the offering task
     * manager.
     */
    @Test
    public void testAllocateSlot() throws Exception {
        // Captures the slot request that is sent to the resource manager.
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        this.resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final Scheduler scheduler = createAndSetUpScheduler(this.slotPool);
        final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(scheduler);
        // No slot has been offered yet, so the allocation must still be pending.
        Assert.assertFalse(slotFuture.isDone());

        final SlotRequest slotRequest = slotRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

        this.testMainThreadExecutor.execute(
                () -> this.slotPool.registerTaskManager(this.taskManagerLocation.getResourceID()));

        // Offer a slot carrying the allocation id of the captured request.
        final SlotOffer slotOffer = new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.ANY);
        this.testMainThreadExecutor.execute(
                () -> this.slotPool.offerSlot(this.taskManagerLocation, this.taskManagerGateway, slotOffer));

        final LogicalSlot logicalSlot = slotFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue(slotFuture.isDone());
        Assert.assertTrue(logicalSlot.isAlive());
        Assert.assertEquals(this.taskManagerLocation, logicalSlot.getTaskManagerLocation());
    }

    /**
     * Requests a slot without ever offering one and verifies that the allocation fails with a
     * {@link TimeoutException}, that the slot pool's release-slot consumer is invoked, and that
     * no pending requests remain afterwards.
     */
    @Test
    public void testProviderAndOwnerSlotAllocationTimeout() throws Exception {
        // Only the completion of this future matters; the value handed to the consumer is unused.
        final CompletableFuture<Object> releaseSlotFuture = new CompletableFuture<>();
        this.slotPool.setReleaseSlotConsumer(releaseSlotFuture::complete);

        final Scheduler scheduler = createAndSetUpScheduler(this.slotPool);
        try {
            allocateSlot(scheduler).get();
            Assert.fail("We expected a TimeoutException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
        }

        // Wait until the slot pool has reported the release before inspecting its state.
        releaseSlotFuture.get();
        Assert.assertEquals(0L, this.slotPool.getNumberOfPendingRequests());
    }

    /**
     * Creates a {@link SchedulerImpl} on top of the given slot pool and starts it with the test
     * main thread executor.
     *
     * @param slotPool slot pool the scheduler allocates from
     * @return the started scheduler
     */
    private Scheduler createAndSetUpScheduler(SlotPool slotPool) {
        final SchedulerImpl scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        scheduler.start(this.testMainThreadExecutor.getMainThreadExecutor());
        return scheduler;
    }

    /**
     * Builds a {@link TestingSlotPoolImpl} that is wired to the test resource manager gateway.
     *
     * @return the slot pool produced by the {@link SlotPoolBuilder}
     */
    private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
        return new SlotPoolBuilder(this.testMainThreadExecutor.getMainThreadExecutor())
                .setResourceManagerGateway(this.resourceManagerGateway)
                .build();
    }

    /**
     * Requests a slot without requirements for a dummy scheduled unit, using {@link #TIMEOUT} as
     * the allocation timeout. The request is issued through the test main thread executor.
     *
     * @param scheduler scheduler to request the slot from
     * @return future of the requested logical slot
     */
    private CompletableFuture<LogicalSlot> allocateSlot(Scheduler scheduler) {
        return this.testMainThreadExecutor.execute(
                () -> scheduler.allocateSlot(new DummyScheduledUnit(), SlotProfile.noRequirements(), TIMEOUT));
    }
}
