package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.class */
/**
 * Tests for capacity-scheduler preemption that is driven by reserved containers.
 *
 * <p>Every test starts a {@link MockRM}, registers mock nodes, fills them with containers of a
 * first application, submits a second application whose request cannot be placed, and then
 * runs the scheduling edit policy to check how many containers are preempted and from where.
 */
public class TestCapacitySchedulerSurgicalPreemption extends CapacitySchedulerPreemptionTestBase {

    /**
     * Runs the base-class setup and then switches on candidate selection based on reserved
     * containers for every test in this class.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.conf.setBoolean("yarn.resourcemanager.monitor.capacity.preemption.select_based_on_reserved_containers", true);
    }

    /**
     * Fills two 20480 MB nodes with 1024 MB containers of an application in queue "a", lets an
     * application in queue "c" reserve a 6144 MB container on the first node, and verifies that
     * exactly four containers are preempted, all of them from the node holding the reservation.
     */
    @Test(timeout = 60000)
    public void testSimpleSurgicalPreemption() throws Exception {
        MockRM rm = new MockRM(this.conf);
        try {
            rm.getRMContext().setNodeLabelManager(this.mgr);
            rm.start();

            MockNM nm1 = rm.registerNode("h1:1234", 20480);
            MockNM nm2 = rm.registerNode("h2:1234", 20480);
            // Explicit cast: harmless if getResourceScheduler() is already declared to return
            // CapacityScheduler, required if it returns a more general scheduler type.
            CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            RMNode rmNode2 = (RMNode) rm.getRMContext().getRMNodes().get(nm2.getNodeId());

            // First application goes to queue "a"; its AM is launched through nm1.
            MockAM am1 = MockRM.launchAndRegisterAM(
                    rm.submitApp(1024, "app", "user", (Map<ApplicationAccessType, String>) null, "a"), rm, nm1);
            requestContainers(am1, 1024, 32);
            for (int round = 0; round < 32; round++) {
                updateNodes(cs, rmNode1, rmNode2);
            }

            // 32 requested containers plus the one the application already held.
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(33, schedulerApp1.getLiveContainers().size());
            waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 17);
            waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 16);

            // Second application goes to queue "c"; its AM is also launched through nm1.
            MockAM am2 = MockRM.launchAndRegisterAM(
                    rm.submitApp(1024, "app", "user", (Map<ApplicationAccessType, String>) null, "c"), rm, nm1);

            // Node 1: 20480 - 17 * 1024 - 1024 (second AM) = 2048; node 2: 20480 - 16 * 1024 = 4096.
            Assert.assertEquals(2048L, cs.getNode(nm1.getNodeId()).getAvailableResource().getMemorySize());
            Assert.assertEquals(4096L, cs.getNode(nm2.getNodeId()).getAvailableResource().getMemorySize());

