package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.class */
public class SlotManagerTest extends TestLogger {
    @Test
    public void testTaskManagerRegistration() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        ResourceID generate2 = ResourceID.generate();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotID slotID = new SlotID(generate2, 0);
        SlotID slotID2 = new SlotID(generate2, 1);
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotReport slotReport = new SlotReport(Arrays.asList(new SlotStatus(slotID, resourceProfile), new SlotStatus(slotID2, resourceProfile)));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Assert.assertTrue("The number registered slots does not equal the expected number.", 2 == createSlotManager.getNumberRegisteredSlots());
                Assert.assertNotNull(createSlotManager.getSlot(slotID));
                Assert.assertNotNull(createSlotManager.getSlot(slotID2));
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTaskManagerUnregistration() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        JobID jobID = new JobID();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.any(JobID.class), (AllocationID) Matchers.any(AllocationID.class), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(new CompletableFuture());
        ResourceID generate2 = ResourceID.generate();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotID slotID = new SlotID(generate2, 0);
        SlotID slotID2 = new SlotID(generate2, 1);
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotReport slotReport = new SlotReport(Arrays.asList(new SlotStatus(slotID, resourceProfile, jobID, allocationID), new SlotStatus(slotID2, resourceProfile)));
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationID2, resourceProfile, "foobar");
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Assert.assertTrue("The number registered slots does not equal the expected number.", 2 == createSlotManager.getNumberRegisteredSlots());
                TaskManagerSlot slot = createSlotManager.getSlot(slotID);
                TaskManagerSlot slot2 = createSlotManager.getSlot(slotID2);
                Assert.assertTrue(slot.getState() == TaskManagerSlot.State.ALLOCATED);
                Assert.assertTrue(slot2.getState() == TaskManagerSlot.State.FREE);
                Assert.assertTrue(createSlotManager.registerSlotRequest(slotRequest));
                Assert.assertFalse(slot2.getState() == TaskManagerSlot.State.FREE);
                Assert.assertTrue(slot2.getState() == TaskManagerSlot.State.PENDING);
                PendingSlotRequest slotRequest2 = createSlotManager.getSlotRequest(allocationID2);
                Assert.assertTrue("The pending slot request should have been assigned to slot 2", slotRequest2.isAssigned());
                createSlotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID());
                Assert.assertTrue(0 == createSlotManager.getNumberRegisteredSlots());
                Assert.assertFalse(slotRequest2.isAssigned());
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSlotRequestWithoutFreeSlots() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), resourceProfile, "localhost");
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerSlotRequest(slotRequest);
                ((ResourceActions) Mockito.verify(resourceActions)).allocateResource((ResourceProfile) Matchers.eq(resourceProfile));
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSlotRequestWithResourceAllocationFailure() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), new ResourceProfile(42.0d, 1337), "localhost");
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        ((ResourceActions) Mockito.doThrow(new ResourceManagerException("Test exception")).when(resourceActions)).allocateResource((ResourceProfile) Matchers.any(ResourceProfile.class));
        try {
            SlotManager createSlotManager = createSlotManager(generate, resourceActions);
            Throwable th = null;
            try {
                createSlotManager.registerSlotRequest(slotRequest);
                Assert.fail("The slot request should have failed with a ResourceManagerException.");
                if (createSlotManager != null) {
                    if (0 != 0) {
                        try {
                            createSlotManager.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createSlotManager.close();
                    }
                }
            } finally {
            }
        } catch (ResourceManagerException e) {
        }
    }

    @Test
    public void testSlotRequestWithFreeSlot() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceID generate2 = ResourceID.generate();
        JobID jobID = new JobID();
        SlotID slotID = new SlotID(generate2, 0);
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, "localhost");
        SlotManager createSlotManager = createSlotManager(generate, (ResourceActions) Mockito.mock(ResourceActions.class));
        Throwable th = null;
        try {
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
            Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.eq(slotID), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
            createSlotManager.registerTaskManager(new TaskExecutorConnection(generate2, taskExecutorGateway), new SlotReport(new SlotStatus(slotID, resourceProfile)));
            Assert.assertTrue("The slot request should be accepted", createSlotManager.registerSlotRequest(slotRequest));
            ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway)).requestSlot((SlotID) Matchers.eq(slotID), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), (String) Matchers.eq("localhost"), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
            Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, createSlotManager.getSlot(slotID).getAllocationId());
            if (createSlotManager != null) {
                if (0 == 0) {
                    createSlotManager.close();
                    return;
                }
                try {
                    createSlotManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createSlotManager != null) {
                if (0 != 0) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testUnregisterPendingSlotRequest() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        ResourceID generate2 = ResourceID.generate();
        SlotID slotID = new SlotID(generate2, 0);
        AllocationID allocationID = new AllocationID();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.any(JobID.class), (AllocationID) Matchers.any(AllocationID.class), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(new CompletableFuture());
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 1);
        SlotReport slotReport = new SlotReport(new SlotStatus(slotID, resourceProfile));
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar");
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                TaskManagerSlot slot = createSlotManager.getSlot(slotID);
                createSlotManager.registerSlotRequest(slotRequest);
                Assert.assertNotNull(createSlotManager.getSlotRequest(allocationID));
                Assert.assertTrue(slot.getState() == TaskManagerSlot.State.PENDING);
                createSlotManager.unregisterSlotRequest(allocationID);
                Assert.assertNull(createSlotManager.getSlotRequest(allocationID));
                Assert.assertTrue(createSlotManager.getSlot(slotID).getState() == TaskManagerSlot.State.FREE);
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testFulfillingPendingSlotRequest() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceID generate2 = ResourceID.generate();
        JobID jobID = new JobID();
        SlotID slotID = new SlotID(generate2, 0);
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, "localhost");
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.eq(slotID), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotReport slotReport = new SlotReport(new SlotStatus(slotID, resourceProfile));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                Assert.assertTrue("The slot request should be accepted", createSlotManager.registerSlotRequest(slotRequest));
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.times(1))).allocateResource((ResourceProfile) Matchers.eq(resourceProfile));
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway)).requestSlot((SlotID) Matchers.eq(slotID), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), (String) Matchers.eq("localhost"), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
                Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, createSlotManager.getSlot(slotID).getAllocationId());
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testFreeSlot() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceID generate2 = ResourceID.generate();
        JobID jobID = new JobID();
        SlotID slotID = new SlotID(generate2, 0);
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class));
        SlotReport slotReport = new SlotReport(new SlotStatus(slotID, resourceProfile, jobID, allocationID));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                TaskManagerSlot slot = createSlotManager.getSlot(slotID);
                Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, slot.getAllocationId());
                createSlotManager.freeSlot(slotID, new AllocationID());
                Assert.assertTrue(slot.getState() == TaskManagerSlot.State.ALLOCATED);
                Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, slot.getAllocationId());
                createSlotManager.freeSlot(slotID, allocationID);
                Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
                Assert.assertNull(slot.getAllocationId());
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testDuplicatePendingSlotRequest() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 2);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0d, 1);
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar");
        SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationID, resourceProfile2, "barfoo");
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                Assert.assertTrue(createSlotManager.registerSlotRequest(slotRequest));
                Assert.assertFalse(createSlotManager.registerSlotRequest(slotRequest2));
                if (createSlotManager != null) {
                    if (0 != 0) {
                        try {
                            createSlotManager.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createSlotManager.close();
                    }
                }
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.times(1))).allocateResource((ResourceProfile) Matchers.any(ResourceProfile.class));
            } finally {
            }
        } catch (Throwable th3) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 1);
        ResourceID generate2 = ResourceID.generate();
        SlotID slotID = new SlotID(generate2, 0);
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class));
        SlotReport slotReport = new SlotReport(new SlotStatus(slotID, resourceProfile, jobID, allocationID));
        SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, "foobar");
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Assert.assertFalse(createSlotManager.registerSlotRequest(slotRequest));
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 2);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0d, 1);
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar");
        SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationID, resourceProfile2, "barfoo");
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.any(JobID.class), (AllocationID) Matchers.any(AllocationID.class), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        ResourceID generate2 = ResourceID.generate();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotID slotID = new SlotID(generate2, 0);
        SlotReport slotReport = new SlotReport(new SlotStatus(slotID, resourceProfile));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Assert.assertTrue(createSlotManager.registerSlotRequest(slotRequest));
                Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, createSlotManager.getSlot(slotID).getAllocationId());
                Assert.assertFalse(createSlotManager.registerSlotRequest(slotRequest2));
                if (createSlotManager != null) {
                    if (0 != 0) {
                        try {
                            createSlotManager.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createSlotManager.close();
                    }
                }
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.never())).allocateResource((ResourceProfile) Matchers.any(ResourceProfile.class));
            } finally {
            }
        } catch (Throwable th3) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 2);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0d, 1);
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar");
        SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationID, resourceProfile2, "barfoo");
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.any(JobID.class), (AllocationID) Matchers.any(AllocationID.class), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        ResourceID generate2 = ResourceID.generate();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotID slotID = new SlotID(generate2, 0);
        SlotReport slotReport = new SlotReport(new SlotStatus(slotID, new ResourceProfile(2.0d, 2)));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Assert.assertTrue(createSlotManager.registerSlotRequest(slotRequest));
                TaskManagerSlot slot = createSlotManager.getSlot(slotID);
                Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, slot.getAllocationId());
                createSlotManager.freeSlot(slotID, allocationID);
                Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
                Assert.assertNull(slot.getAllocationId());
                Assert.assertTrue(createSlotManager.registerSlotRequest(slotRequest2));
                Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationID, slot.getAllocationId());
                if (createSlotManager != null) {
                    if (0 != 0) {
                        try {
                            createSlotManager.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createSlotManager.close();
                    }
                }
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.never())).allocateResource((ResourceProfile) Matchers.any(ResourceProfile.class));
            } finally {
            }
        } catch (Throwable th3) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testReceivingUnknownSlotReport() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        InstanceID instanceID = new InstanceID();
        SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(ResourceID.generate(), 0), new ResourceProfile(1.0d, 1)));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                Assert.assertTrue(0 == createSlotManager.getNumberRegisteredSlots());
                Assert.assertFalse(createSlotManager.reportSlotStatus(instanceID, slotReport));
                Assert.assertTrue(0 == createSlotManager.getNumberRegisteredSlots());
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testUpdateSlotReport() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        ResourceID generate2 = ResourceID.generate();
        SlotID slotID = new SlotID(generate2, 0);
        SlotID slotID2 = new SlotID(generate2, 1);
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 1);
        SlotStatus slotStatus = new SlotStatus(slotID, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotID2, resourceProfile);
        SlotStatus slotStatus3 = new SlotStatus(slotID2, resourceProfile, jobID, allocationID);
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus, slotStatus2));
        SlotReport slotReport2 = new SlotReport(Arrays.asList(slotStatus3, slotStatus));
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                Assert.assertTrue(0 == createSlotManager.getNumberRegisteredSlots());
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                TaskManagerSlot slot = createSlotManager.getSlot(slotID);
                TaskManagerSlot slot2 = createSlotManager.getSlot(slotID2);
                Assert.assertTrue(2 == createSlotManager.getNumberRegisteredSlots());
                Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
                Assert.assertTrue(slot2.getState() == TaskManagerSlot.State.FREE);
                Assert.assertTrue(createSlotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport2));
                Assert.assertTrue(2 == createSlotManager.getNumberRegisteredSlots());
                Assert.assertNotNull(createSlotManager.getSlot(slotID));
                Assert.assertNotNull(createSlotManager.getSlot(slotID2));
                Assert.assertEquals(allocationID, createSlotManager.getSlot(slotID2).getAllocationId());
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTaskManagerTimeout() throws Exception {
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceID generate2 = ResourceID.generate();
        final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class));
        final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(generate2, 0), new ResourceProfile(1.0d, 1)));
        ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        final SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), Time.milliseconds(500L));
        Throwable th = null;
        try {
            try {
                slotManager.start(generate, defaultExecutor, resourceActions);
                defaultExecutor.execute(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerTest.1
                    @Override // java.lang.Runnable
                    public void run() {
                        slotManager.registerTaskManager(taskExecutorConnection, slotReport);
                    }
                });
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.timeout(50000L).times(1))).releaseResource((InstanceID) Matchers.eq(taskExecutorConnection.getInstanceID()), (Exception) Matchers.any(Exception.class));
                if (slotManager != null) {
                    if (0 == 0) {
                        slotManager.close();
                        return;
                    }
                    try {
                        slotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (slotManager != null) {
                if (th != null) {
                    try {
                        slotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    slotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSlotRequestTimeout() throws Exception {
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        ResourceManagerId generate = ResourceManagerId.generate();
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        final SlotRequest slotRequest = new SlotRequest(jobID, allocationID, new ResourceProfile(1.0d, 1), "foobar");
        ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        final SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), Time.milliseconds(50L), TestingUtils.infiniteTime());
        Throwable th = null;
        try {
            try {
                slotManager.start(generate, defaultExecutor, resourceActions);
                final AtomicReference atomicReference = new AtomicReference(null);
                defaultExecutor.execute(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerTest.2
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            Assert.assertTrue(slotManager.registerSlotRequest(slotRequest));
                        } catch (Exception e) {
                            atomicReference.compareAndSet(null, e);
                        }
                    }
                });
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.timeout(5000L).times(1))).notifyAllocationFailure((JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), (Exception) Matchers.any(TimeoutException.class));
                if (atomicReference.get() != null) {
                    throw ((Exception) atomicReference.get());
                }
                if (slotManager != null) {
                    if (0 == 0) {
                        slotManager.close();
                        return;
                    }
                    try {
                        slotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (slotManager != null) {
                if (th != null) {
                    try {
                        slotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    slotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, "foobar");
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.any(JobID.class), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.any(ResourceManagerId.class), (Time) Matchers.any(Time.class))).thenReturn(completableFuture, new CompletableFuture[]{completableFuture2});
        ResourceID generate2 = ResourceID.generate();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotReport slotReport = new SlotReport(Arrays.asList(new SlotStatus(new SlotID(generate2, 0), resourceProfile), new SlotStatus(new SlotID(generate2, 1), resourceProfile)));
        SlotManager createSlotManager = createSlotManager(generate, resourceActions);
        Throwable th = null;
        try {
            try {
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                createSlotManager.registerSlotRequest(slotRequest);
                ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotID.class);
                ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway, Mockito.times(1))).requestSlot((SlotID) forClass.capture(), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
                TaskManagerSlot slot = createSlotManager.getSlot((SlotID) forClass.getValue());
                completableFuture.completeExceptionally(new SlotAllocationException("Test exception."));
                ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway, Mockito.times(2))).requestSlot((SlotID) forClass.capture(), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
                completableFuture2.complete(Acknowledge.get());
                TaskManagerSlot slot2 = createSlotManager.getSlot((SlotID) forClass.getValue());
                Assert.assertTrue(slot2.getState() == TaskManagerSlot.State.ALLOCATED);
                Assert.assertEquals(allocationID, slot2.getAllocationId());
                if (!slot.getSlotId().equals(slot2.getSlotId())) {
                    Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
                }
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSlotReportWhileActiveSlotRequest() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0d, 1337);
        SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, "foobar");
        CompletableFuture completableFuture = new CompletableFuture();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.any(JobID.class), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.any(ResourceManagerId.class), (Time) Matchers.any(Time.class))).thenReturn(completableFuture, new CompletableFuture[]{CompletableFuture.completedFuture(Acknowledge.get())});
        ResourceID generate2 = ResourceID.generate();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotID slotID = new SlotID(generate2, 0);
        SlotID slotID2 = new SlotID(generate2, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(new SlotStatus(slotID, resourceProfile), new SlotStatus(slotID2, resourceProfile)));
        ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        Throwable th = null;
        try {
            try {
                slotManager.start(generate, defaultExecutor, resourceActions);
                CompletableFuture.supplyAsync(() -> {
                    slotManager.registerTaskManager(taskExecutorConnection, slotReport);
                    return null;
                }, defaultExecutor).thenAccept(obj -> {
                    try {
                        slotManager.registerSlotRequest(slotRequest);
                    } catch (SlotManagerException e) {
                        throw new RuntimeException("Could not register slots.", e);
                    }
                }).get();
                ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotID.class);
                ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway, Mockito.times(1))).requestSlot((SlotID) forClass.capture(), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
                SlotID slotID3 = ((SlotID) forClass.getValue()).equals(slotID) ? slotID2 : slotID;
                Assert.assertTrue(((Boolean) CompletableFuture.supplyAsync(() -> {
                    return Boolean.valueOf(slotManager.getSlot(slotID3).getState() == TaskManagerSlot.State.FREE);
                }, defaultExecutor).get()).booleanValue());
                SlotReport slotReport2 = new SlotReport(Arrays.asList(new SlotStatus((SlotID) forClass.getValue(), resourceProfile, new JobID(), new AllocationID()), new SlotStatus(slotID3, resourceProfile)));
                Assert.assertTrue(((Boolean) CompletableFuture.supplyAsync(() -> {
                    return Boolean.valueOf(slotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport2));
                }, defaultExecutor).get()).booleanValue());
                ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway, Mockito.timeout(10000L).times(2))).requestSlot((SlotID) forClass.capture(), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
                SlotID slotID4 = (SlotID) forClass.getValue();
                Assert.assertEquals(slotID2, slotID4);
                TaskManagerSlot taskManagerSlot = (TaskManagerSlot) CompletableFuture.supplyAsync(() -> {
                    return slotManager.getSlot(slotID4);
                }, defaultExecutor).get();
                Assert.assertTrue(taskManagerSlot.getState() == TaskManagerSlot.State.ALLOCATED);
                Assert.assertEquals(allocationID, taskManagerSlot.getAllocationId());
                if (slotManager != null) {
                    if (0 == 0) {
                        slotManager.close();
                        return;
                    }
                    try {
                        slotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (slotManager != null) {
                if (th != null) {
                    try {
                        slotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    slotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTimeoutForUnusedTaskManager() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        ScheduledExecutor defaultScheduledExecutor = TestingUtils.defaultScheduledExecutor();
        ResourceID generate2 = ResourceID.generate();
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 1);
        SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, "foobar");
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot((SlotID) Matchers.any(SlotID.class), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, taskExecutorGateway);
        SlotReport slotReport = new SlotReport(Arrays.asList(new SlotStatus(new SlotID(generate2, 0), resourceProfile), new SlotStatus(new SlotID(generate2, 1), resourceProfile)));
        ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        SlotManager slotManager = new SlotManager(defaultScheduledExecutor, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), Time.of(50L, TimeUnit.MILLISECONDS));
        Throwable th = null;
        try {
            try {
                slotManager.start(generate, defaultExecutor, resourceActions);
                CompletableFuture.supplyAsync(() -> {
                    try {
                        return Boolean.valueOf(slotManager.registerSlotRequest(slotRequest));
                    } catch (SlotManagerException e) {
                        throw new CompletionException((Throwable) e);
                    }
                }, defaultExecutor).thenAccept(obj -> {
                    slotManager.registerTaskManager(taskExecutorConnection, slotReport);
                });
                ArgumentCaptor forClass = ArgumentCaptor.forClass(SlotID.class);
                ((TaskExecutorGateway) Mockito.verify(taskExecutorGateway, Mockito.timeout(500L))).requestSlot((SlotID) forClass.capture(), (JobID) Matchers.eq(jobID), (AllocationID) Matchers.eq(allocationID), Matchers.anyString(), (ResourceManagerId) Matchers.eq(generate), (Time) Matchers.any(Time.class));
                Assert.assertFalse(((Boolean) CompletableFuture.supplyAsync(() -> {
                    return Boolean.valueOf(slotManager.isTaskManagerIdle(taskExecutorConnection.getInstanceID()));
                }, defaultExecutor).get()).booleanValue());
                SlotID slotID = (SlotID) forClass.getValue();
                TaskManagerSlot taskManagerSlot = (TaskManagerSlot) CompletableFuture.supplyAsync(() -> {
                    return slotManager.getSlot(slotID);
                }, defaultExecutor).get();
                Assert.assertTrue(taskManagerSlot.getState() == TaskManagerSlot.State.ALLOCATED);
                Assert.assertEquals(allocationID, taskManagerSlot.getAllocationId());
                Assert.assertTrue(((Boolean) CompletableFuture.runAsync(() -> {
                    slotManager.freeSlot(slotID, allocationID);
                }, defaultExecutor).thenApply(obj2 -> {
                    return Boolean.valueOf(slotManager.isTaskManagerIdle(taskExecutorConnection.getInstanceID()));
                }).get()).booleanValue());
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.timeout(500L).times(1))).releaseResource((InstanceID) Matchers.eq(taskExecutorConnection.getInstanceID()), (Exception) Matchers.any(Exception.class));
                if (slotManager != null) {
                    if (0 == 0) {
                        slotManager.close();
                        return;
                    }
                    try {
                        slotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (slotManager != null) {
                if (th != null) {
                    try {
                        slotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    slotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
        Time milliseconds = Time.milliseconds(10L);
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceID generate2 = ResourceID.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate2, (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class));
        SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(generate2, 0), new ResourceProfile(1.0d, 1)));
        SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), milliseconds);
        Throwable th = null;
        try {
            try {
                slotManager.start(generate, Executors.directExecutor(), resourceActions);
                slotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Assert.assertEquals(1L, slotManager.getNumberRegisteredSlots());
                ((ResourceActions) Mockito.verify(resourceActions, Mockito.timeout(milliseconds.toMilliseconds() * 20).atLeast(1))).releaseResource((InstanceID) Matchers.eq(taskExecutorConnection.getInstanceID()), (Exception) Matchers.any(Exception.class));
                Assert.assertEquals(1L, slotManager.getNumberRegisteredSlots());
                slotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID());
                Assert.assertEquals(0L, slotManager.getNumberRegisteredSlots());
                if (slotManager != null) {
                    if (0 == 0) {
                        slotManager.close();
                        return;
                    }
                    try {
                        slotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (slotManager != null) {
                if (th != null) {
                    try {
                        slotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    slotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testReportAllocatedSlot() throws Exception {
        ResourceID generate = ResourceID.generate();
        ResourceActions resourceActions = (ResourceActions) Mockito.mock(ResourceActions.class);
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate, new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
        SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        Throwable th = null;
        try {
            try {
                slotManager.start(ResourceManagerId.generate(), Executors.directExecutor(), resourceActions);
                SlotID slotID = new SlotID(generate, 0);
                slotManager.registerTaskManager(taskExecutorConnection, new SlotReport(new SlotStatus(slotID, ResourceProfile.UNKNOWN)));
                Assert.assertThat(Integer.valueOf(slotManager.getNumberRegisteredSlots()), org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(1)));
                slotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), new SlotReport(new SlotStatus(slotID, ResourceProfile.UNKNOWN, new JobID(), new AllocationID())));
                AllocationID allocationID = new AllocationID();
                slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, ResourceProfile.UNKNOWN, "foobar"));
                Assert.assertThat(Boolean.valueOf(slotManager.getSlotRequest(allocationID).isAssigned()), org.hamcrest.Matchers.is(false));
                if (slotManager != null) {
                    if (0 == 0) {
                        slotManager.close();
                        return;
                    }
                    try {
                        slotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (slotManager != null) {
                if (th != null) {
                    try {
                        slotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    slotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSlotRequestFailure() throws Exception {
        SlotManager createSlotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActionsBuilder().createTestingResourceActions());
        Throwable th = null;
        try {
            try {
                createSlotManager.registerSlotRequest(new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"));
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
                ArrayBlockingQueue arrayBlockingQueue2 = new ArrayBlockingQueue(1);
                TestingTaskExecutorGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple5 -> {
                    arrayBlockingQueue.offer(tuple5);
                    try {
                        return (CompletableFuture) arrayBlockingQueue2.take();
                    } catch (InterruptedException e) {
                        return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
                    }
                }).createTestingTaskExecutorGateway();
                ResourceID generate = ResourceID.generate();
                TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate, createTestingTaskExecutorGateway);
                SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(generate, 0), ResourceProfile.UNKNOWN));
                CompletableFuture completableFuture = new CompletableFuture();
                arrayBlockingQueue2.offer(completableFuture);
                createSlotManager.registerTaskManager(taskExecutorConnection, slotReport);
                Tuple5 tuple52 = (Tuple5) arrayBlockingQueue.take();
                CompletableFuture completableFuture2 = new CompletableFuture();
                arrayBlockingQueue2.offer(completableFuture2);
                completableFuture.completeExceptionally(new SlotAllocationException("Test exception"));
                Tuple5 tuple53 = (Tuple5) arrayBlockingQueue.take();
                Assert.assertThat(tuple53.f2, org.hamcrest.Matchers.equalTo(tuple52.f2));
                Assert.assertThat(tuple53.f0, org.hamcrest.Matchers.equalTo(tuple52.f0));
                completableFuture2.complete(Acknowledge.get());
                TaskManagerSlot slot = createSlotManager.getSlot((SlotID) tuple53.f0);
                Assert.assertThat(slot.getState(), org.hamcrest.Matchers.equalTo(TaskManagerSlot.State.ALLOCATED));
                Assert.assertThat(slot.getAllocationId(), org.hamcrest.Matchers.equalTo(tuple53.f2));
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
        ArrayDeque arrayDeque = new ArrayDeque(5);
        SlotManager createSlotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActionsBuilder().setNotifyAllocationFailureConsumer(tuple3 -> {
            arrayDeque.offer(Tuple2.of(tuple3.f0, tuple3.f1));
        }).createTestingResourceActions());
        Throwable th = null;
        try {
            try {
                JobID jobID = new JobID();
                SlotRequest createSlotRequest = createSlotRequest(jobID);
                SlotRequest createSlotRequest2 = createSlotRequest(jobID);
                createSlotManager.registerSlotRequest(createSlotRequest);
                createSlotManager.registerSlotRequest(createSlotRequest2);
                ResourceID generate = ResourceID.generate();
                TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate, new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
                createSlotManager.registerTaskManager(taskExecutorConnection, createSlotReport(generate, 2));
                JobID jobID2 = new JobID();
                SlotRequest createSlotRequest3 = createSlotRequest(jobID2);
                SlotRequest createSlotRequest4 = createSlotRequest(jobID2);
                createSlotManager.registerSlotRequest(createSlotRequest3);
                createSlotManager.registerSlotRequest(createSlotRequest4);
                JobID jobID3 = new JobID();
                SlotRequest createSlotRequest5 = createSlotRequest(jobID3);
                createSlotManager.registerSlotRequest(createSlotRequest5);
                ResourceID generate2 = ResourceID.generate();
                TaskExecutorConnection taskExecutorConnection2 = new TaskExecutorConnection(generate2, new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
                createSlotManager.registerTaskManager(taskExecutorConnection2, createSlotReport(generate2, 3));
                createSlotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID());
                Assert.assertThat(arrayDeque, org.hamcrest.Matchers.hasSize(2));
                HashSet hashSet = new HashSet(2);
                while (true) {
                    Tuple2 tuple2 = (Tuple2) arrayDeque.poll();
                    if (tuple2 == null) {
                        break;
                    }
                    Assert.assertThat(tuple2.f0, org.hamcrest.Matchers.equalTo(jobID));
                    hashSet.add(tuple2.f1);
                }
                Assert.assertThat(hashSet, org.hamcrest.Matchers.containsInAnyOrder(new AllocationID[]{createSlotRequest.getAllocationId(), createSlotRequest2.getAllocationId()}));
                createSlotManager.unregisterTaskManager(taskExecutorConnection2.getInstanceID());
                Assert.assertThat(arrayDeque, org.hamcrest.Matchers.hasSize(3));
                Map<JobID, List<Tuple2<JobID, AllocationID>>> map = (Map) arrayDeque.stream().collect(Collectors.groupingBy(tuple22 -> {
                    return (JobID) tuple22.f0;
                }));
                Assert.assertThat(map.entrySet(), org.hamcrest.Matchers.hasSize(2));
                Set<AllocationID> extractFailedAllocationsForJob = extractFailedAllocationsForJob(jobID2, map);
                Set<AllocationID> extractFailedAllocationsForJob2 = extractFailedAllocationsForJob(jobID3, map);
                Assert.assertThat(extractFailedAllocationsForJob, org.hamcrest.Matchers.containsInAnyOrder(new AllocationID[]{createSlotRequest3.getAllocationId(), createSlotRequest4.getAllocationId()}));
                Assert.assertThat(extractFailedAllocationsForJob2, org.hamcrest.Matchers.containsInAnyOrder(new AllocationID[]{createSlotRequest5.getAllocationId()}));
                if (createSlotManager != null) {
                    if (0 == 0) {
                        createSlotManager.close();
                        return;
                    }
                    try {
                        createSlotManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSlotManager != null) {
                if (th != null) {
                    try {
                        createSlotManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSlotManager.close();
                }
            }
            throw th4;
        }
    }

    private Set<AllocationID> extractFailedAllocationsForJob(JobID jobID, Map<JobID, List<Tuple2<JobID, AllocationID>>> map) {
        return (Set) map.get(jobID).stream().map(tuple2 -> {
            return (AllocationID) tuple2.f1;
        }).collect(Collectors.toSet());
    }

    @Nonnull
    private SlotReport createSlotReport(ResourceID resourceID, int i) {
        HashSet hashSet = new HashSet(i);
        for (int i2 = 0; i2 < i; i2++) {
            hashSet.add(new SlotStatus(new SlotID(resourceID, i2), ResourceProfile.UNKNOWN));
        }
        return new SlotReport(hashSet);
    }

    @Nonnull
    private SlotRequest createSlotRequest(JobID jobID) {
        return new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
    }

    private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceActions) {
        SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        slotManager.start(resourceManagerId, Executors.directExecutor(), resourceActions);
        return slotManager;
    }
}
