package org.apache.flink.yarn;

import java.io.File;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.yarn.YarnTestBase;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YARNSessionFIFOITCase.class */
public class YARNSessionFIFOITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

    @BeforeClass
    public static void setup() {
        YARN_CONFIGURATION.setClass("yarn.resourcemanager.scheduler.class", FifoScheduler.class, ResourceScheduler.class);
        YARN_CONFIGURATION.setInt("yarn.nodemanager.resource.memory-mb", 768);
        YARN_CONFIGURATION.setInt("yarn.scheduler.minimum-allocation-mb", 512);
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", "flink-yarn-tests-fifo");
        startYARNWithConfig(YARN_CONFIGURATION);
    }

    @After
    public void checkForProhibitedLogContents() {
        ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeout = 60000)
    public void testDetachedMode() throws InterruptedException {
        LOG.info("Starting testDetachedMode()");
        UtilsTest.addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
        startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", "-jm", "768", "-tm", "1024", "--name", "MyCustomName", "--detached"}, "Flink JobManager is now running on", YarnTestBase.RunTypes.YARN_SESSION).join();
        UtilsTest.checkForLogString("The Flink YARN client has been started in detached mode");
        LOG.info("Waiting until two containers are running");
        while (getRunningContainers() < 2) {
            sleep(500);
        }
        sleep(2000);
        LOG.info("Two containers are running. Killing the application");
        try {
            try {
                YarnClient createYarnClient = YarnClient.createYarnClient();
                createYarnClient.init(YARN_CONFIGURATION);
                createYarnClient.start();
                List applications = createYarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
                Assert.assertEquals(1L, applications.size());
                ApplicationReport applicationReport = (ApplicationReport) applications.get(0);
                Assert.assertEquals("MyCustomName", applicationReport.getName());
                createYarnClient.killApplication(applicationReport.getApplicationId());
                while (createYarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
                    sleep(500);
                }
                File file = new File(System.getenv("FLINK_CONF_DIR"));
                LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file.getAbsolutePath());
                LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                GlobalConfiguration.loadConfiguration(file.getAbsolutePath());
                try {
                    File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
                    if (yarnPropertiesLocation.exists()) {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation.getAbsolutePath());
                        yarnPropertiesLocation.delete();
                    }
                } catch (Exception e) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
                }
            } catch (Throwable th) {
                LOG.warn("Killing failed", th);
                Assert.fail();
                File file2 = new File(System.getenv("FLINK_CONF_DIR"));
                LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file2.getAbsolutePath());
                LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                GlobalConfiguration.loadConfiguration(file2.getAbsolutePath());
                try {
                    File yarnPropertiesLocation2 = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
                    if (yarnPropertiesLocation2.exists()) {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation2.getAbsolutePath());
                        yarnPropertiesLocation2.delete();
                    }
                } catch (Exception e2) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e2);
                }
            }
            LOG.info("Finished testDetachedMode()");
        } catch (Throwable th2) {
            File file3 = new File(System.getenv("FLINK_CONF_DIR"));
            LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file3.getAbsolutePath());
            LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
            GlobalConfiguration.loadConfiguration(file3.getAbsolutePath());
            try {
                File yarnPropertiesLocation3 = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
                if (yarnPropertiesLocation3.exists()) {
                    LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation3.getAbsolutePath());
                    yarnPropertiesLocation3.delete();
                }
            } catch (Exception e3) {
                LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e3);
            }
            throw th2;
        }
    }

    @Test
    public void testQueryCluster() {
        LOG.info("Starting testQueryCluster()");
        runWithArgs(new String[]{"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testQueryCluster()");
    }

    @Test
    @Ignore("The test is too resource consuming (8.5 GB of memory)")
    public void testResourceComputation() {
        UtilsTest.addTestAppender(YarnClusterDescriptor.class, Level.WARN);
        LOG.info("Starting testResourceComputation()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "5", "-jm", "256", "-tm", "1585"}, "Number of connected TaskManagers changed to", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testResourceComputation()");
        UtilsTest.checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
    }

    @Test
    @Ignore("The test is too resource consuming (8 GB of memory)")
    public void testfullAlloc() {
        UtilsTest.addTestAppender(YarnClusterDescriptor.class, Level.WARN);
        LOG.info("Starting testfullAlloc()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "2", "-jm", "256", "-tm", "3840"}, "Number of connected TaskManagers changed to", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testfullAlloc()");
        UtilsTest.checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\nAfter allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
    }

    @Test
    public void testJavaAPI() throws Exception {
        LOG.info("Starting testJavaAPI()");
        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(GlobalConfiguration.loadConfiguration(), System.getenv("FLINK_CONF_DIR"));
        Assert.assertNotNull("unable to get yarn client", yarnClusterDescriptor);
        yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
        yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
        YarnClusterClient yarnClusterClient = null;
        try {
            yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(768).setTaskManagerMemoryMB(1024).setNumberTaskManagers(1).setSlotsPerTaskManager(1).createClusterSpecification());
        } catch (Exception e) {
            LOG.warn("Failing test", e);
            Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
        }
        GetClusterStatusResponse getClusterStatusResponse = new GetClusterStatusResponse(1, 1);
        int i = 0;
        while (true) {
            if (i >= 30) {
                break;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e2) {
                LOG.warn("Interrupted", e2);
            }
            GetClusterStatusResponse clusterStatus = yarnClusterClient.getClusterStatus();
            if (clusterStatus != null && clusterStatus.equals(getClusterStatusResponse)) {
                LOG.info("ClusterClient reached status " + clusterStatus);
                break;
            } else {
                if (i > 15) {
                    Assert.fail("The custer didn't start after 15 seconds");
                }
                i++;
            }
        }
        Assert.assertNotNull(yarnClusterClient.getJobManagerAddress());
        Assert.assertNotNull(yarnClusterClient.getWebInterfaceURL());
        LOG.info("Shutting down cluster. All tests passed");
        yarnClusterClient.shutdown();
        LOG.info("Finished testJavaAPI()");
    }
}
