package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.class */
/**
 * Tests the slot sharing behaviour of the slot pool as exposed through the
 * {@link SlotProvider} of a {@link SlotPoolResource}.
 *
 * <p>Every test intercepts the slot requests sent to the testing resource manager gateway in
 * order to learn the requested {@link AllocationID}s, and then fulfils (or fails) these
 * requests manually.
 */
public class SlotPoolSlotSharingTest extends TestLogger {

    @Rule
    public final SlotPoolResource slotPoolResource = new SlotPoolResource(PreviousAllocationSlotSelectionStrategy.create());

    /**
     * A single queued request of a slot sharing group stays pending until a slot with the
     * requested allocation id is offered; the resulting logical slot carries the group id.
     */
    @Test
    public void testSingleQueuedSharedSlotScheduling() throws Exception {
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        slotPoolResource.getTestingResourceManagerGateway()
            .setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        final CompletableFuture<LogicalSlot> slotFuture = requestSharedSlot(
            slotPoolResource.getSlotProvider(),
            new JobVertexID(),
            slotSharingGroupId,
            SlotProfile.noRequirements());
        Assert.assertFalse(slotFuture.isDone());

        Assert.assertTrue(offerSlot(slotPool, taskManagerLocation, allocationIdFuture.get(), 0, ResourceProfile.UNKNOWN));

        Assert.assertEquals(slotSharingGroupId, slotFuture.get().getSlotSharingGroupId());
    }

    /**
     * Failing the allocation behind a queued shared slot request must complete the slot
     * future exceptionally with a {@link FlinkException} in its cause chain.
     */
    @Test
    public void testFailingQueuedSharedSlotScheduling() throws Exception {
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        slotPoolResource.getTestingResourceManagerGateway()
            .setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        final CompletableFuture<LogicalSlot> slotFuture = requestSharedSlot(
            slotPoolResource.getSlotProvider(),
            new JobVertexID(),
            new SlotSharingGroupId(),
            SlotProfile.noRequirements());

        slotPoolResource.getSlotPool().failAllocation(allocationIdFuture.get(), new FlinkException("Testing Exception"));

        try {
            slotFuture.get();
            Assert.fail("The slot future should have failed.");
        } catch (ExecutionException expected) {
            Assert.assertTrue(ExceptionUtils.findThrowable(expected, FlinkException.class).isPresent());
        }
    }

    /**
     * Two rounds of requests for the same two vertices in one slot sharing group: the first
     * round is served by the offered slot, the second round only completes after the logical
     * slots of the first round have been released, and it reuses the same allocation.
     */
    @Test
    public void testQueuedSharedSlotScheduling() throws Exception {
        final ArrayBlockingQueue<AllocationID> allocationIds = captureAllocationIds(2);

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        final JobVertexID vertexA = new JobVertexID();
        final JobVertexID vertexB = new JobVertexID();
        final SlotProvider slotProvider = slotPoolResource.getSlotProvider();

        // first round of requests
        final CompletableFuture<LogicalSlot> firstRoundA =
            requestSharedSlot(slotProvider, vertexA, slotSharingGroupId, SlotProfile.noRequirements());
        final CompletableFuture<LogicalSlot> firstRoundB =
            requestSharedSlot(slotProvider, vertexB, slotSharingGroupId, SlotProfile.noRequirements());
        Assert.assertFalse(firstRoundA.isDone());
        Assert.assertFalse(firstRoundB.isDone());

        final AllocationID firstAllocationId = allocationIds.take();

        // second round of requests for the very same vertices
        final CompletableFuture<LogicalSlot> secondRoundA =
            requestSharedSlot(slotProvider, vertexA, slotSharingGroupId, SlotProfile.noRequirements());
        final CompletableFuture<LogicalSlot> secondRoundB =
            requestSharedSlot(slotProvider, vertexB, slotSharingGroupId, SlotProfile.noRequirements());
        Assert.assertFalse(secondRoundA.isDone());
        Assert.assertFalse(secondRoundB.isDone());

        // wait until the second slot request has reached the resource manager gateway;
        // its allocation id is intentionally never offered
        allocationIds.take();

        Assert.assertTrue(offerSlot(slotPool, taskManagerLocation, firstAllocationId, 0, ResourceProfile.UNKNOWN));

        final LogicalSlot firstSlotA = firstRoundA.get();
        final LogicalSlot firstSlotB = firstRoundB.get();
        Assert.assertEquals(firstSlotA.getTaskManagerLocation(), firstSlotB.getTaskManagerLocation());
        Assert.assertEquals(firstAllocationId, firstSlotA.getAllocationId());
        Assert.assertEquals(firstAllocationId, firstSlotB.getAllocationId());

        Assert.assertFalse(secondRoundA.isDone());
        Assert.assertFalse(secondRoundB.isDone());

        // releasing the first round must hand the slot over to the second round
        firstSlotA.releaseSlot((Throwable) null);
        firstSlotB.releaseSlot((Throwable) null);

        final LogicalSlot secondSlotA = secondRoundA.get();
        final LogicalSlot secondSlotB = secondRoundB.get();
        Assert.assertEquals(secondSlotA.getTaskManagerLocation(), secondSlotB.getTaskManagerLocation());
        Assert.assertEquals(firstAllocationId, secondSlotA.getAllocationId());
        Assert.assertEquals(firstAllocationId, secondSlotB.getAllocationId());
    }

