package org.apache.flink.runtime.jobmaster.slotpool;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.QuadConsumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.class */
public class DefaultDeclarativeSlotPoolTest extends TestLogger {
    private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7d).build();
    private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest$FreeSlotConsumer.class */
    public static class FreeSlotConsumer implements BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> {
        final BlockingQueue<AllocationID> freedSlots;

        private FreeSlotConsumer() {
            this.freedSlots = new ArrayBlockingQueue(10);
        }

        @Override // java.util.function.BiFunction
        public CompletableFuture<Acknowledge> apply(AllocationID allocationID, Throwable th) {
            this.freedSlots.offer(allocationID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Collection<AllocationID> drainFreedSlots() {
            ArrayList arrayList = new ArrayList();
            this.freedSlots.drainTo(arrayList);
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest$NewResourceRequirementsService.class */
    private static final class NewResourceRequirementsService implements Consumer<Collection<ResourceRequirement>> {
        private final BlockingQueue<Collection<ResourceRequirement>> resourceRequirementsQueue;

        private NewResourceRequirementsService() {
            this.resourceRequirementsQueue = new ArrayBlockingQueue(2);
        }

        @Override // java.util.function.Consumer
        public void accept(Collection<ResourceRequirement> collection) {
            this.resourceRequirementsQueue.offer(collection);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Collection<ResourceRequirement> takeResourceRequirements() throws InterruptedException {
            return this.resourceRequirementsQueue.take();
        }

        public boolean hasNextResourceRequirements() {
            return !this.resourceRequirementsQueue.isEmpty();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest$NewSlotsService.class */
    public static final class NewSlotsService implements DeclarativeSlotPool.NewSlotsListener {
        private final BlockingQueue<Collection<? extends PhysicalSlot>> physicalSlotsQueue;

        private NewSlotsService() {
            this.physicalSlotsQueue = new ArrayBlockingQueue(2);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Collection<? extends PhysicalSlot> takeNewSlots() throws InterruptedException {
            return this.physicalSlotsQueue.take();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean hasNextNewSlots() {
            return !this.physicalSlotsQueue.isEmpty();
        }

        public void notifyNewSlotsAreAvailable(Collection<? extends PhysicalSlot> collection) {
            this.physicalSlotsQueue.offer(collection);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest$PhysicalSlotSlotOfferMatcher.class */
    private static class PhysicalSlotSlotOfferMatcher extends TypeSafeMatcher<PhysicalSlot> {
        private final SlotOffer slotOffer;

        public PhysicalSlotSlotOfferMatcher(SlotOffer slotOffer) {
            this.slotOffer = slotOffer;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean matchesSafely(PhysicalSlot physicalSlot) {
            return physicalSlot.getAllocationId().equals(this.slotOffer.getAllocationId()) && physicalSlot.getResourceProfile().equals(this.slotOffer.getResourceProfile()) && physicalSlot.getPhysicalSlotNumber() == this.slotOffer.getSlotIndex();
        }

        public void describeTo(Description description) {
            description.appendText("SlotOffer: ");
            description.appendValueList("{", ",", "}", new Serializable[]{this.slotOffer.getAllocationId(), this.slotOffer.getResourceProfile(), Integer.valueOf(this.slotOffer.getSlotIndex())});
        }
    }

    @Test
    public void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException {
        NewResourceRequirementsService newResourceRequirementsService = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool = createDefaultDeclarativeSlotPool(newResourceRequirementsService);
        ResourceCounter withResource = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        ResourceCounter createResourceRequirements = createResourceRequirements();
        createDefaultDeclarativeSlotPool.increaseResourceRequirementsBy(withResource);
        createDefaultDeclarativeSlotPool.increaseResourceRequirementsBy(createResourceRequirements);
        Assert.assertThat(newResourceRequirementsService.takeResourceRequirements(), CoreMatchers.is(toResourceRequirements(withResource)));
        Assert.assertThat(newResourceRequirementsService.takeResourceRequirements(), CoreMatchers.is(toResourceRequirements(withResource.add(createResourceRequirements))));
        Assert.assertThat(Boolean.valueOf(newResourceRequirementsService.hasNextResourceRequirements()), CoreMatchers.is(false));
    }

    @Test
    public void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException {
        NewResourceRequirementsService newResourceRequirementsService = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool = createDefaultDeclarativeSlotPool(newResourceRequirementsService);
        ResourceCounter withResource = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
        createDefaultDeclarativeSlotPool.increaseResourceRequirementsBy(withResource);
        newResourceRequirementsService.takeResourceRequirements();
        ResourceCounter withResource2 = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
        createDefaultDeclarativeSlotPool.decreaseResourceRequirementsBy(withResource2);
        Assert.assertThat(newResourceRequirementsService.takeResourceRequirements(), CoreMatchers.is(toResourceRequirements(withResource.subtract(withResource2))));
        Assert.assertThat(Boolean.valueOf(newResourceRequirementsService.hasNextResourceRequirements()), CoreMatchers.is(false));
    }

    @Test
    public void testGetResourceRequirements() {
        DefaultDeclarativeSlotPool build = DefaultDeclarativeSlotPoolBuilder.builder().build();
        Assert.assertThat(build.getResourceRequirements(), CoreMatchers.is(toResourceRequirements(ResourceCounter.empty())));
        ResourceCounter createResourceRequirements = createResourceRequirements();
        build.increaseResourceRequirementsBy(createResourceRequirements);
        Assert.assertThat(build.getResourceRequirements(), CoreMatchers.is(toResourceRequirements(createResourceRequirements)));
    }

    @Test
    public void testOfferSlots() throws InterruptedException {
        NewSlotsService newSlotsService = new NewSlotsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsService);
        ResourceCounter createResourceRequirements = createResourceRequirements();
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.increaseResourceRequirementsBy(createResourceRequirements);
        Collection<SlotOffer> createSlotOffersForResourceRequirements = createSlotOffersForResourceRequirements(createResourceRequirements);
        Assert.assertThat(SlotPoolTestUtils.offerSlots(createDefaultDeclarativeSlotPoolWithNewSlotsListener, createSlotOffersForResourceRequirements), Matchers.containsInAnyOrder(createSlotOffersForResourceRequirements.toArray()));
        Collection<PhysicalSlot> drainNewSlotService = drainNewSlotService(newSlotsService);
        Assert.assertThat(drainNewSlotService, Matchers.containsInAnyOrder((Collection) createSlotOffersForResourceRequirements.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList())));
        Assert.assertThat(createDefaultDeclarativeSlotPoolWithNewSlotsListener.getAllSlotsInformation(), Matchers.containsInAnyOrder((Collection) drainNewSlotService.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList())));
    }

    @Test
    public void testDuplicateSlotOfferings() throws InterruptedException {
        NewSlotsService newSlotsService = new NewSlotsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsService);
        ResourceCounter createResourceRequirements = createResourceRequirements();
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.increaseResourceRequirementsBy(createResourceRequirements);
        Collection<SlotOffer> createSlotOffersForResourceRequirements = createSlotOffersForResourceRequirements(createResourceRequirements);
        SlotPoolTestUtils.offerSlots(createDefaultDeclarativeSlotPoolWithNewSlotsListener, createSlotOffersForResourceRequirements);
        drainNewSlotService(newSlotsService);
        Assert.assertThat(SlotPoolTestUtils.offerSlots(createDefaultDeclarativeSlotPoolWithNewSlotsListener, createSlotOffersForResourceRequirements), Matchers.containsInAnyOrder(createSlotOffersForResourceRequirements.toArray()));
        Assert.assertFalse(newSlotsService.hasNextNewSlots());
    }

    @Test
    public void testOfferingTooManySlots() {
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener = createDefaultDeclarativeSlotPoolWithNewSlotsListener(new NewSlotsService());
        ResourceCounter createResourceRequirements = createResourceRequirements();
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.increaseResourceRequirementsBy(createResourceRequirements);
        Map map = (Map) SlotPoolTestUtils.offerSlots(createDefaultDeclarativeSlotPoolWithNewSlotsListener, createSlotOffersForResourceRequirements(createResourceRequirements.add(RESOURCE_PROFILE_1, 2))).stream().map((v0) -> {
            return v0.getResourceProfile();
        }).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Iterator it = createResourceRequirements.getResourcesWithCount().iterator();
        while (it.hasNext()) {
            Assert.assertThat(map.getOrDefault(((Map.Entry) it.next()).getKey(), 0L), CoreMatchers.is(Long.valueOf(((Integer) r0.getValue()).intValue())));
        }
    }

    @Test
    public void testReleaseSlotsRemovesSlots() throws InterruptedException {
        NewResourceRequirementsService newResourceRequirementsService = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool = createDefaultDeclarativeSlotPool(newResourceRequirementsService);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        increaseRequirementsAndOfferSlotsToSlotPool(createDefaultDeclarativeSlotPool, createResourceRequirements(), localTaskManagerLocation);
        newResourceRequirementsService.takeResourceRequirements();
        createDefaultDeclarativeSlotPool.releaseSlots(localTaskManagerLocation.getResourceID(), new FlinkException("Test failure"));
        Assert.assertThat(createDefaultDeclarativeSlotPool.getAllSlotsInformation(), CoreMatchers.is(Matchers.empty()));
    }

    @Test
    public void testReleaseSlotsReturnsSlot() {
        DefaultDeclarativeSlotPool build = DefaultDeclarativeSlotPoolBuilder.builder().build();
        ResourceCounter createResourceRequirements = createResourceRequirements();
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool = increaseRequirementsAndOfferSlotsToSlotPool(build, createResourceRequirements, localTaskManagerLocation, new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway());
        build.releaseSlots(localTaskManagerLocation.getResourceID(), new FlinkException("Test failure"));
        Assert.assertThat(freeSlotConsumer.drainFreedSlots(), Matchers.containsInAnyOrder(increaseRequirementsAndOfferSlotsToSlotPool.stream().map((v0) -> {
            return v0.getAllocationId();
        }).toArray()));
    }

    @Test
    public void testReleaseSlotsOnlyReturnsFulfilledRequirementsOfReservedSlots() {
        withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles((defaultDeclarativeSlotPool, slotOffer, slotOffer2, taskManagerLocation) -> {
            defaultDeclarativeSlotPool.reserveFreeSlot(slotOffer2.getAllocationId(), slotOffer2.getResourceProfile()).tryAssignPayload(new TestingPhysicalSlotPayload());
            ResourceCounter releaseSlots = defaultDeclarativeSlotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure"));
            Assert.assertThat(Integer.valueOf(releaseSlots.getResourceCount(slotOffer.getResourceProfile())), CoreMatchers.is(0));
            Assert.assertThat(Integer.valueOf(releaseSlots.getResourceCount(slotOffer2.getResourceProfile())), CoreMatchers.is(1));
        });
    }

    @Test
    public void testReleaseSlotOnlyReturnsFulfilledRequirementsOfReservedSlots() {
        withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles((defaultDeclarativeSlotPool, slotOffer, slotOffer2, taskManagerLocation) -> {
            defaultDeclarativeSlotPool.reserveFreeSlot(slotOffer2.getAllocationId(), slotOffer2.getResourceProfile()).tryAssignPayload(new TestingPhysicalSlotPayload());
            ResourceCounter releaseSlot = defaultDeclarativeSlotPool.releaseSlot(slotOffer.getAllocationId(), new FlinkException("Test failure"));
            ResourceCounter releaseSlot2 = defaultDeclarativeSlotPool.releaseSlot(slotOffer2.getAllocationId(), new FlinkException("Test failure"));
            Assert.assertThat(releaseSlot.getResources(), CoreMatchers.is(Matchers.empty()));
            Assert.assertThat(Integer.valueOf(releaseSlot2.getResourceCount(slotOffer2.getResourceProfile())), CoreMatchers.is(1));
        });
    }

    private static void withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, TaskManagerLocation> quadConsumer) {
        DefaultDeclarativeSlotPool build = DefaultDeclarativeSlotPoolBuilder.builder().build();
        ResourceCounter add = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1).add(RESOURCE_PROFILE_2, 1);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        Iterator<SlotOffer> it = increaseRequirementsAndOfferSlotsToSlotPool(build, add, localTaskManagerLocation, new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(new FreeSlotConsumer()).createTestingTaskExecutorGateway()).iterator();
        quadConsumer.accept(build, it.next(), it.next(), localTaskManagerLocation);
    }

    @Test
    public void testReleaseSlotDecreasesFulfilledResourceRequirements() throws InterruptedException {
        NewSlotsService newSlotsService = new NewSlotsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsService);
        ResourceCounter createResourceRequirements = createResourceRequirements();
        increaseRequirementsAndOfferSlotsToSlotPool(createDefaultDeclarativeSlotPoolWithNewSlotsListener, createResourceRequirements, null);
        PhysicalSlot physicalSlot = (PhysicalSlot) newSlotsService.takeNewSlots().iterator().next();
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure"));
        Assert.assertThat(createDefaultDeclarativeSlotPoolWithNewSlotsListener.getFulfilledResourceRequirements(), CoreMatchers.is(createResourceRequirements.subtract(physicalSlot.getResourceProfile(), 1)));
    }

    @Test
    public void testReleaseSlotReturnsSlot() throws InterruptedException {
        NewSlotsService newSlotsService = new NewSlotsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsService);
        ResourceCounter createResourceRequirements = createResourceRequirements();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        increaseRequirementsAndOfferSlotsToSlotPool(createDefaultDeclarativeSlotPoolWithNewSlotsListener, createResourceRequirements, new LocalTaskManagerLocation(), new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway());
        PhysicalSlot physicalSlot = (PhysicalSlot) newSlotsService.takeNewSlots().iterator().next();
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure"));
        Assert.assertThat((AllocationID) Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()), CoreMatchers.is(physicalSlot.getAllocationId()));
    }

    @Test
    public void testReturnIdleSlotsAfterTimeout() {
        Time seconds = Time.seconds(10L);
        DefaultDeclarativeSlotPool build = DefaultDeclarativeSlotPoolBuilder.builder().setIdleSlotTimeout(seconds).build();
        ResourceCounter createResourceRequirements = createResourceRequirements();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool = increaseRequirementsAndOfferSlotsToSlotPool(build, createResourceRequirements, new LocalTaskManagerLocation(), new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway());
        build.decreaseResourceRequirementsBy(createResourceRequirements);
        build.releaseIdleSlots(0 + seconds.toMilliseconds());
        Collection drainFreedSlots = freeSlotConsumer.drainFreedSlots();
        Assert.assertThat(increaseRequirementsAndOfferSlotsToSlotPool, CoreMatchers.is(CoreMatchers.not(Matchers.empty())));
        Assert.assertThat(drainFreedSlots, Matchers.containsInAnyOrder(increaseRequirementsAndOfferSlotsToSlotPool.stream().map((v0) -> {
            return v0.getAllocationId();
        }).toArray()));
        assertNoAvailableAndRequiredResources(build);
    }

    private void assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool defaultDeclarativeSlotPool) {
        Assert.assertTrue(defaultDeclarativeSlotPool.getFulfilledResourceRequirements().isEmpty());
        Assert.assertTrue(defaultDeclarativeSlotPool.getResourceRequirements().isEmpty());
        Assert.assertThat(defaultDeclarativeSlotPool.getAllSlotsInformation(), CoreMatchers.is(Matchers.empty()));
    }

    @Test
    public void testOnlyReturnExcessIdleSlots() {
        Time seconds = Time.seconds(10L);
        DefaultDeclarativeSlotPool build = DefaultDeclarativeSlotPoolBuilder.builder().setIdleSlotTimeout(seconds).build();
        ResourceCounter createResourceRequirements = createResourceRequirements();
        Collection<SlotOffer> createSlotOffersForResourceRequirements = createSlotOffersForResourceRequirements(createResourceRequirements);
        build.increaseResourceRequirementsBy(createResourceRequirements);
        Collection<SlotOffer> offerSlots = SlotPoolTestUtils.offerSlots(build, createSlotOffersForResourceRequirements);
        ResourceCounter withResource = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        build.decreaseResourceRequirementsBy(createResourceRequirements.subtract(withResource));
        build.releaseIdleSlots(0 + seconds.toMilliseconds());
        Assert.assertThat(offerSlots, CoreMatchers.is(CoreMatchers.not(Matchers.empty())));
        Assert.assertThat(build.getFulfilledResourceRequirements(), CoreMatchers.is(withResource));
    }

    @Test
    public void testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirementsOfSameProfile() throws InterruptedException {
        NewSlotsService newSlotsService = new NewSlotsService();
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsService);
        ResourceCounter withResource = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        increaseRequirementsAndOfferSlotsToSlotPool(createDefaultDeclarativeSlotPoolWithNewSlotsListener, withResource, null);
        PhysicalSlot physicalSlot = (PhysicalSlot) Iterables.getOnlyElement(drainNewSlotService(newSlotsService));
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.reserveFreeSlot(physicalSlot.getAllocationId(), RESOURCE_PROFILE_1);
        createDefaultDeclarativeSlotPoolWithNewSlotsListener.freeReservedSlot(physicalSlot.getAllocationId(), (Throwable) null, 0L);
        Assert.assertThat(Iterables.getOnlyElement(drainNewSlotService(newSlotsService)), CoreMatchers.sameInstance(physicalSlot));
        Assert.assertThat(createDefaultDeclarativeSlotPoolWithNewSlotsListener.offerSlots(createSlotOffersForResourceRequirements(withResource), new LocalTaskManagerLocation(), SlotPoolTestUtils.createTaskManagerGateway(null), 0L), CoreMatchers.is(Matchers.empty()));
        Assert.assertTrue(createDefaultDeclarativeSlotPoolWithNewSlotsListener.calculateUnfulfilledResources().isEmpty());
    }

    @Test
    public void testFreedSlotWillRemainAssignedToMatchedResourceProfile() {
        DefaultDeclarativeSlotPool build = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile build2 = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
        ResourceProfile build3 = ResourceProfile.newBuilder().setManagedMemoryMB(512).build();
        build.increaseResourceRequirementsBy(ResourceCounter.withResource(build2, 1));
        SlotPoolTestUtils.offerSlots(build, createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.ANY, 1)));
        SlotInfoWithUtilization slotInfoWithUtilization = (SlotInfoWithUtilization) build.getFreeSlotsInformation().iterator().next();
        build.reserveFreeSlot(slotInfoWithUtilization.getAllocationId(), build2);
        Assert.assertThat(Integer.valueOf(build.getFulfilledResourceRequirements().getResourceCount(build2)), CoreMatchers.is(1));
        build.increaseResourceRequirementsBy(ResourceCounter.withResource(build3, 1));
        build.decreaseResourceRequirementsBy(ResourceCounter.withResource(build2, 1));
        build.freeReservedSlot(slotInfoWithUtilization.getAllocationId(), (Throwable) null, 1L);
        Assert.assertThat(Integer.valueOf(build.getFulfilledResourceRequirements().getResourceCount(build2)), CoreMatchers.is(1));
        Assert.assertThat(Integer.valueOf(build.getFulfilledResourceRequirements().getResourceCount(build3)), CoreMatchers.is(0));
    }

    @Test
    public void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() {
        DefaultDeclarativeSlotPool build = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile build2 = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
        ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
        build.increaseResourceRequirementsBy(ResourceCounter.withResource(build2, 1));
        SlotPoolTestUtils.offerSlots(build, createSlotOffersForResourceRequirements(ResourceCounter.withResource(build2, 1)));
        build.increaseResourceRequirementsBy(ResourceCounter.withResource(resourceProfile, 1));
        build.reserveFreeSlot(((SlotInfoWithUtilization) build.getFreeSlotsInformation().stream().filter(slotInfoWithUtilization -> {
            return slotInfoWithUtilization.getResourceProfile().equals(build2);
        }).findFirst().get()).getAllocationId(), resourceProfile);
        ResourceCounter fulfilledResourceRequirements = build.getFulfilledResourceRequirements();
        Assert.assertThat(Integer.valueOf(fulfilledResourceRequirements.getResourceCount(resourceProfile)), CoreMatchers.is(1));
        Assert.assertThat(Integer.valueOf(fulfilledResourceRequirements.getResourceCount(build2)), CoreMatchers.is(0));
        Assert.assertThat(build.getResourceRequirements(), CoreMatchers.hasItems(new ResourceRequirement[]{ResourceRequirement.create(build2, 2)}));
    }

    @Test
    public void testSetResourceRequirementsForInitialResourceRequirements() {
        DefaultDeclarativeSlotPool build = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceCounter withResource = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
        build.setResourceRequirements(withResource);
        Assert.assertThat(build.getResourceRequirements(), CoreMatchers.is(toResourceRequirements(withResource)));
    }

    @Test
    public void testSetResourceRequirementsOverwritesPreviousValue() {
        DefaultDeclarativeSlotPool build = new DefaultDeclarativeSlotPoolBuilder().build();
        build.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1, 1));
        ResourceCounter withResource = ResourceCounter.withResource(RESOURCE_PROFILE_2, 1);
        build.setResourceRequirements(withResource);
        Assert.assertThat(build.getResourceRequirements(), CoreMatchers.is(toResourceRequirements(withResource)));
    }

    @Nonnull
    private static ResourceCounter createResourceRequirements() {
        HashMap hashMap = new HashMap();
        hashMap.put(RESOURCE_PROFILE_1, 2);
        hashMap.put(RESOURCE_PROFILE_2, 1);
        return ResourceCounter.withResources(hashMap);
    }

    @Nonnull
    public static Collection<SlotOffer> createSlotOffersForResourceRequirements(ResourceCounter resourceCounter) {
        ArrayList arrayList = new ArrayList();
        int i = 0;
        for (Map.Entry entry : resourceCounter.getResourcesWithCount()) {
            for (int i2 = 0; i2 < ((Integer) entry.getValue()).intValue(); i2++) {
                ResourceProfile resourceProfile = (ResourceProfile) entry.getKey();
                int i3 = i;
                i++;
                arrayList.add(new SlotOffer(new AllocationID(), i3, resourceProfile == ResourceProfile.UNKNOWN ? ResourceProfile.ANY : resourceProfile));
            }
        }
        return arrayList;
    }

    @Nonnull
    private static Collection<ResourceRequirement> toResourceRequirements(ResourceCounter resourceCounter) {
        return (Collection) resourceCounter.getResourcesWithCount().stream().map(entry -> {
            return ResourceRequirement.create((ResourceProfile) entry.getKey(), ((Integer) entry.getValue()).intValue());
        }).collect(Collectors.toList());
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool(NewResourceRequirementsService newResourceRequirementsService) {
        return DefaultDeclarativeSlotPoolBuilder.builder().setNotifyNewResourceRequirements(newResourceRequirementsService).build();
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
        DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool = createDefaultDeclarativeSlotPool();
        createDefaultDeclarativeSlotPool.registerNewSlotsListener(newSlotsListener);
        return createDefaultDeclarativeSlotPool;
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool() {
        return DefaultDeclarativeSlotPoolBuilder.builder().build();
    }

    @Nonnull
    private static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(DefaultDeclarativeSlotPool defaultDeclarativeSlotPool, ResourceCounter resourceCounter, @Nullable LocalTaskManagerLocation localTaskManagerLocation) {
        return increaseRequirementsAndOfferSlotsToSlotPool(defaultDeclarativeSlotPool, resourceCounter, localTaskManagerLocation, null);
    }

    @Nonnull
    private static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(DefaultDeclarativeSlotPool defaultDeclarativeSlotPool, ResourceCounter resourceCounter, @Nullable LocalTaskManagerLocation localTaskManagerLocation, @Nullable TaskExecutorGateway taskExecutorGateway) {
        Collection<SlotOffer> createSlotOffersForResourceRequirements = createSlotOffersForResourceRequirements(resourceCounter);
        defaultDeclarativeSlotPool.increaseResourceRequirementsBy(resourceCounter);
        return defaultDeclarativeSlotPool.offerSlots(createSlotOffersForResourceRequirements, localTaskManagerLocation == null ? new LocalTaskManagerLocation() : localTaskManagerLocation, SlotPoolTestUtils.createTaskManagerGateway(taskExecutorGateway), 0L);
    }

    @Nonnull
    private static Collection<PhysicalSlot> drainNewSlotService(NewSlotsService newSlotsService) throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        while (newSlotsService.hasNextNewSlots()) {
            arrayList.addAll(newSlotsService.takeNewSlots());
        }
        return arrayList;
    }

    private static TypeSafeMatcher<PhysicalSlot> matchesSlotOffer(SlotOffer slotOffer) {
        return new PhysicalSlotSlotOfferMatcher(slotOffer);
    }
}
