package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.class */
public class ScheduleWithCoLocationHintTest extends TestLogger {
    @Test
    public void scheduleAllSharedAndCoLocated() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            Assert.assertEquals(6L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint3 = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint4 = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint5 = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint6 = new CoLocationConstraint(coLocationGroup);
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 6), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 1, 6), slotSharingGroup, coLocationConstraint2), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 2, 6), slotSharingGroup, coLocationConstraint3), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 3, 6), slotSharingGroup, coLocationConstraint4), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot5 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 0, 6), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot6 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 1, 6), slotSharingGroup, coLocationConstraint2), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot7 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 2, 6), slotSharingGroup, coLocationConstraint3), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot8 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 4, 6), slotSharingGroup, coLocationConstraint5), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot9 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 5, 6), slotSharingGroup, coLocationConstraint6), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot10 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 3, 6), slotSharingGroup, coLocationConstraint4), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot11 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 4, 6), slotSharingGroup, coLocationConstraint5), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot12 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 5, 6), slotSharingGroup, coLocationConstraint6), false, Collections.emptyList()).get();
            Assert.assertNotNull(simpleSlot);
            Assert.assertNotNull(simpleSlot2);
            Assert.assertNotNull(simpleSlot3);
            Assert.assertNotNull(simpleSlot4);
            Assert.assertNotNull(simpleSlot5);
            Assert.assertNotNull(simpleSlot6);
            Assert.assertNotNull(simpleSlot7);
            Assert.assertNotNull(simpleSlot8);
            Assert.assertNotNull(simpleSlot9);
            Assert.assertNotNull(simpleSlot10);
            Assert.assertNotNull(simpleSlot11);
            Assert.assertNotNull(simpleSlot12);
            Assert.assertEquals(2L, simpleSlot.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot2.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot3.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot4.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot5.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot6.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot7.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot8.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot9.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot10.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot11.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot12.getRoot().getNumberLeaves());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot5.getTaskManagerID());
            Assert.assertEquals(simpleSlot2.getTaskManagerID(), simpleSlot6.getTaskManagerID());
            Assert.assertEquals(simpleSlot3.getTaskManagerID(), simpleSlot7.getTaskManagerID());
            Assert.assertEquals(simpleSlot4.getTaskManagerID(), simpleSlot10.getTaskManagerID());
            Assert.assertEquals(simpleSlot8.getTaskManagerID(), simpleSlot11.getTaskManagerID());
            Assert.assertEquals(simpleSlot9.getTaskManagerID(), simpleSlot12.getTaskManagerID());
            Assert.assertEquals(coLocationConstraint.getLocation(), simpleSlot.getTaskManagerLocation());
            Assert.assertEquals(coLocationConstraint2.getLocation(), simpleSlot2.getTaskManagerLocation());
            Assert.assertEquals(coLocationConstraint3.getLocation(), simpleSlot3.getTaskManagerLocation());
            Assert.assertEquals(coLocationConstraint4.getLocation(), simpleSlot4.getTaskManagerLocation());
            Assert.assertEquals(coLocationConstraint5.getLocation(), simpleSlot8.getTaskManagerLocation());
            Assert.assertEquals(coLocationConstraint6.getLocation(), simpleSlot9.getTaskManagerLocation());
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(6L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(6L, scheduler.getNumberOfUnconstrainedAssignments());
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            simpleSlot3.releaseSlot();
            simpleSlot4.releaseSlot();
            simpleSlot7.releaseSlot();
            simpleSlot10.releaseSlot();
            simpleSlot11.releaseSlot();
            simpleSlot12.releaseSlot();
            Assert.assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
            Assert.assertNotNull((SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(new JobVertexID(), 0, 1)), false, Collections.emptyList()).get());
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            simpleSlot3.releaseSlot();
            simpleSlot5.releaseSlot();
            simpleSlot6.releaseSlot();
            simpleSlot7.releaseSlot();
            simpleSlot8.releaseSlot();
            simpleSlot9.releaseSlot();
            simpleSlot11.releaseSlot();
            simpleSlot12.releaseSlot();
            Assert.assertEquals(5L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(6L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(7L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void scheduleWithIntermediateRelease() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            JobVertexID jobVertexID4 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            scheduler.newInstanceAvailable(randomInstance);
            scheduler.newInstanceAvailable(randomInstance2);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(new CoLocationGroup());
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 1), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 0, 1), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 0, 1)), false, Collections.emptyList()).get();
            ResourceID taskManagerID = simpleSlot.getTaskManagerID();
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            simpleSlot3.releaseSlot();
            Assert.assertEquals(taskManagerID, ((SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 0, 1), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get()).getTaskManagerID());
            Assert.assertEquals(2L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(2L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void scheduleWithReleaseNoResource() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            scheduler.newInstanceAvailable(randomInstance);
            scheduler.newInstanceAvailable(randomInstance2);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(new CoLocationGroup());
            ((SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 1), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get()).releaseSlot();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 0, 1)), false, Collections.emptyList()).get();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 1, 2)), false, Collections.emptyList()).get();
            try {
                scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 0, 1), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
                Assert.fail("Scheduled even though no resource was available.");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof NoResourceAvailableException);
            }
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(3L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void scheduleMixedCoLocationSlotSharing() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            JobVertexID jobVertexID4 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint3 = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint4 = new CoLocationConstraint(coLocationGroup);
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 4), slotSharingGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 2, 4), slotSharingGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 1, 4), slotSharingGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 3, 4), slotSharingGroup), false, Collections.emptyList());
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 0, 4), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 2, 4), slotSharingGroup, coLocationConstraint2), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 1, 4), slotSharingGroup, coLocationConstraint3), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 3, 4), slotSharingGroup, coLocationConstraint4), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot5 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 1, 4), slotSharingGroup, coLocationConstraint2), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot6 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 2, 4), slotSharingGroup, coLocationConstraint3), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot7 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 3, 4), slotSharingGroup, coLocationConstraint4), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot8 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 0, 4), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 0, 4), slotSharingGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 1, 4), slotSharingGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 2, 4), slotSharingGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 3, 4), slotSharingGroup), false, Collections.emptyList());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot8.getTaskManagerID());
            Assert.assertEquals(simpleSlot2.getTaskManagerID(), simpleSlot5.getTaskManagerID());
            Assert.assertEquals(simpleSlot3.getTaskManagerID(), simpleSlot6.getTaskManagerID());
            Assert.assertEquals(simpleSlot4.getTaskManagerID(), simpleSlot7.getTaskManagerID());
            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(12L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testGetsNonLocalFromSharingGroupFirst() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation taskManagerLocation = randomInstance.getTaskManagerLocation();
            TaskManagerLocation taskManagerLocation2 = randomInstance2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 0, 2, taskManagerLocation), slotSharingGroup), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 1, 2, taskManagerLocation2), slotSharingGroup), false, Collections.singleton(taskManagerLocation2)).get();
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 0, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 1, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot5 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID3, 0, 2, taskManagerLocation2), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation2)).get();
            SimpleSlot simpleSlot6 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID3, 1, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation)).get();
            Assert.assertEquals(3L, simpleSlot.getRoot().getNumberLeaves());
            Assert.assertEquals(3L, simpleSlot2.getRoot().getNumberLeaves());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot3.getTaskManagerID());
            Assert.assertEquals(simpleSlot2.getTaskManagerID(), simpleSlot4.getTaskManagerID());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot5.getTaskManagerID());
            Assert.assertEquals(simpleSlot2.getTaskManagerID(), simpleSlot6.getTaskManagerID());
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(5L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(1L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            simpleSlot3.releaseSlot();
            simpleSlot4.releaseSlot();
            simpleSlot5.releaseSlot();
            simpleSlot6.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlotReleasedInBetween() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation taskManagerLocation = randomInstance.getTaskManagerLocation();
            TaskManagerLocation taskManagerLocation2 = randomInstance2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 0, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 1, 2, taskManagerLocation2), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation2)).get();
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 0, 2, taskManagerLocation2), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation2)).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 1, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation)).get();
            Assert.assertEquals(randomInstance.getTaskManagerID(), simpleSlot3.getTaskManagerID());
            Assert.assertEquals(randomInstance2.getTaskManagerID(), simpleSlot4.getTaskManagerID());
            simpleSlot3.releaseSlot();
            simpleSlot4.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlotReleasedInBetweenAndNoNewLocal() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation taskManagerLocation = randomInstance.getTaskManagerLocation();
            TaskManagerLocation taskManagerLocation2 = randomInstance2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 0, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 1, 2, taskManagerLocation2), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation2)).get();
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID3, 0, 2, new TaskManagerLocation[0])), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID3, 1, 2, new TaskManagerLocation[0])), false, Collections.emptyList()).get();
            try {
                try {
                    scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 0, 2, taskManagerLocation2), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation2)).get();
                    Assert.fail("should not be able to find a resource");
                } catch (Exception e) {
                    Assert.fail("wrong exception");
                }
            } catch (ExecutionException e2) {
                Assert.assertTrue(e2.getCause() instanceof NoResourceAvailableException);
            }
            simpleSlot3.releaseSlot();
            simpleSlot4.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(2L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(2L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testScheduleOutOfOrder() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation taskManagerLocation = randomInstance.getTaskManagerLocation();
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 0, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 0, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 1, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint), false, Collections.singleton(taskManagerLocation)).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 1, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint2), false, Collections.singleton(taskManagerLocation)).get();
            Assert.assertEquals(2L, simpleSlot.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot2.getRoot().getNumberLeaves());
            Assert.assertEquals(simpleSlot.getTaskManagerID(), simpleSlot3.getTaskManagerID());
            Assert.assertEquals(simpleSlot2.getTaskManagerID(), simpleSlot4.getTaskManagerID());
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(3L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(1L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            simpleSlot3.releaseSlot();
            simpleSlot4.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID));
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID2));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void nonColocationFollowsCoLocation() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(1);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation taskManagerLocation = randomInstance.getTaskManagerLocation();
            TaskManagerLocation taskManagerLocation2 = randomInstance2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            CoLocationConstraint coLocationConstraint = new CoLocationConstraint(coLocationGroup);
            CoLocationConstraint coLocationConstraint2 = new CoLocationConstraint(coLocationGroup);
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 0, 2, taskManagerLocation), slotSharingGroup, coLocationConstraint), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 1, 2, taskManagerLocation2), slotSharingGroup, coLocationConstraint2), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 0, 2, taskManagerLocation), slotSharingGroup), false, Collections.emptyList()).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 1, 2, taskManagerLocation), slotSharingGroup), false, Collections.emptyList()).get();
            Assert.assertEquals(2L, simpleSlot.getRoot().getNumberLeaves());
            Assert.assertEquals(2L, simpleSlot2.getRoot().getNumberLeaves());
            simpleSlot.releaseSlot();
            simpleSlot2.releaseSlot();
            simpleSlot3.releaseSlot();
            simpleSlot4.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID));
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID2));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
