package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerWithMockPreemption;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.class */
public class TestFSAppStarvation extends FairSchedulerTestBase {
    private static final int NODE_CAPACITY_MULTIPLE = 4;
    private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;
    private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
    private static final String[] QUEUES = {"no-preemption", "minshare", "fairshare.child", "drf.child"};

    @Before
    public void setup() {
        createConfiguration();
        this.conf.set("yarn.resourcemanager.scheduler.class", FairSchedulerWithMockPreemption.class.getCanonicalName());
        this.conf.set("yarn.scheduler.fair.allocation.file", ALLOC_FILE.getAbsolutePath());
        this.conf.setBoolean("yarn.scheduler.fair.preemption", true);
        this.conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", 0.0f);
    }

    @After
    public void teardown() {
        ALLOC_FILE.delete();
        this.conf = null;
        if (this.resourceManager != null) {
            this.resourceManager.stop();
            this.resourceManager = null;
        }
    }

    @Test
    public void testPreemptionDisabled() throws Exception {
        this.conf.setBoolean("yarn.scheduler.fair.preemption", false);
        setupClusterAndSubmitJobs();
        Assert.assertNull("Found starved apps even when preemption is turned off", this.scheduler.getContext().getStarvedApps());
    }

    @Test
    public void testPreemptionEnabled() throws Exception {
        setupClusterAndSubmitJobs();
        Assert.assertNotNull("FSContext does not have an FSStarvedApps instance", this.scheduler.getContext().getStarvedApps());
        Assert.assertEquals("Expecting 3 starved applications, one each for the minshare and fairshare queues", 3L, this.preemptionThread.uniqueAppsAdded());
        this.scheduler.update();
        Thread.yield();
        verifyLeafQueueStarvation();
        Assert.assertTrue("Each app is marked as starved exactly once", this.preemptionThread.totalAppsAdded() > this.preemptionThread.uniqueAppsAdded());
    }

    @Test
    public void testClusterUtilizationThreshold() throws Exception {
        this.conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", 1.1f);
        setupClusterAndSubmitJobs();
        Assert.assertNotNull("FSContext does not have an FSStarvedApps instance", this.scheduler.getContext().getStarvedApps());
        Assert.assertEquals("Found starved apps when preemption threshold is over 100%", 0L, this.preemptionThread.totalAppsAdded());
    }

    private void verifyLeafQueueStarvation() {
        for (String str : QUEUES) {
            if (!str.equals("no-preemption")) {
                Assert.assertTrue(this.scheduler.getQueueManager().getLeafQueue(str, false).isStarved());
            }
        }
    }

    private void setupClusterAndSubmitJobs() throws Exception {
        setupStarvedCluster();
        submitAppsToEachLeafQueue();
        sendEnoughNodeUpdatesToAssignFully();
        Thread.sleep(10L);
        this.scheduler.update();
        Thread.yield();
    }

    private void setupStarvedCluster() throws IOException {
        PrintWriter printWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
        printWriter.println("<?xml version=\"1.0\"?>");
        printWriter.println("<allocations>");
        printWriter.println("<queue name=\"default\">");
        printWriter.println("</queue>");
        printWriter.println("<queue name=\"no-preemption\">");
        printWriter.println("<fairSharePreemptionThreshold>0</fairSharePreemptionThreshold>");
        printWriter.println("</queue>");
        printWriter.println("<queue name=\"minshare\">");
        printWriter.println("<fairSharePreemptionThreshold>0</fairSharePreemptionThreshold>");
        printWriter.println("<minSharePreemptionTimeout>0</minSharePreemptionTimeout>");
        printWriter.println("<minResources>2048mb,2vcores</minResources>");
        printWriter.println("</queue>");
        printWriter.println("<queue name=\"fairshare\">");
        printWriter.println("<fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>");
        printWriter.println("<fairSharePreemptionTimeout>0</fairSharePreemptionTimeout>");
        printWriter.println("<schedulingPolicy>fair</schedulingPolicy>");
        addChildQueue(printWriter);
        printWriter.println("</queue>");
        printWriter.println("<queue name=\"drf\">");
        printWriter.println("<fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>");
        printWriter.println("<fairSharePreemptionTimeout>0</fairSharePreemptionTimeout>");
        printWriter.println("<schedulingPolicy>drf</schedulingPolicy>");
        addChildQueue(printWriter);
        printWriter.println("</queue>");
        printWriter.println("</allocations>");
        printWriter.close();
        Assert.assertTrue("Allocation file does not exist, not running the test", ALLOC_FILE.exists());
        this.resourceManager = new MockRM(this.conf);
        this.resourceManager.start();
        this.scheduler = this.resourceManager.getResourceScheduler();
        this.preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread) this.scheduler.preemptionThread;
        addNode(4096, 4);
        addNode(4096, 4);
        ApplicationAttemptId createSchedulingRequest = createSchedulingRequest(1024, 1, "root.default", "default", 8);
        this.scheduler.update();
        sendEnoughNodeUpdatesToAssignFully();
        Assert.assertEquals(8L, this.scheduler.getSchedulerApp(createSchedulingRequest).getLiveContainers().size());
    }

    private void addChildQueue(PrintWriter printWriter) {
        printWriter.println("<queue name=\"child\">");
        printWriter.println("<fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>");
        printWriter.println("<fairSharePreemptionTimeout>0</fairSharePreemptionTimeout>");
        printWriter.println("</queue>");
    }

    private void submitAppsToEachLeafQueue() {
        for (String str : QUEUES) {
            createSchedulingRequest(1024, 1, "root." + str, "user", 1);
        }
        this.scheduler.update();
    }

    private void sendEnoughNodeUpdatesToAssignFully() {
        Iterator<RMNode> it = this.rmNodes.iterator();
        while (it.hasNext()) {
            NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = new NodeUpdateSchedulerEvent(it.next());
            for (int i = 0; i < 4; i++) {
                this.scheduler.handle(nodeUpdateSchedulerEvent);
            }
        }
    }
}
