package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.ManualClock;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

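/**
 * Tests for {@link SlotPool}: slot allocation, slot offers, releasing slots and task managers,
 * cancellation of slot requests, shutdown and the idle slot check.
 *
 * <p>Loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.class
 */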
public class SlotPoolTest extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);
    private final Time timeout = Time.seconds(10);
    private RpcService rpcService;
    private JobID jobId;
    private TaskManagerLocation taskManagerLocation;
    private SimpleAckingTaskManagerGateway taskManagerGateway;
    private TestingResourceManagerGateway resourceManagerGateway;

    /**
     * Creates fresh collaborators (RPC service, job id, task manager location and the
     * testing gateways) before every test.
     */
    @Before
    public void setUp() throws Exception {
        rpcService = new TestingRpcService();
        jobId = new JobID();
        taskManagerLocation = new LocalTaskManagerLocation();
        taskManagerGateway = new SimpleAckingTaskManagerGateway();
        resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /** Terminates the RPC service created in {@link #setUp()} after every test. */
    @After
    public void tearDown() throws Exception {
        final RpcService serviceToStop = rpcService;
        RpcUtils.terminateRpcService(serviceToStop, timeout);
    }

    /**
     * Requests a single slot, offers a slot carrying the allocation id that was sent to the
     * resource manager, and checks that the pending request completes with a live slot
     * located on the offering task manager.
     */
    @Test
    public void testAllocateSimpleSlot() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            // No slot has been offered yet, so the request has to stay pending.
            Assert.assertFalse(slotFuture.isDone());

            final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot slot = slotFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(slotFuture.isDone());
            Assert.assertTrue(slot.isAlive());
            Assert.assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
        } finally {
            // A single finally block guarantees the endpoint is terminated exactly once,
            // even if termination itself fails.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Issues two slot requests but offers only one slot. The first request is fulfilled by
     * the offer; after the first logical slot is released, the second request is expected to
     * complete with a slot that has the same task manager location, physical slot number and
     * allocation id.
     */
    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(request -> {
            while (!slotRequestQueue.offer(request)) {
                // retry until the bounded queue accepts the request
            }
        });

        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            // NOTE(review): this calls the SlotPool directly whereas the sibling tests go
            // through the gateway; kept as in the original - confirm this is intended.
            slotPool.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> firstFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            final CompletableFuture<LogicalSlot> secondFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            Assert.assertFalse(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // Wait for both requests to be forwarded to the resource manager gateway.
            final ArrayList<SlotRequest> slotRequests = new ArrayList<>(2);
            for (int i = 0; i < 2; i++) {
                slotRequests.add(slotRequestQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            }

            final SlotOffer slotOffer = new SlotOffer(
                slotRequests.get(0).getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // Returning the first slot is expected to fulfil the still pending second request.
            firstSlot.releaseSlot();

            final LogicalSlot secondSlot = secondFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(secondFuture.isDone());
            Assert.assertNotEquals(firstSlot, secondSlot);
            Assert.assertFalse(firstSlot.isAlive());
            Assert.assertTrue(secondSlot.isAlive());
            Assert.assertEquals(firstSlot.getTaskManagerLocation(), secondSlot.getTaskManagerLocation());
            Assert.assertEquals(firstSlot.getPhysicalSlotNumber(), secondSlot.getPhysicalSlotNumber());
            Assert.assertEquals(firstSlot.getAllocationId(), secondSlot.getAllocationId());
        } finally {
            // A single finally block guarantees the endpoint is terminated exactly once.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Allocates a slot, releases it, and then issues a second request. The second request is
     * expected to complete with a new logical slot that has the same task manager location
     * and physical slot number as the released one.
     */
    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> firstFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            Assert.assertFalse(firstFuture.isDone());

            final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());

            // Release the slot before the second request is issued.
            firstSlot.releaseSlot();

            final CompletableFuture<LogicalSlot> secondFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            final LogicalSlot secondSlot = secondFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(secondFuture.isDone());
            Assert.assertNotEquals(firstSlot, secondSlot);
            Assert.assertFalse(firstSlot.isAlive());
            Assert.assertTrue(secondSlot.isAlive());
            Assert.assertEquals(firstSlot.getTaskManagerLocation(), secondSlot.getTaskManagerLocation());
            Assert.assertEquals(firstSlot.getPhysicalSlotNumber(), secondSlot.getPhysicalSlotNumber());
        } finally {
            // A single finally block guarantees shutDown() is invoked exactly once.
            slotPool.shutDown();
        }
    }

    /**
     * Exercises which slot offers the pool accepts and which it rejects, before and after
     * the requested slot has been allocated and after it has been released again.
     */
    @Test
    public void testOfferSlot() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            Assert.assertFalse(slotFuture.isDone());

            final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);

            // An offer from a task manager location that was never registered is rejected.
            Assert.assertFalse(slotPoolGateway.offerSlot(new LocalTaskManagerLocation(), taskManagerGateway, slotOffer).get());

            // An offer whose allocation id was not requested is accepted nevertheless.
            final SlotOffer nonRequestedSlotOffer = new SlotOffer(
                new AllocationID(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());

            // The offer matching the pending request is accepted and completes the request.
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            final LogicalSlot slot = slotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(slot.isAlive());

            // Repeating the same offer while the slot is in use is accepted and leaves it alive.
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            Assert.assertTrue(slot.isAlive());

            // Same allocation id but a different slot index is rejected.
            final SlotOffer otherIndexOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                1,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertFalse(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, otherIndexOffer).get());

            // Same offer coming from a different, unregistered location is rejected.
            final TaskManagerLocation otherLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(slotPoolGateway.offerSlot(otherLocation, taskManagerGateway, slotOffer).get());

            // The same three offers are repeated after the slot has been released.
            slot.releaseSlot();
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            Assert.assertFalse(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, otherIndexOffer).get());
            Assert.assertFalse(slotPoolGateway.offerSlot(otherLocation, taskManagerGateway, slotOffer).get());
        } finally {
            // A single finally block guarantees shutDown() is invoked exactly once.
            slotPool.shutDown();
        }
    }

    /**
     * Allocates one slot while a second request stays pending, then releases the task
     * manager. The allocated slot is expected to be no longer alive afterwards, and the
     * second request is expected to remain incomplete.
     */
    @Test
    public void testReleaseResource() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> firstFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            final CompletableFuture<LogicalSlot> secondFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);

            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // NOTE(review): DummyPayload presumably completes this future once the payload
            // is failed/released by the pool - confirm against DummyPayload.
            final CompletableFuture<?> payloadReleased = new CompletableFuture<>();
            firstSlot.tryAssignPayload(new DummyPayload(payloadReleased));

            slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());

            // Block until the payload future completes before inspecting the slot state.
            payloadReleased.get();
            Assert.assertFalse(firstSlot.isAlive());

            // Give a wrongly completing second request a short chance to show up.
            Thread.sleep(10L);
            Assert.assertFalse(secondFuture.isDone());
        } finally {
            // A single finally block guarantees shutDown() is invoked exactly once.
            slotPool.shutDown();
        }
    }

    /**
     * Lets the resource manager's slot request future fail and checks that the allocation id
     * passed to the cancel consumer equals the allocation id of the failed request.
     */
    @Test
    public void testSlotRequestCancellationUponFailingRequest() throws Exception {
        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
        final CompletableFuture<AllocationID> cancelledAllocationId = new CompletableFuture<>();
        final CompletableFuture<AllocationID> requestedAllocationId = new CompletableFuture<>();

        resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
        resourceManagerGateway.setRequestSlotConsumer(
            request -> requestedAllocationId.complete(request.getAllocationId()));
        resourceManagerGateway.setCancelSlotConsumer(cancelledAllocationId::complete);

        final ScheduledUnit task = new ScheduledUnit(
            new JobVertexID(),
            (SlotSharingGroupId) null,
            (CoLocationConstraint) null);

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            final SlotProfile slotProfile = new SlotProfile(
                ResourceProfile.UNKNOWN,
                Collections.emptyList(),
                Collections.emptyList());
            final CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                task,
                slotProfile,
                true,
                timeout);

            // Fail the request on the resource manager side.
            requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));

            try {
                slotFuture.get();
                Assert.fail("The slot future should not have been completed properly.");
            } catch (Exception ignored) {
                // expected: the slot future must not complete normally
            }

            Assert.assertEquals(requestedAllocationId.get(), cancelledAllocationId.get());
        } finally {
            try {
                RpcUtils.terminateRpcEndpoint(slotPool, timeout);
            } catch (Exception terminationFailure) {
                LOG.warn("Could not properly terminate the SlotPool.", terminationFailure);
            }
        }
    }

    /**
     * Issues two slot requests and releases the first one before any slot arrives. The
     * first future is expected to fail with a {@link FlinkException} and its allocation id
     * to be cancelled at the resource manager. A slot offered afterwards under the first
     * allocation id is expected to fulfil the second request, after which the second
     * allocation id is cancelled at the resource manager as well.
     */
    @Test
    public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
        final SlotPool slotPool = new SlotPool(rpcService, jobId);

        final ArrayBlockingQueue<AllocationID> requestedAllocationIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(
            request -> requestedAllocationIds.offer(request.getAllocationId()));

        final ArrayBlockingQueue<AllocationID> cancelledAllocationIds = new ArrayBlockingQueue<>(2);
        // The decompiled source referenced an undefined variable here; the cancel consumer
        // has to feed the cancellation queue that is asserted on below.
        resourceManagerGateway.setCancelSlotConsumer(cancelledAllocationIds::offer);

        final SlotRequestId firstRequestId = new SlotRequestId();
        final SlotRequestId secondRequestId = new SlotRequestId();

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            final ScheduledUnit task = new ScheduledUnit(
                new JobVertexID(),
                (SlotSharingGroupId) null,
                (CoLocationConstraint) null);

            final CompletableFuture<LogicalSlot> firstFuture = slotPoolGateway.allocateSlot(
                firstRequestId,
                task,
                SlotProfile.noRequirements(),
                true,
                timeout);
            final AllocationID firstAllocationId = requestedAllocationIds.take();

            final CompletableFuture<LogicalSlot> secondFuture = slotPoolGateway.allocateSlot(
                secondRequestId,
                task,
                SlotProfile.noRequirements(),
                true,
                timeout);
            final AllocationID secondAllocationId = requestedAllocationIds.take();

            // Release the first request while it is still pending.
            slotPoolGateway.releaseSlot(firstRequestId, (SlotSharingGroupId) null, (Throwable) null);

            try {
                firstFuture.get();
                Assert.fail("The first slot future should have failed because it was cancelled.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FlinkException);
            }

            Assert.assertEquals(firstAllocationId, cancelledAllocationIds.take());

            // Offer a slot under the allocation id of the released request.
            final SlotOffer slotOffer = new SlotOffer(firstAllocationId, 0, ResourceProfile.UNKNOWN);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            // The second request is served by that slot and its own allocation is cancelled.
            Assert.assertEquals(firstAllocationId, secondFuture.get().getAllocationId());
            Assert.assertEquals(secondAllocationId, cancelledAllocationIds.take());
        } finally {
            // A single finally block guarantees the endpoint is terminated exactly once.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Offers two slots to the pool, shuts the pool down and checks that the task manager
     * gateway's free-slot function was invoked for exactly the offered allocation ids.
     */
    @Test
    public void testShutdownReleasesAllSlots() throws Exception {
        final SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final int numSlotOffers = 2;
            final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; i++) {
                slotOffers.add(new SlotOffer(new AllocationID(), i, ResourceProfile.UNKNOWN));
            }

            // Records every allocation id the pool asks the task manager to free.
            final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
            taskManagerGateway.setFreeSlotFunction((freedAllocationId, cause) -> {
                try {
                    freedSlotQueue.put(freedAllocationId);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can still observe it.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally(e);
                }
            });

            final Collection<SlotOffer> acceptedSlotOffers = slotPoolGateway
                .offerSlots(taskManagerLocation, taskManagerGateway, slotOffers)
                .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

            slotPool.shutDown();
            slotPool.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            // Wait with a bounded timeout instead of spinning forever on drainTo().
            final Collection<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; i++) {
                final AllocationID freedSlot = freedSlotQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull("Timed out waiting for a slot to be freed on shutdown.", freedSlot);
                freedSlots.add(freedSlot);
            }

            MatcherAssert.assertThat(
                freedSlots,
                Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
        } finally {
            // A single finally block guarantees the endpoint is terminated exactly once.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Offers two slots at different points of a manually driven clock and triggers the idle
     * slot check once {@code timeout} has elapsed since the first offer. Only the first
     * allocation id is expected to be handed to the task manager's free-slot function.
     */
    @Test
    public void testCheckIdleSlot() throws Exception {
        final ManualClock clock = new ManualClock();
        // NOTE(review): the last constructor argument is presumably the idle slot timeout -
        // confirm against the SlotPool constructor.
        final SlotPool slotPool = new SlotPool(rpcService, jobId, clock, TestingUtils.infiniteTime(), timeout);
        try {
            // Records every allocation id the pool asks the task manager to free.
            final ArrayBlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((freedAllocationId, cause) -> {
                try {
                    freedSlots.put(freedAllocationId);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can still observe it.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally(e);
                }
            });

            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);

            final AllocationID expiredAllocationId = new AllocationID();
            final AllocationID freshAllocationId = new AllocationID();
            final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN);
            final SlotOffer slotToKeep = new SlotOffer(freshAllocationId, 1, ResourceProfile.UNKNOWN);

            MatcherAssert.assertThat(
                slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
                Matchers.is(Acknowledge.get()));

            MatcherAssert.assertThat(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(),
                Matchers.is(true));

            // The second slot is offered one millisecond before the first one reaches the timeout.
            clock.advanceTime(timeout.toMilliseconds() - 1, TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToKeep).get(),
                Matchers.is(true));

            clock.advanceTime(1L, TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();

            final AllocationID freedAllocation = freedSlots.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(freedAllocation, Matchers.is(expiredAllocationId));
            MatcherAssert.assertThat(freedSlots.isEmpty(), Matchers.is(true));
        } finally {
            // A single finally block guarantees the endpoint is terminated exactly once.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Drives the idle slot check twice while the task manager's free-slot function answers
     * with queued responses: first an already failed future, then a future that is only
     * failed after the task manager has been released. The slot is expected to be allocatable
     * again after the first failed release, and a request issued after the second check is
     * expected to still be pending (it times out) at the end.
     */
    @Test
    public void testReleasingIdleSlotFailed() throws Exception {
        final ManualClock clock = new ManualClock();
        final SlotPool slotPool = new SlotPool(rpcService, jobId, clock, TestingUtils.infiniteTime(), timeout);

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);

            final AllocationID expiredAllocationId = new AllocationID();
            final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN);

            // Responses handed out by the free-slot function, in order; once the queue is
            // drained every further call is acknowledged.
            final ArrayDeque<CompletableFuture<Acknowledge>> responseQueue = new ArrayDeque<>(2);
            taskManagerGateway.setFreeSlotFunction((freedAllocationId, cause) -> {
                if (responseQueue.isEmpty()) {
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } else {
                    return responseQueue.pop();
                }
            });

            responseQueue.add(FutureUtils.completedExceptionally(new FlinkException("Test failure")));
            final CompletableFuture<Acknowledge> pendingResponse = new CompletableFuture<>();
            responseQueue.add(pendingResponse);

            MatcherAssert.assertThat(
                slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
                Matchers.is(Acknowledge.get()));
            MatcherAssert.assertThat(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(),
                Matchers.is(true));

            // First idle check: the free-slot call receives the already failed future.
            clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();

            final CompletableFuture<LogicalSlot> reallocationFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noRequirements(),
                true,
                timeout);
            final LogicalSlot reallocatedSlot = reallocationFuture.get();
            MatcherAssert.assertThat(reallocatedSlot.getAllocationId(), Matchers.is(expiredAllocationId));

            slotPool.getSlotOwner().returnAllocatedSlot(reallocatedSlot).get();

            // Second idle check: the free-slot call receives the still pending future.
            clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();

            final CompletableFuture<LogicalSlot> pendingRequest = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noRequirements(),
                true,
                timeout);

            slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();

            // Fail the pending free-slot response only after the task manager was released.
            pendingResponse.completeExceptionally(new FlinkException("Second test exception"));

            try {
                pendingRequest.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail("Expected to fail with a timeout.");
            } catch (TimeoutException expected) {
                // expected: the request must still be pending
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Starts the given slot pool with a freshly generated {@link JobMasterId}, connects it
     * to the given resource manager gateway and returns the pool's own gateway.
     *
     * @param slotPool the slot pool to start
     * @param resourceManagerGateway the resource manager gateway the pool is connected to
     * @return the {@link SlotPoolGateway} obtained from the slot pool itself
     * @throws Exception if starting the slot pool fails
     */
    private static SlotPoolGateway setupSlotPool(SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception {
        final JobMasterId jobMasterId = JobMasterId.generate();
        slotPool.start(jobMasterId, "foobar");

        slotPool.connectToResourceManager(resourceManagerGateway);

        final SlotPoolGateway selfGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
        return selfGateway;
    }

    /**
     * Builds an {@link AllocatedSlot} for the given allocation id using this test's task
     * manager location and gateway and {@link ResourceProfile#UNKNOWN}.
     *
     * <p>Not referenced by any test visible in this file.
     *
     * @param allocationID the allocation id of the new slot
     * @return the newly created slot
     */
    private AllocatedSlot createSlot(AllocationID allocationID) {
        final AllocatedSlot slot = new AllocatedSlot(
            allocationID,
            taskManagerLocation,
            0,
            ResourceProfile.UNKNOWN,
            taskManagerGateway);
        return slot;
    }
}
