package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.class */
public class SchedulerSlotSharingTest {
    /**
     * Schedules subtasks of a single vertex through one slot sharing group on a scheduler
     * that has two instances registered (each created via {@code getRandomInstance(2)}).
     *
     * <p>Checks that four subtasks receive four distinct slots, that a fifth request is
     * rejected with {@link NoResourceAvailableException}, that released slots can be handed
     * out again, and that the final four assignments are split evenly across both instances.
     */
    @Test
    public void scheduleSingleVertexType() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{jobVertexID});
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(2);
            scheduler.newInstanceAvailable(randomInstance);
            scheduler.newInstanceAvailable(randomInstance2);

            // Fill every slot with a subtask of the same vertex.
            SimpleSlot scheduleImmediately = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 8), slotSharingGroup));
            SimpleSlot scheduleImmediately2 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 1, 8), slotSharingGroup));
            SimpleSlot scheduleImmediately3 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 2, 8), slotSharingGroup));
            SimpleSlot scheduleImmediately4 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 3, 8), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately);
            Assert.assertNotNull(scheduleImmediately2);
            Assert.assertNotNull(scheduleImmediately3);
            Assert.assertNotNull(scheduleImmediately4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(scheduleImmediately, scheduleImmediately2, scheduleImmediately3, scheduleImmediately4));

            // A fifth subtask of the same vertex must be rejected. The specific exception is
            // caught first; any other exception type is a test failure.
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 4, 8), slotSharingGroup));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: no slot is free for this vertex
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Releasing one slot makes room for exactly one more subtask.
            scheduleImmediately3.releaseSlot();
            SimpleSlot scheduleImmediately5 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 4, 8), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately5);

            // Release the remaining original slots and schedule three more subtasks.
            scheduleImmediately.releaseSlot();
            scheduleImmediately2.releaseSlot();
            scheduleImmediately4.releaseSlot();
            SimpleSlot scheduleImmediately6 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 5, 8), slotSharingGroup));
            SimpleSlot scheduleImmediately7 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 6, 8), slotSharingGroup));
            SimpleSlot scheduleImmediately8 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 7, 8), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately6);
            Assert.assertNotNull(scheduleImmediately7);
            Assert.assertNotNull(scheduleImmediately8);

            // +1 for a slot on the first instance, -1 otherwise: a sum of zero means the
            // four live slots are split two and two between the instances.
            int instanceBalance = 0;
            for (SimpleSlot slot : new SimpleSlot[]{scheduleImmediately5, scheduleImmediately6, scheduleImmediately7, scheduleImmediately8}) {
                instanceBalance += slot.getInstance() == randomInstance ? 1 : -1;
            }
            Assert.assertEquals(0L, instanceBalance);

            scheduleImmediately5.releaseSlot();
            scheduleImmediately6.releaseSlot();
            scheduleImmediately7.releaseSlot();
            scheduleImmediately8.releaseSlot();

            // Everything is back in the pool; all eight successful assignments were unconstrained.
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(8L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Schedules subtasks of two vertices that belong to the same slot sharing group on a
     * scheduler with two instances (each created via {@code getRandomInstance(2)}).
     *
     * <p>Checks that each vertex can place four subtasks, that a fifth subtask of either
     * vertex is rejected with {@link NoResourceAvailableException}, and that releasing a
     * subtask of one vertex frees capacity for that vertex only.
     */
    @Test
    public void scheduleImmediatelyWithSharing() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{jobVertexID, jobVertexID2});
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Four subtasks of the first vertex.
            SimpleSlot scheduleImmediately = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 5), slotSharingGroup));
            SimpleSlot scheduleImmediately2 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 1, 5), slotSharingGroup));
            SimpleSlot scheduleImmediately3 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 2, 5), slotSharingGroup));
            SimpleSlot scheduleImmediately4 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 3, 5), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately);
            Assert.assertNotNull(scheduleImmediately2);
            Assert.assertNotNull(scheduleImmediately3);
            Assert.assertNotNull(scheduleImmediately4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(scheduleImmediately, scheduleImmediately2, scheduleImmediately3, scheduleImmediately4));

            // A fifth subtask of the first vertex must be rejected.
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 4, 5), slotSharingGroup));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: no slot is free for the first vertex
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Four subtasks of the second vertex still fit.
            SimpleSlot scheduleImmediately5 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 0, 5), slotSharingGroup));
            SimpleSlot scheduleImmediately6 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 1, 5), slotSharingGroup));
            SimpleSlot scheduleImmediately7 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 2, 5), slotSharingGroup));
            SimpleSlot scheduleImmediately8 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 3, 5), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately5);
            Assert.assertNotNull(scheduleImmediately6);
            Assert.assertNotNull(scheduleImmediately7);
            Assert.assertNotNull(scheduleImmediately8);

            // A fifth subtask of the second vertex must be rejected as well.
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 4, 5), slotSharingGroup));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: no slot is free for the second vertex
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Release two subtasks of the first vertex: capacity opens for that vertex only.
            scheduleImmediately.releaseSlot();
            scheduleImmediately4.releaseSlot();
            Assert.assertEquals(4L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(2L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID));
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID2));

            // The second vertex is still full, so its extra subtask is still rejected.
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 4, 5), slotSharingGroup));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: releasing first-vertex subtasks frees nothing for the second vertex
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // The first vertex can now place its fifth subtask.
            SimpleSlot scheduleImmediately9 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 4, 5), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately9);
            Assert.assertEquals(4L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID));
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID2));

            // After releasing one second-vertex subtask, its fifth subtask fits too.
            scheduleImmediately6.releaseSlot();
            SimpleSlot scheduleImmediately10 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 4, 5), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately10);

            // Release everything that is still held.
            scheduleImmediately2.releaseSlot();
            scheduleImmediately3.releaseSlot();
            scheduleImmediately9.releaseSlot();
            scheduleImmediately5.releaseSlot();
            scheduleImmediately7.releaseSlot();
            scheduleImmediately8.releaseSlot();
            scheduleImmediately10.releaseSlot();

            // The group holds no slots any more and all ten successful assignments were unconstrained.
            Assert.assertEquals(0L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(10L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Fills a two-vertex sharing group with four subtasks of the first vertex, releases
     * them all so the group holds no slots, then repeats the same cycle with the second
     * vertex. After each step the slot counts reported by the group's task assignment are
     * checked, and at the end the scheduler-level counters are verified.
     */
    @Test
    public void scheduleImmediatelyWithIntermediateTotallyEmptySharingGroup() {
        try {
            JobVertexID firstVertex = new JobVertexID();
            JobVertexID secondVertex = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{firstVertex, secondVertex});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Round one: only the first vertex occupies the group.
            SimpleSlot[] firstRound = new SimpleSlot[4];
            for (int subtask = 0; subtask < firstRound.length; subtask++) {
                firstRound[subtask] = scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, subtask, 4), group));
            }
            Assert.assertEquals(4L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(4L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Releasing every subtask leaves the group without any slots.
            for (SimpleSlot slot : firstRound) {
                slot.releaseSlot();
            }
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Round two: same cycle, this time with the second vertex.
            SimpleSlot[] secondRound = new SimpleSlot[4];
            for (int subtask = 0; subtask < secondRound.length; subtask++) {
                secondRound[subtask] = scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, subtask, 4), group));
            }
            Assert.assertEquals(4L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(4L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            for (SimpleSlot slot : secondRound) {
                slot.releaseSlot();
            }
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Scheduler-level bookkeeping after both rounds.
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(8L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Places four subtasks for each of three vertices in one sharing group, then releases
     * all subtasks of the second vertex and schedules three further subtasks of it while
     * the other two vertices still hold theirs. Finally releases everything in stages and
     * checks the scheduler's slot and assignment counters.
     */
    @Test
    public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            JobVertexID vertexC = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB, vertexC});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Four subtasks of the first vertex.
            SimpleSlot[] slotsA = new SimpleSlot[4];
            for (int subtask = 0; subtask < slotsA.length; subtask++) {
                slotsA[subtask] = scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, subtask, 4), group));
            }
            for (SimpleSlot slot : slotsA) {
                Assert.assertNotNull(slot);
            }
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(slotsA[0], slotsA[1], slotsA[2], slotsA[3]));

            // Four subtasks of the second vertex.
            SimpleSlot[] slotsB = new SimpleSlot[4];
            for (int subtask = 0; subtask < slotsB.length; subtask++) {
                slotsB[subtask] = scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, subtask, 7), group));
            }
            for (SimpleSlot slot : slotsB) {
                Assert.assertNotNull(slot);
            }
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(slotsB[0], slotsB[1], slotsB[2], slotsB[3]));

            // Four subtasks of the third vertex.
            SimpleSlot[] slotsC = new SimpleSlot[4];
            for (int subtask = 0; subtask < slotsC.length; subtask++) {
                slotsC[subtask] = scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, subtask, 4), group));
            }
            for (SimpleSlot slot : slotsC) {
                Assert.assertNotNull(slot);
            }
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(slotsC[0], slotsC[1], slotsC[2], slotsC[3]));

            // One more subtask of the first vertex has to be refused.
            try {
                scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, 4, 5), group));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Empty out the second vertex, then schedule three further subtasks of it.
            for (SimpleSlot slot : slotsB) {
                slot.releaseSlot();
            }
            SimpleSlot[] laterSlotsB = new SimpleSlot[3];
            for (int i = 0; i < laterSlotsB.length; i++) {
                laterSlotsB[i] = scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, 5 + i, 7), group));
            }
            for (SimpleSlot slot : laterSlotsB) {
                Assert.assertNotNull(slot);
            }

            // Release the first vertex and the later second-vertex subtasks; the third
            // vertex still holds its slots, so the scheduler reports none available.
            for (SimpleSlot slot : slotsA) {
                slot.releaseSlot();
            }
            for (SimpleSlot slot : laterSlotsB) {
                slot.releaseSlot();
            }
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());

            // Releasing the third vertex returns all four slots.
            for (SimpleSlot slot : slotsC) {
                slot.releaseSlot();
            }
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(15L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Uses a single instance created via {@code getRandomInstance(2)} and a sharing group
     * of two vertices. Schedules one subtask of the first vertex and two of the second,
     * releases the two second-vertex subtasks, then schedules a task of a third vertex
     * without any sharing group and checks the slot counts the group reports afterwards.
     */
    @Test
    public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup2() {
        try {
            JobVertexID sharedVertexA = new JobVertexID();
            JobVertexID sharedVertexB = new JobVertexID();
            JobVertexID standaloneVertex = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{sharedVertexA, sharedVertexB});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            ScheduledUnit unitA0 = new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertexA, 0, 2), group);
            SimpleSlot slotA0 = scheduler.scheduleImmediately(unitA0);
            ScheduledUnit unitB0 = new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertexB, 0, 2), group);
            SimpleSlot slotB0 = scheduler.scheduleImmediately(unitB0);
            ScheduledUnit unitB1 = new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertexB, 1, 2), group);
            SimpleSlot slotB1 = scheduler.scheduleImmediately(unitB1);

            Assert.assertNotNull(slotA0);
            Assert.assertNotNull(slotB0);
            Assert.assertNotNull(slotB1);

            Assert.assertEquals(2L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(sharedVertexA));
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(sharedVertexB));

            // Drop both subtasks of the second vertex.
            slotB0.releaseSlot();
            slotB1.releaseSlot();

            // A task outside the sharing group is scheduled next.
            ScheduledUnit standaloneUnit = new ScheduledUnit(SchedulerTestUtils.getTestVertex(standaloneVertex, 0, 1));
            SimpleSlot standaloneSlot = scheduler.scheduleImmediately(standaloneUnit);
            Assert.assertNotNull(standaloneSlot);

            Assert.assertEquals(1L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(sharedVertexA));
            Assert.assertEquals(1L, group.getTaskAssignment().getNumberOfAvailableSlotsForGroup(sharedVertexB));

            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(4L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Mixes tasks that use a slot sharing group with tasks that are scheduled without one,
     * on a scheduler with two instances (created via {@code getRandomInstance(3)} and
     * {@code getRandomInstance(2)}).
     *
     * <p>Checks that once all slots are taken, further requests of every kind are rejected
     * with {@link NoResourceAvailableException}, and that released slots can subsequently be
     * reused by both shared and non-shared tasks.
     */
    @Test
    public void scheduleMixedSharingAndNonSharing() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            JobVertexID jobVertexID4 = new JobVertexID();
            JobVertexID jobVertexID5 = new JobVertexID();
            // Only the first two vertices participate in slot sharing.
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{jobVertexID, jobVertexID2});
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(3));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Two tasks without a sharing group.
            SimpleSlot scheduleImmediately = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 0, 2)));
            SimpleSlot scheduleImmediately2 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID3, 1, 2)));
            Assert.assertNotNull(scheduleImmediately);
            Assert.assertNotNull(scheduleImmediately2);

            // Two subtasks of each shared vertex.
            SimpleSlot scheduleImmediately3 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 0, 4), slotSharingGroup));
            SimpleSlot scheduleImmediately4 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 1, 4), slotSharingGroup));
            SimpleSlot scheduleImmediately5 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 0, 4), slotSharingGroup));
            SimpleSlot scheduleImmediately6 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 1, 4), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately3);
            Assert.assertNotNull(scheduleImmediately4);
            Assert.assertNotNull(scheduleImmediately5);
            Assert.assertNotNull(scheduleImmediately6);

            // One more task without a sharing group.
            SimpleSlot scheduleImmediately7 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 1, 3)));
            Assert.assertNotNull(scheduleImmediately7);

            // From here on every further request must be rejected, whether shared or not.
            // The specific exception is caught first; anything else is a test failure.
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 2, 4), slotSharingGroup));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 2, 4), slotSharingGroup));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 0, 3)));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID5, 0, 1)));
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Releasing one non-shared slot lets one subtask of each shared vertex in.
            scheduleImmediately.releaseSlot();
            SimpleSlot scheduleImmediately8 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 2, 4), slotSharingGroup));
            SimpleSlot scheduleImmediately9 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 3, 4), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately8);
            Assert.assertNotNull(scheduleImmediately9);

            // Releasing three of the original shared subtasks frees exactly one slot.
            scheduleImmediately3.releaseSlot();
            scheduleImmediately4.releaseSlot();
            scheduleImmediately5.releaseSlot();
            Assert.assertEquals(1L, scheduler.getNumberOfAvailableSlots());

            // Non-shared tasks take the slots as they are freed.
            SimpleSlot scheduleImmediately10 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 0, 3)));
            Assert.assertNotNull(scheduleImmediately10);
            scheduleImmediately6.releaseSlot();
            SimpleSlot scheduleImmediately11 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID4, 2, 3)));
            Assert.assertNotNull(scheduleImmediately11);

            // Shared subtasks can be placed again after another non-shared slot is released.
            scheduleImmediately2.releaseSlot();
            SimpleSlot scheduleImmediately12 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID, 3, 4), slotSharingGroup));
            SimpleSlot scheduleImmediately13 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID2, 2, 4), slotSharingGroup));
            Assert.assertNotNull(scheduleImmediately12);
            Assert.assertNotNull(scheduleImmediately13);

            // With all shared subtasks released, two more non-shared tasks fit.
            scheduleImmediately8.releaseSlot();
            scheduleImmediately9.releaseSlot();
            scheduleImmediately12.releaseSlot();
            scheduleImmediately13.releaseSlot();
            SimpleSlot scheduleImmediately14 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID5, 1, 2)));
            SimpleSlot scheduleImmediately15 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jobVertexID5, 0, 2)));
            Assert.assertNotNull(scheduleImmediately14);
            Assert.assertNotNull(scheduleImmediately15);

            // Release whatever is still held and verify the final bookkeeping.
            scheduleImmediately10.releaseSlot();
            scheduleImmediately7.releaseSlot();
            scheduleImmediately11.releaseSlot();
            scheduleImmediately14.releaseSlot();
            scheduleImmediately15.releaseSlot();
            Assert.assertEquals(5L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(15L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Schedules two subtasks for each of two vertices in one sharing group, building every
     * test vertex with {@code getTestVertexWithLocation} so that subtask 0 is tied to the
     * first instance and subtask 1 to the second (presumably as a preferred location;
     * confirm against {@code SchedulerTestUtils}). Expects the group to hold two slots, one
     * slot to stay free on each instance, and all four assignments to count as localized.
     */
    @Test
    public void testLocalizedAssignment1() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB});

            Instance firstInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance secondInstance = SchedulerTestUtils.getRandomInstance(2);

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(firstInstance);
            scheduler.newInstanceAvailable(secondInstance);

            // First vertex: one subtask per instance.
            ScheduledUnit unitA0 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexA, 0, 2, firstInstance), group);
            SimpleSlot slotA0 = scheduler.scheduleImmediately(unitA0);
            ScheduledUnit unitA1 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexA, 1, 2, secondInstance), group);
            SimpleSlot slotA1 = scheduler.scheduleImmediately(unitA1);

            Assert.assertNotNull(slotA0);
            Assert.assertNotNull(slotA1);
            Assert.assertEquals(2L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, firstInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(1L, secondInstance.getNumberOfAvailableSlots());

            // Second vertex: same placement; the slot counts are expected to stay unchanged.
            ScheduledUnit unitB0 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexB, 0, 2, firstInstance), group);
            SimpleSlot slotB0 = scheduler.scheduleImmediately(unitB0);
            ScheduledUnit unitB1 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexB, 1, 2, secondInstance), group);
            SimpleSlot slotB1 = scheduler.scheduleImmediately(unitB1);

            Assert.assertNotNull(slotB0);
            Assert.assertNotNull(slotB1);
            Assert.assertEquals(2L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, firstInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(1L, secondInstance.getNumberOfAvailableSlots());

            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Locality test in which each vertex prefers a single instance for both of its subtasks.
     *
     * <p>Both subtasks of the first vertex ask for the first instance, both subtasks of the second
     * vertex ask for the second instance. The assertions expect four shared slots in total, no
     * free slot on either instance afterwards, and four localized assignments.
     */
    @Test
    public void testLocalizedAssignment2() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            SlotSharingGroup sharingGroup =
                    new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB});

            Instance firstInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance secondInstance = SchedulerTestUtils.getRandomInstance(2);

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(firstInstance);
            scheduler.newInstanceAvailable(secondInstance);

            // First vertex: both subtasks want the first instance.
            SimpleSlot slotA0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexA, 0, 2, firstInstance),
                            sharingGroup));
            SimpleSlot slotA1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexA, 1, 2, firstInstance),
                            sharingGroup));

            Assert.assertNotNull(slotA0);
            Assert.assertNotNull(slotA1);
            Assert.assertEquals(2L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, firstInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(2L, secondInstance.getNumberOfAvailableSlots());

            // Second vertex: both subtasks want the second instance.
            SimpleSlot slotB0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexB, 0, 2, secondInstance),
                            sharingGroup));
            SimpleSlot slotB1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexB, 1, 2, secondInstance),
                            sharingGroup));

            Assert.assertNotNull(slotB0);
            Assert.assertNotNull(slotB1);
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, firstInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, secondInstance.getNumberOfAvailableSlots());

            // Every request could be served on its preferred instance.
            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Locality test in which every subtask prefers the same instance, although that instance
     * cannot host all of them.
     *
     * <p>Two subtasks of the first vertex and four subtasks of the second vertex all ask for the
     * first instance. The assertions expect the first four requests to be placed on the first
     * instance (localized) and the last two to be placed on the second instance (non-localized),
     * leaving no free slot on either instance.
     */
    @Test
    public void testLocalizedAssignment3() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            SlotSharingGroup sharingGroup =
                    new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB});

            Instance preferredInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance otherInstance = SchedulerTestUtils.getRandomInstance(2);

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(preferredInstance);
            scheduler.newInstanceAvailable(otherInstance);

            // Schedule until the preferred instance is expected to be full.
            SimpleSlot slotA0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexA, 0, 2, preferredInstance),
                            sharingGroup));
            SimpleSlot slotA1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexA, 1, 2, preferredInstance),
                            sharingGroup));
            SimpleSlot slotB0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexB, 0, 4, preferredInstance),
                            sharingGroup));
            SimpleSlot slotB1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexB, 1, 4, preferredInstance),
                            sharingGroup));

            // Two more subtasks with the same preference; they are expected to spill over to the
            // other instance. Subtask indices are zero-based, so a vertex declared with four
            // subtasks uses the indices 0..3 (here: 2 and 3).
            SimpleSlot slotB2 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexB, 2, 4, preferredInstance),
                            sharingGroup));
            SimpleSlot slotB3 = scheduler.scheduleImmediately(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(vertexB, 3, 4, preferredInstance),
                            sharingGroup));

            Assert.assertNotNull(slotA0);
            Assert.assertNotNull(slotA1);
            Assert.assertNotNull(slotB0);
            Assert.assertNotNull(slotB1);
            Assert.assertNotNull(slotB2);
            Assert.assertNotNull(slotB3);

            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, preferredInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, otherInstance.getNumberOfAvailableSlots());

            // The first four requests got their preferred location, the last two did not.
            Assert.assertEquals(preferredInstance, slotA0.getInstance());
            Assert.assertEquals(preferredInstance, slotA1.getInstance());
            Assert.assertEquals(preferredInstance, slotB0.getInstance());
            Assert.assertEquals(preferredInstance, slotB1.getInstance());
            Assert.assertEquals(otherInstance, slotB2.getInstance());
            Assert.assertEquals(otherInstance, slotB3.getInstance());

            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(2L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Allocates and releases shared slots for four vertices from a single thread.
     *
     * <p>The test contains no value assertions; it passes when none of the scheduling or release
     * calls throws an exception.
     */
    @Test
    public void testSequentialAllocateAndRelease() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            JobVertexID vertexC = new JobVertexID();
            JobVertexID vertexD = new JobVertexID();
            SlotSharingGroup sharingGroup =
                    new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB, vertexC, vertexD});

            Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(4));

            // Vertices A and B (four subtasks each) plus the single subtask of vertex C.
            SimpleSlot slotA0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, 0, 4), sharingGroup));
            SimpleSlot slotA1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, 1, 4), sharingGroup));
            SimpleSlot slotB0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, 0, 4), sharingGroup));
            SimpleSlot slotB1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, 1, 4), sharingGroup));
            SimpleSlot slotC0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, 0, 1), sharingGroup));
            SimpleSlot slotA2 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, 2, 4), sharingGroup));
            SimpleSlot slotA3 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, 3, 4), sharingGroup));
            SimpleSlot slotB2 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, 2, 4), sharingGroup));
            SimpleSlot slotB3 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, 3, 4), sharingGroup));

            // Release all of A, then all of B; the slot of vertex C stays allocated for now.
            SimpleSlot[] slotsOfAAndB = {
                slotA0, slotA1, slotA2, slotA3,
                slotB0, slotB1, slotB2, slotB3
            };
            for (SimpleSlot slot : slotsOfAAndB) {
                slot.releaseSlot();
            }

            // Vertex D is scheduled only after A and B have given their slots back.
            SimpleSlot[] slotsOfD = new SimpleSlot[4];
            for (int subtask = 0; subtask < slotsOfD.length; subtask++) {
                slotsOfD[subtask] = scheduler.scheduleImmediately(
                        new ScheduledUnit(
                                SchedulerTestUtils.getTestVertex(vertexD, subtask, 4), sharingGroup));
            }

            // Finally release vertex C followed by the subtasks of vertex D.
            slotC0.releaseSlot();
            for (SimpleSlot slot : slotsOfD) {
                slot.releaseSlot();
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Stress test that allocates and releases shared slots from several pool threads at once.
     *
     * <p>The scenario is repeated 50 times. In each run a fresh scheduler receives one instance and
     * a chain of tasks is started on the thread pool:
     * <ul>
     *   <li>four starter tasks sleep for a random time and then submit a task for vertex 1,</li>
     *   <li>each vertex-1 task schedules a subtask, submits a vertex-2 task and releases its slot,</li>
     *   <li>each vertex-2 task schedules a subtask, submits the vertex-3 task and releases its slot,</li>
     *   <li>the vertex-3 task is guarded by a flag so that only its first invocation does any work;
     *       it schedules the single subtask of vertex 3 and submits four vertex-4 tasks,</li>
     *   <li>each vertex-4 task schedules a subtask and releases it again.</li>
     * </ul>
     * This gives 4 + 4 + 1 + 4 = 13 scheduling tasks per run. The test thread waits until all of
     * them have finished (or one has failed) and then checks the scheduler's slot and assignment
     * counters.
     *
     * <p>The thread pool is shut down in a {@code finally} block so that its worker threads do not
     * outlive the test, regardless of whether the test passes or fails.
     */
    @Test
    public void testConcurrentAllocateAndRelease() {
        final ExecutorService executor = Executors.newFixedThreadPool(20);
        try {
            for (int run = 0; run < 50; run++) {
                final JobVertexID jid1 = new JobVertexID();
                final JobVertexID jid2 = new JobVertexID();
                final JobVertexID jid3 = new JobVertexID();
                final JobVertexID jid4 = new JobVertexID();
                final SlotSharingGroup sharingGroup =
                        new SlotSharingGroup(new JobVertexID[]{jid1, jid2, jid3, jid4});

                final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(4));

                // Number of scheduling tasks that must complete in every run (4 + 4 + 1 + 4).
                final int expectedTasks = 13;

                // Hand out the subtask indices for vertices 1, 2 and 4.
                final AtomicInteger enumerator1 = new AtomicInteger();
                final AtomicInteger enumerator2 = new AtomicInteger();
                final AtomicInteger enumerator4 = new AtomicInteger();
                // Ensures that vertex 3 (a single subtask) is deployed by only one task.
                final AtomicBoolean vertex3Deployed = new AtomicBoolean();
                final Random rnd = new Random();
                // Set by any pool task that hits an error.
                final AtomicBoolean failed = new AtomicBoolean(false);
                // Counts finished tasks; also serves as the monitor the test thread waits on.
                final AtomicInteger completed = new AtomicInteger();

                // Deploys one subtask of vertex 4 and releases it after a short random pause.
                final Runnable deploy4 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleSlot slot = scheduler.scheduleImmediately(
                                    new ScheduledUnit(
                                            SchedulerTestUtils.getTestVertex(jid4, enumerator4.getAndIncrement(), 4),
                                            sharingGroup));
                            CommonTestUtils.sleepUninterruptibly(rnd.nextInt(5));
                            slot.releaseSlot();
                            if (completed.incrementAndGet() == expectedTasks) {
                                synchronized (completed) {
                                    completed.notifyAll();
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                // Deploys the single subtask of vertex 3 (first caller only) and starts vertex 4.
                final Runnable deploy3 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            if (vertex3Deployed.compareAndSet(false, true)) {
                                SimpleSlot slot = scheduler.scheduleImmediately(
                                        new ScheduledUnit(
                                                SchedulerTestUtils.getTestVertex(jid3, 0, 1),
                                                sharingGroup));
                                CommonTestUtils.sleepUninterruptibly(5L);
                                executor.execute(deploy4);
                                executor.execute(deploy4);
                                executor.execute(deploy4);
                                executor.execute(deploy4);
                                slot.releaseSlot();
                                if (completed.incrementAndGet() == expectedTasks) {
                                    synchronized (completed) {
                                        completed.notifyAll();
                                    }
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                // Deploys one subtask of vertex 2 and triggers the vertex-3 task.
                final Runnable deploy2 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleSlot slot = scheduler.scheduleImmediately(
                                    new ScheduledUnit(
                                            SchedulerTestUtils.getTestVertex(jid2, enumerator2.getAndIncrement(), 4),
                                            sharingGroup));
                            CommonTestUtils.sleepUninterruptibly(rnd.nextInt(5));
                            executor.execute(deploy3);
                            CommonTestUtils.sleepUninterruptibly(rnd.nextInt(5));
                            slot.releaseSlot();
                            if (completed.incrementAndGet() == expectedTasks) {
                                synchronized (completed) {
                                    completed.notifyAll();
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                // Deploys one subtask of vertex 1 and triggers a vertex-2 task.
                final Runnable deploy1 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleSlot slot = scheduler.scheduleImmediately(
                                    new ScheduledUnit(
                                            SchedulerTestUtils.getTestVertex(jid1, enumerator1.getAndIncrement(), 4),
                                            sharingGroup));
                            CommonTestUtils.sleepUninterruptibly(rnd.nextInt(5));
                            executor.execute(deploy2);
                            CommonTestUtils.sleepUninterruptibly(rnd.nextInt(5));
                            slot.releaseSlot();
                            if (completed.incrementAndGet() == expectedTasks) {
                                synchronized (completed) {
                                    completed.notifyAll();
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                // Starter: staggers the vertex-1 deployments by a random delay.
                Runnable deploy0 = new Runnable() {
                    @Override
                    public void run() {
                        CommonTestUtils.sleepUninterruptibly(rnd.nextInt(10));
                        executor.execute(deploy1);
                    }
                };

                executor.execute(deploy0);
                executor.execute(deploy0);
                executor.execute(deploy0);
                executor.execute(deploy0);

                // Wait until every task has finished or one of them reported a failure. The
                // timed wait re-checks the failure flag, which is set without a notification.
                synchronized (completed) {
                    while (!failed.get() && completed.get() < expectedTasks) {
                        completed.wait(1000L);
                    }
                }
                Assert.assertFalse("Thread failed", failed.get());

                // Poll until the scheduler reports all four slots as available again.
                while (scheduler.getNumberOfAvailableSlots() < 4) {
                    CommonTestUtils.sleepUninterruptibly(5L);
                }

                Assert.assertEquals(1L, scheduler.getNumberOfAvailableInstances());
                Assert.assertEquals(1L, scheduler.getNumberOfInstancesWithAvailableSlots());
                Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
                Assert.assertEquals(expectedTasks, scheduler.getNumberOfUnconstrainedAssignments());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            // Always stop the pool; otherwise its worker threads leak past the end of the test.
            executor.shutdownNow();
        }
    }

    /**
     * Schedules vertices whose declared number of subtasks grows from 1 to 5 within one slot
     * sharing group while only four slots are available.
     *
     * <p>The assertions expect the two single-subtask vertices to share one slot, the fifth
     * subtask of the third vertex to be rejected with a {@link NoResourceAvailableException}, and
     * all four slots to be available again once every allocated slot has been released.
     */
    @Test
    public void testDopIncreases() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            JobVertexID vertexC = new JobVertexID();
            JobVertexID vertexD = new JobVertexID();
            SlotSharingGroup sharingGroup =
                    new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB, vertexC, vertexD});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(4));

            // Two vertices with a single subtask each are expected to end up in the same slot.
            SimpleSlot slotA = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexA, 0, 1), sharingGroup));
            SimpleSlot slotB = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexB, 0, 1), sharingGroup));

            Assert.assertTrue(slotA.getParent() == slotB.getParent());
            Assert.assertEquals(3L, scheduler.getNumberOfAvailableSlots());

            // First half of vertex C (five subtasks) and vertex D (four subtasks).
            SimpleSlot slotC0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, 0, 5), sharingGroup));
            SimpleSlot slotC1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, 1, 5), sharingGroup));
            SimpleSlot slotD0 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexD, 0, 4), sharingGroup));
            SimpleSlot slotD1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexD, 1, 4), sharingGroup));

            slotA.releaseSlot();
            slotB.releaseSlot();

            // Remaining subtasks that are still expected to fit.
            SimpleSlot slotC2 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, 2, 5), sharingGroup));
            SimpleSlot slotC3 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, 3, 5), sharingGroup));
            SimpleSlot slotD2 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexD, 2, 4), sharingGroup));
            SimpleSlot slotD3 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexD, 3, 4), sharingGroup));

            // The fifth subtask of vertex C must not find a slot.
            try {
                scheduler.scheduleImmediately(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexC, 4, 5), sharingGroup));
                Assert.fail("should throw an exception");
            } catch (NoResourceAvailableException expected) {
                // this is the outcome the test is looking for
            }

            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());

            // Release all subtasks of vertex C first, then all subtasks of vertex D.
            SimpleSlot[] allocated = {
                slotC0, slotC1, slotC2, slotC3,
                slotD0, slotD1, slotD2, slotD3
            };
            for (SimpleSlot slot : allocated) {
                slot.releaseSlot();
            }

            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }
}
