package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.class */
public class SlotPoolCoLocationTest extends TestLogger {

    @ClassRule
    public static final TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource();

    @Rule
    public final SlotPoolResource slotPoolResource = new SlotPoolResource(PreviousAllocationSlotSelectionStrategy.create());

    @Test
    public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
        this.slotPoolResource.getTestingResourceManagerGateway().setRequestSlotConsumer(slotRequest -> {
            arrayBlockingQueue.offer(slotRequest.getAllocationId());
        });
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(localTaskManagerLocation.getResourceID());
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint locationConstraint = coLocationGroup.getLocationConstraint(0);
        CoLocationConstraint locationConstraint2 = coLocationGroup.getLocationConstraint(1);
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture allocateSlot = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID, slotSharingGroupId, locationConstraint), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture allocateSlot2 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID2, slotSharingGroupId, locationConstraint2), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture allocateSlot3 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID2, slotSharingGroupId, locationConstraint), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture allocateSlot4 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID, slotSharingGroupId, locationConstraint2), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        AllocationID allocationID = (AllocationID) arrayBlockingQueue.take();
        AllocationID allocationID2 = (AllocationID) arrayBlockingQueue.take();
        Collection offerSlots = slotPool.offerSlots(localTaskManagerLocation, new SimpleAckingTaskManagerGateway(), Collections.singletonList(new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN)));
        Collection offerSlots2 = slotPool.offerSlots(localTaskManagerLocation, new SimpleAckingTaskManagerGateway(), Collections.singletonList(new SlotOffer(allocationID2, 0, ResourceProfile.UNKNOWN)));
        Assert.assertFalse(offerSlots.isEmpty());
        Assert.assertFalse(offerSlots2.isEmpty());
        LogicalSlot logicalSlot = (LogicalSlot) allocateSlot.get();
        LogicalSlot logicalSlot2 = (LogicalSlot) allocateSlot3.get();
        LogicalSlot logicalSlot3 = (LogicalSlot) allocateSlot4.get();
        LogicalSlot logicalSlot4 = (LogicalSlot) allocateSlot2.get();
        Assert.assertEquals(logicalSlot.getAllocationId(), logicalSlot2.getAllocationId());
        Assert.assertEquals(logicalSlot3.getAllocationId(), logicalSlot4.getAllocationId());
        Assert.assertNotEquals(logicalSlot.getAllocationId(), logicalSlot3.getAllocationId());
    }

    @Test
    public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionException, InterruptedException {
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 100);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0d, 200);
        ResourceProfile resourceProfile3 = new ResourceProfile(5.0d, 500);
        ResourceProfile resourceProfile4 = new ResourceProfile(3.0d, 300);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
        this.slotPoolResource.getTestingResourceManagerGateway().setRequestSlotConsumer(slotRequest -> {
            arrayBlockingQueue.offer(slotRequest.getAllocationId());
        });
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(localTaskManagerLocation.getResourceID());
        CoLocationConstraint locationConstraint = new CoLocationGroup().getLocationConstraint(0);
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture allocateSlot = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID, slotSharingGroupId, locationConstraint), true, SlotProfile.noLocality(resourceProfile), TestingUtils.infiniteTime());
        CompletableFuture allocateSlot2 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID2, slotSharingGroupId, locationConstraint), true, SlotProfile.noLocality(resourceProfile2), TestingUtils.infiniteTime());
        CompletableFuture allocateSlot3 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID3, slotSharingGroupId, locationConstraint), true, SlotProfile.noLocality(resourceProfile3), TestingUtils.infiniteTime());
        Assert.assertFalse(slotPool.offerSlots(localTaskManagerLocation, new SimpleAckingTaskManagerGateway(), Collections.singletonList(new SlotOffer((AllocationID) arrayBlockingQueue.take(), 0, resourceProfile4))).isEmpty());
        for (CompletableFuture completableFuture : Arrays.asList(allocateSlot, allocateSlot2, allocateSlot3)) {
            Assert.assertTrue(completableFuture.isDone() && completableFuture.isCompletedExceptionally());
            completableFuture.whenComplete((logicalSlot, th) -> {
                Assert.assertTrue(th instanceof SharedSlotOversubscribedException);
                Assert.assertTrue(((SharedSlotOversubscribedException) th).canRetry());
            });
        }
    }

    @Test
    public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionException, InterruptedException {
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 100);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0d, 200);
        ResourceProfile resourceProfile3 = new ResourceProfile(5.0d, 500);
        ResourceProfile resourceProfile4 = new ResourceProfile(3.0d, 300);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
        this.slotPoolResource.getTestingResourceManagerGateway().setRequestSlotConsumer(slotRequest -> {
            arrayBlockingQueue.offer(slotRequest.getAllocationId());
        });
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(localTaskManagerLocation.getResourceID());
        CoLocationConstraint locationConstraint = new CoLocationGroup().getLocationConstraint(0);
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture allocateSlot = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID, slotSharingGroupId, locationConstraint), true, SlotProfile.noLocality(resourceProfile), TestingUtils.infiniteTime());
        CompletableFuture allocateSlot2 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID2, slotSharingGroupId, locationConstraint), true, SlotProfile.noLocality(resourceProfile2), TestingUtils.infiniteTime());
        AllocationID allocationID = (AllocationID) arrayBlockingQueue.take();
        Assert.assertFalse(slotPool.offerSlots(localTaskManagerLocation, new SimpleAckingTaskManagerGateway(), Collections.singletonList(new SlotOffer(allocationID, 0, resourceProfile4))).isEmpty());
        CompletableFuture allocateSlot3 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexID3, slotSharingGroupId, locationConstraint), true, SlotProfile.noLocality(resourceProfile3), TestingUtils.infiniteTime());
        LogicalSlot logicalSlot = (LogicalSlot) allocateSlot.get();
        LogicalSlot logicalSlot2 = (LogicalSlot) allocateSlot2.get();
        Assert.assertEquals(allocationID, logicalSlot.getAllocationId());
        Assert.assertEquals(allocationID, logicalSlot2.getAllocationId());
        Assert.assertTrue(allocateSlot3.isDone() && allocateSlot3.isCompletedExceptionally());
        allocateSlot3.whenComplete((logicalSlot3, th) -> {
            Assert.assertTrue(th instanceof SharedSlotOversubscribedException);
            Assert.assertTrue(((SharedSlotOversubscribedException) th).canRetry());
        });
    }
}
