package org.apache.flink.runtime.instance;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/instance/SlotPoolTest.class */
public class SlotPoolTest extends TestLogger {
    private final Time timeout = Time.seconds(10);
    private RpcService rpcService;
    private JobID jobId;

    @Before
    public void setUp() throws Exception {
        this.rpcService = new TestingRpcService();
        this.jobId = new JobID();
    }

    @After
    public void tearDown() throws Exception {
        this.rpcService.stopService();
    }

    @Test
    public void testAllocateSimpleSlot() throws Exception {
        ResourceManagerGateway createResourceManagerGatewayMock = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(this.rpcService, this.jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, createResourceManagerGatewayMock);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);
            CompletableFuture allocateSlot = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            Assert.assertFalse(allocateSlot.isDone());
            ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotRequest.class);
            ((ResourceManagerGateway) Mockito.verify(createResourceManagerGatewayMock, Mockito.timeout(this.timeout.toMilliseconds()))).requestSlot((JobMasterId) Matchers.any(JobMasterId.class), (SlotRequest) forClass.capture(), (Time) Matchers.any(Time.class));
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot(resourceID, ((SlotRequest) forClass.getValue()).getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE)).get()).booleanValue());
            SimpleSlot simpleSlot = (SimpleSlot) allocateSlot.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(allocateSlot.isDone());
            Assert.assertTrue(simpleSlot.isAlive());
            Assert.assertEquals(resourceID, simpleSlot.getTaskManagerID());
            Assert.assertEquals(this.jobId, simpleSlot.getJobID());
            Assert.assertEquals(slotPool.getSlotOwner(), simpleSlot.getOwner());
            Assert.assertEquals(slotPool.getAllocatedSlots().get(simpleSlot.getAllocatedSlot().getSlotAllocationId()), simpleSlot);
            slotPool.shutDown();
        } catch (Throwable th) {
            slotPool.shutDown();
            throw th;
        }
    }

    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        ResourceManagerGateway createResourceManagerGatewayMock = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(this.rpcService, this.jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, createResourceManagerGatewayMock);
            ResourceID resourceID = new ResourceID("resource");
            slotPool.registerTaskManager(resourceID);
            CompletableFuture allocateSlot = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            CompletableFuture allocateSlot2 = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            Assert.assertFalse(allocateSlot.isDone());
            Assert.assertFalse(allocateSlot2.isDone());
            ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotRequest.class);
            ((ResourceManagerGateway) Mockito.verify(createResourceManagerGatewayMock, Mockito.timeout(this.timeout.toMilliseconds()).times(2))).requestSlot((JobMasterId) Matchers.any(JobMasterId.class), (SlotRequest) forClass.capture(), (Time) Matchers.any(Time.class));
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot(resourceID, ((SlotRequest) forClass.getAllValues().get(0)).getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE)).get()).booleanValue());
            SimpleSlot simpleSlot = (SimpleSlot) allocateSlot.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(allocateSlot.isDone());
            Assert.assertFalse(allocateSlot2.isDone());
            simpleSlot.releaseSlot();
            SimpleSlot simpleSlot2 = (SimpleSlot) allocateSlot2.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(allocateSlot2.isDone());
            Assert.assertNotEquals(simpleSlot, simpleSlot2);
            Assert.assertTrue(simpleSlot.isReleased());
            Assert.assertTrue(simpleSlot2.isAlive());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot2.getTaskManagerID());
            Assert.assertEquals(simpleSlot.getSlotNumber(), simpleSlot2.getSlotNumber());
            Assert.assertEquals(slotPool.getAllocatedSlots().get(simpleSlot.getAllocatedSlot().getSlotAllocationId()), simpleSlot2);
            slotPool.shutDown();
        } catch (Throwable th) {
            slotPool.shutDown();
            throw th;
        }
    }

    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        ResourceManagerGateway createResourceManagerGatewayMock = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(this.rpcService, this.jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, createResourceManagerGatewayMock);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);
            CompletableFuture allocateSlot = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            Assert.assertFalse(allocateSlot.isDone());
            ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotRequest.class);
            ((ResourceManagerGateway) Mockito.verify(createResourceManagerGatewayMock, Mockito.timeout(this.timeout.toMilliseconds()))).requestSlot((JobMasterId) Matchers.any(JobMasterId.class), (SlotRequest) forClass.capture(), (Time) Matchers.any(Time.class));
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot(resourceID, ((SlotRequest) forClass.getValue()).getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE)).get()).booleanValue());
            SimpleSlot simpleSlot = (SimpleSlot) allocateSlot.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(allocateSlot.isDone());
            simpleSlot.releaseSlot();
            CompletableFuture allocateSlot2 = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            SimpleSlot simpleSlot2 = (SimpleSlot) allocateSlot2.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(allocateSlot2.isDone());
            Assert.assertNotEquals(simpleSlot, simpleSlot2);
            Assert.assertTrue(simpleSlot.isReleased());
            Assert.assertTrue(simpleSlot2.isAlive());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot2.getTaskManagerID());
            Assert.assertEquals(simpleSlot.getSlotNumber(), simpleSlot2.getSlotNumber());
            slotPool.shutDown();
        } catch (Throwable th) {
            slotPool.shutDown();
            throw th;
        }
    }

    @Test
    public void testOfferSlot() throws Exception {
        ResourceManagerGateway createResourceManagerGatewayMock = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(this.rpcService, this.jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, createResourceManagerGatewayMock);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);
            CompletableFuture allocateSlot = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            Assert.assertFalse(allocateSlot.isDone());
            ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotRequest.class);
            ((ResourceManagerGateway) Mockito.verify(createResourceManagerGatewayMock, Mockito.timeout(this.timeout.toMilliseconds()))).requestSlot((JobMasterId) Matchers.any(JobMasterId.class), (SlotRequest) forClass.capture(), (Time) Matchers.any(Time.class));
            SlotRequest slotRequest = (SlotRequest) forClass.getValue();
            Assert.assertFalse(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE)).get()).booleanValue());
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot(resourceID, new AllocationID(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE)).get()).booleanValue());
            AllocatedSlot createAllocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot).get()).booleanValue());
            SimpleSlot simpleSlot = (SimpleSlot) allocateSlot.get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(simpleSlot.isAlive());
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot).get()).booleanValue());
            Assert.assertTrue(simpleSlot.isAlive());
            simpleSlot.releaseSlot();
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot).get()).booleanValue());
            slotPool.shutDown();
        } catch (Throwable th) {
            slotPool.shutDown();
            throw th;
        }
    }

    @Test
    public void testReleaseResource() throws Exception {
        ResourceManagerGateway createResourceManagerGatewayMock = createResourceManagerGatewayMock();
        final CompletableFuture completableFuture = new CompletableFuture();
        SlotPool slotPool = new SlotPool(this.rpcService, this.jobId) { // from class: org.apache.flink.runtime.instance.SlotPoolTest.1
            public void returnAllocatedSlot(Slot slot) {
                super.returnAllocatedSlot(slot);
                completableFuture.complete(true);
            }
        };
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, createResourceManagerGatewayMock);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);
            CompletableFuture allocateSlot = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotRequest.class);
            ((ResourceManagerGateway) Mockito.verify(createResourceManagerGatewayMock, Mockito.timeout(this.timeout.toMilliseconds()))).requestSlot((JobMasterId) Matchers.any(JobMasterId.class), (SlotRequest) forClass.capture(), (Time) Matchers.any(Time.class));
            SlotRequest slotRequest = (SlotRequest) forClass.getValue();
            CompletableFuture allocateSlot2 = slotPoolGateway.allocateSlot((ScheduledUnit) Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, (Iterable) null, this.timeout);
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(createAllocatedSlot(resourceID, slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE)).get()).booleanValue());
            SimpleSlot simpleSlot = (SimpleSlot) allocateSlot.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(allocateSlot.isDone());
            Assert.assertFalse(allocateSlot2.isDone());
            slotPoolGateway.releaseTaskManager(resourceID);
            completableFuture.get();
            Assert.assertTrue(simpleSlot.isReleased());
            Thread.sleep(10L);
            Assert.assertFalse(allocateSlot2.isDone());
            slotPool.shutDown();
        } catch (Throwable th) {
            slotPool.shutDown();
            throw th;
        }
    }

    private static ResourceManagerGateway createResourceManagerGatewayMock() {
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(resourceManagerGateway.requestSlot((JobMasterId) Matchers.any(JobMasterId.class), (SlotRequest) Matchers.any(SlotRequest.class), (Time) Matchers.any(Time.class))).thenReturn(Mockito.mock(CompletableFuture.class, Mockito.RETURNS_MOCKS));
        return resourceManagerGateway;
    }

    private static SlotPoolGateway setupSlotPool(SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception {
        slotPool.start(JobMasterId.generate(), "foobar");
        slotPool.connectToResourceManager(resourceManagerGateway);
        return slotPool.getSelfGateway(SlotPoolGateway.class);
    }

    static AllocatedSlot createAllocatedSlot(ResourceID resourceID, AllocationID allocationID, JobID jobID, ResourceProfile resourceProfile) {
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation) Mockito.mock(TaskManagerLocation.class);
        Mockito.when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
        return new AllocatedSlot(allocationID, jobID, taskManagerLocation, 0, resourceProfile, (TaskManagerGateway) Mockito.mock(TaskManagerGateway.class));
    }
}