    /**
     * Requests of two different slot sharing groups are fulfilled by two different offered
     * slots, while the requests within one group end up on the same task manager location.
     */
    @Test
    public void testQueuedMultipleSlotSharingGroups() throws Exception {
        final ArrayBlockingQueue<AllocationID> allocationIds = captureAllocationIds(4);

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final SlotSharingGroupId groupOne = new SlotSharingGroupId();
        final SlotSharingGroupId groupTwo = new SlotSharingGroupId();
        final JobVertexID vertexOne = new JobVertexID();
        final JobVertexID vertexTwo = new JobVertexID();
        final JobVertexID vertexThree = new JobVertexID();
        final JobVertexID vertexFour = new JobVertexID();

        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        final SlotProvider slotProvider = slotPoolResource.getSlotProvider();

        final CompletableFuture<LogicalSlot> futureOne =
            requestSharedSlot(slotProvider, vertexOne, groupOne, SlotProfile.noRequirements());
        final CompletableFuture<LogicalSlot> futureTwo =
            requestSharedSlot(slotProvider, vertexTwo, groupOne, SlotProfile.noRequirements());
        final CompletableFuture<LogicalSlot> futureThree =
            requestSharedSlot(slotProvider, vertexThree, groupTwo, SlotProfile.noRequirements());
        final CompletableFuture<LogicalSlot> futureFour =
            requestSharedSlot(slotProvider, vertexFour, groupTwo, SlotProfile.noRequirements());

        Assert.assertFalse(futureOne.isDone());
        Assert.assertFalse(futureTwo.isDone());
        Assert.assertFalse(futureThree.isDone());
        Assert.assertFalse(futureFour.isDone());

        final AllocationID firstAllocationId = allocationIds.take();
        final AllocationID secondAllocationId = allocationIds.take();

        // offer both slots before asserting, exactly like the requests were issued
        final boolean firstOfferAccepted =
            offerSlot(slotPool, taskManagerLocation, firstAllocationId, 0, ResourceProfile.UNKNOWN);
        final boolean secondOfferAccepted =
            offerSlot(slotPool, taskManagerLocation, secondAllocationId, 0, ResourceProfile.UNKNOWN);
        Assert.assertTrue(firstOfferAccepted);
        Assert.assertTrue(secondOfferAccepted);

        final LogicalSlot slotOne = futureOne.get();
        final LogicalSlot slotTwo = futureTwo.get();
        final LogicalSlot slotThree = futureThree.get();
        final LogicalSlot slotFour = futureFour.get();

        Assert.assertEquals(slotOne.getTaskManagerLocation(), slotTwo.getTaskManagerLocation());
        Assert.assertEquals(slotThree.getTaskManagerLocation(), slotFour.getTaskManagerLocation());
        Assert.assertEquals(firstAllocationId, slotOne.getAllocationId());
        Assert.assertEquals(secondAllocationId, slotThree.getAllocationId());
    }