            // A 6144 MB request fits on neither node, so updating node 1 must leave a reservation there.
            am2.allocate(Arrays.asList(
                    ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(6144), 1)), null);
            updateNodes(cs, rmNode1);
            Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());

            // The policy is run twice before any container is expected to be gone.
            SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm);
            editPolicy.editSchedule();
            editPolicy.editSchedule();

            // Four containers (33 -> 29) are taken away, all from node 1 (17 -> 13); node 2 keeps its 16.
            waitNumberOfLiveContainersFromApp(schedulerApp1, 29);
            waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 13);
            waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 16);
            Assert.assertEquals("Number of preempted containers incorrectly recorded:", 4L,
                    cs.getQueue("root").getMetrics().getAggregatePreemptedContainers());
        } finally {
            // Close the RM even when an assertion above fails, so it does not outlive the test.
            rm.close();
        }
    }

    /**
     * Fills two 20480 MB nodes with 39 containers of an application in queue "a", submits an
     * application with a 4096 MB AM to queue "c", and verifies that three containers are selected
     * for preemption and that the second application's reservation ends up being released.
     */
    @Test(timeout = 60000)
    public void testSurgicalPreemptionWithAvailableResource() throws Exception {
        MockRM rm = new MockRM(this.conf);
        try {
            rm.getRMContext().setNodeLabelManager(this.mgr);
            rm.start();

            MockNM nm1 = rm.registerNode("h1:1234", 20480);
            MockNM nm2 = rm.registerNode("h2:1234", 20480);
            // Explicit cast: see testSimpleSurgicalPreemption.
            CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            RMNode rmNode2 = (RMNode) rm.getRMContext().getRMNodes().get(nm2.getNodeId());

            // First application goes to queue "a"; its AM is launched through nm1.
            MockAM am1 = MockRM.launchAndRegisterAM(
                    rm.submitApp(1024, "app", "user", (Map<ApplicationAccessType, String>) null, "a"), rm, nm1);
            requestContainers(am1, 1024, 38);
            for (int round = 0; round < 38; round++) {
                updateNodes(cs, rmNode1, rmNode2);
            }

            // 38 requested containers plus the one the application already held.
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
            waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 20);
            waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 19);

            // Second application asks for a 4096 MB AM in queue "c"; look up its first attempt.
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(
                    rm.submitApp(4096, "app", "user", (Map<ApplicationAccessType, String>) null, "c").getApplicationId(), 1));

            // Explicit cast: getSchedulingEditPolicy() is assigned to plain SchedulingEditPolicy in the
            // other tests, and getToPreemptContainers() is only called on the concrete policy type.
            ProportionalCapacityPreemptionPolicy editPolicy =
                    (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm);
            editPolicy.editSchedule();
            Assert.assertEquals(3, editPolicy.getToPreemptContainers().size());
            editPolicy.editSchedule();
            waitNumberOfLiveContainersFromApp(schedulerApp1, 36);

            // After the preemption, a node update is expected to produce one reservation for app 2.
            updateNodes(cs, rmNode1, rmNode2);
            waitNumberOfReservedContainersFromApp(schedulerApp2, 1);

            editPolicy.editSchedule();
            editPolicy.editSchedule();

            // Keep updating both nodes (at most 10 times) until app 2 owns one live container.
            int tick = 0;
            while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
                updateNodes(cs, rmNode1, rmNode2);
                tick++;
                Thread.sleep(100L);
            }
            waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
        } finally {
            // Close the RM even when an assertion above fails, so it does not outlive the test.
            rm.close();
        }
    }

    /**
     * Uses two 50% queues on five 30720 MB nodes. An application in queue "a" takes a 21504 MB
     * container on four nodes, so an application in queue "b" asking for the same size can only
     * place part of its request and reserves. Verifies the second application ends with three
     * live containers after the edit policy has run.
     */
    @Test(timeout = 60000)
    public void testPreemptionForFragmentatedCluster() throws Exception {
        this.conf.setBoolean("yarn.resourcemanager.monitor.capacity.preemption.additional_res_balance_based_on_reserved_containers", true);
        CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(this.conf);
        csConf.setLong("yarn.scheduler.maximum-allocation-mb", 21504L);
        csConf.setQueues("root", new String[]{"a", "b"});
        csConf.setCapacity("root.a", 50.0f);
        csConf.setUserLimitFactor("root.a", 100.0f);
        csConf.setCapacity("root.b", 50.0f);
        csConf.setUserLimitFactor("root.b", 100.0f);

        MockRM rm = new MockRM(csConf);
        try {
            rm.getRMContext().setNodeLabelManager(this.mgr);
            rm.start();

            List<MockNM> nms = new ArrayList<MockNM>();
            for (int i = 0; i < 5; i++) {
                nms.add(rm.registerNode("h" + i + ":1234", 30720));
            }
            // Explicit cast: see testSimpleSurgicalPreemption.
            CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

            // First application: 3072 MB AM in queue "a" via node 0, then four 21504 MB containers.
            MockAM am1 = MockRM.launchAndRegisterAM(
                    rm.submitApp(3072, "app", "user", (Map<ApplicationAccessType, String>) null, "a"), rm, nms.get(0));
            requestContainers(am1, 21504, 4);
            updateNodesRoundRobin(cs, nms, 10);

            // Four requested containers plus the one the application already held.
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());

            // Second application: 3072 MB AM in queue "b" via node 2, same four-container request.
            MockAM am2 = MockRM.launchAndRegisterAM(
                    rm.submitApp(3072, "app", "user", (Map<ApplicationAccessType, String>) null, "b"), rm, nms.get(2));
            requestContainers(am2, 21504, 4);
            updateNodesRoundRobin(cs, nms, 10);

            // Only part of the request can be placed; one container is left reserved.
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
            Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
            waitNumberOfReservedContainersFromApp(schedulerApp2, 1);

            SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm);
            editPolicy.editSchedule();
            editPolicy.editSchedule();

            // NOTE(review): the loop waits for 4 live containers while the assertion below expects 3,
            // so on a passing run all 10 rounds are always executed. Confirm whether this is meant
            // as a settle period that proves the count does not grow beyond 3.
            int tick = 0;
            while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
                updateNodesRoundRobin(cs, nms, 10);
                tick++;
                Thread.sleep(100L);
            }
            Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
        } finally {
            // Close the RM even when an assertion above fails, so it does not outlive the test.
            rm.close();
        }
    }

    /**
     * Asks the scheduler, through {@code am}, for {@code count} containers of {@code memoryMb}
     * on any host ("*") with an empty list as the last argument.
     *
     * <p>The list is created raw because the element type expected by {@code MockAM.allocate}
     * is not imported in this file; the list is always empty, so no wrongly typed element can
     * reach the callee.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void requestContainers(MockAM am, int memoryMb, int count) throws Exception {
        am.allocate("*", memoryMb, count, new ArrayList());
    }

    /** Delivers one node-update event to the scheduler for each given node, in argument order. */
    private static void updateNodes(CapacityScheduler cs, RMNode... nodes) throws Exception {
        for (RMNode node : nodes) {
            cs.handle(new NodeUpdateSchedulerEvent(node));
        }
    }

    /**
     * Delivers {@code updates} node-update events to the scheduler, cycling through {@code nms}
     * in list order and resolving each mock node to its {@link RMNode} via the scheduler's RM context.
     */
    private static void updateNodesRoundRobin(CapacityScheduler cs, List<MockNM> nms, int updates) throws Exception {
        for (int i = 0; i < updates; i++) {
            MockNM nm = nms.get(i % nms.size());
            RMNode rmNode = (RMNode) cs.getRMContext().getRMNodes().get(nm.getNodeId());
            cs.handle(new NodeUpdateSchedulerEvent(rmNode));
        }
    }
}