    /**
     * With an offered slot of profile (3.0, 300) already hosting a (2.0, 200) request, a second
     * (2.0, 200) request of the same group is expected to stay pending until another slot is
     * offered, whereas a (1.0, 100) request is expected to be served immediately from the
     * first slot.
     */
    @Test
    public void testSlotSharingRespectsRemainingResource() throws Exception {
        final ResourceProfile allocatedSlotProfile = new ResourceProfile(3.0d, 300);
        final ResourceProfile largeRequestProfile = new ResourceProfile(2.0d, 200);
        final ResourceProfile smallRequestProfile = new ResourceProfile(1.0d, 100);

        final ArrayBlockingQueue<AllocationID> allocationIds = captureAllocationIds(2);

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        final JobVertexID vertexOne = new JobVertexID();
        final JobVertexID vertexTwo = new JobVertexID();
        final JobVertexID vertexThree = new JobVertexID();
        final SlotProvider slotProvider = slotPoolResource.getSlotProvider();

        // the first large request is fulfilled by the first offered slot
        final CompletableFuture<LogicalSlot> firstLargeFuture = requestSharedSlot(
            slotProvider, vertexOne, slotSharingGroupId, SlotProfile.noLocality(largeRequestProfile));
        final AllocationID firstAllocationId = allocationIds.take();
        Assert.assertTrue(offerSlot(slotPool, taskManagerLocation, firstAllocationId, 0, allocatedSlotProfile));
        Assert.assertTrue(firstLargeFuture.isDone());
        Assert.assertEquals(firstAllocationId, firstLargeFuture.get().getAllocationId());

        // the second large request must not be served from the first slot
        final CompletableFuture<LogicalSlot> secondLargeFuture = requestSharedSlot(
            slotProvider, vertexTwo, slotSharingGroupId, SlotProfile.noLocality(largeRequestProfile));
        Assert.assertFalse(secondLargeFuture.isDone());

        // the small request must be served right away and share the first slot; verify the
        // allocation id of the small request's own slot rather than re-checking the first one
        final CompletableFuture<LogicalSlot> smallFuture = requestSharedSlot(
            slotProvider, vertexThree, slotSharingGroupId, SlotProfile.noLocality(smallRequestProfile));
        Assert.assertTrue(smallFuture.isDone());
        Assert.assertEquals(firstAllocationId, smallFuture.get().getAllocationId());

        // the second offered slot fulfils the pending large request
        final AllocationID secondAllocationId = allocationIds.take();
        Assert.assertTrue(offerSlot(slotPool, taskManagerLocation, secondAllocationId, 0, allocatedSlotProfile));
        Assert.assertTrue(secondLargeFuture.isDone());
        Assert.assertEquals(secondAllocationId, secondLargeFuture.get().getAllocationId());
    }

    /**
     * Three queued requests of one group with profiles (1.0, 100), (2.0, 200) and (5.0, 500):
     * after a (3.0, 300) slot is offered the first two requests complete on it while the third
     * stays pending until a (5.0, 500) slot is offered under the second allocation id.
     */
    @Test
    public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
        final ResourceProfile smallRequestProfile = new ResourceProfile(1.0d, 100);
        final ResourceProfile mediumRequestProfile = new ResourceProfile(2.0d, 200);
        final ResourceProfile largeRequestProfile = new ResourceProfile(5.0d, 500);
        final ResourceProfile firstOfferedProfile = new ResourceProfile(3.0d, 300);
        final ResourceProfile secondOfferedProfile = new ResourceProfile(5.0d, 500);

        final ArrayBlockingQueue<AllocationID> allocationIds = captureAllocationIds(2);

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        final JobVertexID vertexOne = new JobVertexID();
        final JobVertexID vertexTwo = new JobVertexID();
        final JobVertexID vertexThree = new JobVertexID();
        final SlotProvider slotProvider = slotPoolResource.getSlotProvider();

        final CompletableFuture<LogicalSlot> smallFuture = requestSharedSlot(
            slotProvider, vertexOne, slotSharingGroupId, SlotProfile.noLocality(smallRequestProfile));
        final CompletableFuture<LogicalSlot> mediumFuture = requestSharedSlot(
            slotProvider, vertexTwo, slotSharingGroupId, SlotProfile.noLocality(mediumRequestProfile));
        final CompletableFuture<LogicalSlot> largeFuture = requestSharedSlot(
            slotProvider, vertexThree, slotSharingGroupId, SlotProfile.noLocality(largeRequestProfile));

        Assert.assertFalse(smallFuture.isDone());
        Assert.assertFalse(mediumFuture.isDone());
        Assert.assertFalse(largeFuture.isDone());

        final AllocationID firstAllocationId = allocationIds.take();
        Assert.assertTrue(offerSlot(slotPool, taskManagerLocation, firstAllocationId, 0, firstOfferedProfile));

        final LogicalSlot smallSlot = smallFuture.get();
        final LogicalSlot mediumSlot = mediumFuture.get();
        Assert.assertEquals(firstAllocationId, smallSlot.getAllocationId());
        Assert.assertEquals(firstAllocationId, mediumSlot.getAllocationId());
        Assert.assertFalse(largeFuture.isDone());

        final AllocationID secondAllocationId = allocationIds.take();
        Assert.assertTrue(offerSlot(slotPool, taskManagerLocation, secondAllocationId, 1, secondOfferedProfile));

        Assert.assertEquals(secondAllocationId, largeFuture.get().getAllocationId());
    }

    /**
     * Installs a request-slot consumer on the testing resource manager gateway which records
     * the allocation id of every incoming slot request.
     *
     * @param capacity capacity of the returned queue; ids offered beyond it are dropped
     * @return queue receiving the allocation ids in the order the requests arrive
     */
    private ArrayBlockingQueue<AllocationID> captureAllocationIds(int capacity) {
        final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(capacity);
        slotPoolResource.getTestingResourceManagerGateway()
            .setRequestSlotConsumer(slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
        return allocationIds;
    }

    /**
     * Requests a slot for the given vertex within the given slot sharing group, without a
     * co-location constraint, with the queued-scheduling flag set and an infinite timeout.
     */
    private static CompletableFuture<LogicalSlot> requestSharedSlot(
            SlotProvider slotProvider,
            JobVertexID jobVertexId,
            SlotSharingGroupId slotSharingGroupId,
            SlotProfile slotProfile) {
        return slotProvider.allocateSlot(
            new ScheduledUnit(jobVertexId, slotSharingGroupId, (CoLocationConstraint) null),
            true,
            slotProfile,
            TestingUtils.infiniteTime());
    }

    /**
     * Offers a slot with the given allocation id, slot index and resource profile to the slot
     * pool, using a fresh {@link SimpleAckingTaskManagerGateway}.
     *
     * @return whatever the slot pool's {@code offerSlot} call returns
     */
    private static boolean offerSlot(
            SlotPoolImpl slotPool,
            LocalTaskManagerLocation taskManagerLocation,
            AllocationID allocationId,
            int slotIndex,
            ResourceProfile resourceProfile) {
        return slotPool.offerSlot(
            taskManagerLocation,
            new SimpleAckingTaskManagerGateway(),
            new SlotOffer(allocationId, slotIndex, resourceProfile));
    }
}
