package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.yarn.YarnTestBase;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.util.YarnTestUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YARNSessionFIFOITCase.class */
public class YARNSessionFIFOITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

    @BeforeClass
    public static void setup() {
        YARN_CONFIGURATION.setClass("yarn.resourcemanager.scheduler.class", FifoScheduler.class, ResourceScheduler.class);
        YARN_CONFIGURATION.setInt("yarn.nodemanager.resource.memory-mb", 768);
        YARN_CONFIGURATION.setInt("yarn.scheduler.minimum-allocation-mb", 512);
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", "flink-yarn-tests-fifo");
        startYARNWithConfig(YARN_CONFIGURATION);
    }

    @After
    public void checkForProhibitedLogContents() {
        ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
    }

    @Test(timeout = 60000)
    public void testDetachedMode() throws InterruptedException, IOException {
        LOG.info("Starting testDetachedMode()");
        UtilsTest.addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
        File testJarPath = YarnTestUtils.getTestJarPath("StreamingWordCount.jar");
        File newFile = tmp.newFile();
        FileUtils.writeStringToFile(newFile, "Goethe - Faust: Der Tragoedie erster Teil\nProlog im Himmel.\nDer Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\nErzengel treten vor.\nRAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\nUnd ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\ngibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\nhohen Werke Sind herrlich wie am ersten Tag.\nGABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\nPracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\nschaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\nFels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\nMICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\naufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\nDa flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\ndeine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\nZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden\nmag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.\nMEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie\nalles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So\nsiehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte\nmachen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte\ndich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von\nSonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die\nMenschen plagen. Der kleine Gott der Welt bleibt stets von gleichem\nSchlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser\nwuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;\nEr nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier\nzu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der\nlangbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im\nGras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In\njeden Quark begraebt er seine Nase.\nDER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer\nanzuklagen? Ist auf der Erde ewig dir nichts recht?\nMEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich\nschlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar\ndie armen selbst nicht plagen.\nDER HERR: Kennst du den Faust?\nMEPHISTOPHELES: Den Doktor?\nDER HERR: Meinen Knecht!\nMEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch\nist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er\nist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten\nSterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne\nBefriedigt nicht die tiefbewegte Brust.\nDER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in\ndie Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das\nBluet und Frucht die kuenft'gen Jahre zieren.\nMEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr\nmir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.\nDER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,\nEs irrt der Mensch so lang er strebt.\nMEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals\ngern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer\neinem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.\nDER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem\nUrquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit\nherab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in\nseinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.\nMEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine\nWette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir\nTriumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine\nMuhme, die beruehmte Schlange.\nDER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen\nnie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am\nwenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,\ner liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen\nzu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten\nGoettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das\newig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was\nin schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!\n(Der Himmel schliesst, die Erzengel verteilen sich.)\nMEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und\nhuete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,\nSo menschlich mit dem Teufel selbst zu sprechen.");
        ArrayList arrayList = new ArrayList();
        arrayList.add("-j");
        arrayList.add(flinkUberjar.getAbsolutePath());
        arrayList.add("-t");
        arrayList.add(flinkLibFolder.getAbsolutePath());
        arrayList.add("-n");
        arrayList.add("1");
        arrayList.add("-jm");
        arrayList.add("768");
        arrayList.add("-tm");
        arrayList.add("1024");
        if (SecureTestEnvironment.getTestKeytab() != null) {
            arrayList.add("-D" + SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + "=" + SecureTestEnvironment.getTestKeytab());
        }
        if (SecureTestEnvironment.getHadoopServicePrincipal() != null) {
            arrayList.add("-D" + SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + "=" + SecureTestEnvironment.getHadoopServicePrincipal());
        }
        arrayList.add("--name");
        arrayList.add("MyCustomName");
        arrayList.add("--detached");
        startWithArgs((String[]) arrayList.toArray(new String[arrayList.size()]), "Flink JobManager is now running on", YarnTestBase.RunTypes.YARN_SESSION).join();
        if (!this.isNewMode) {
            UtilsTest.checkForLogString("The Flink YARN client has been started in detached mode");
            LOG.info("Waiting until two containers are running");
            while (getRunningContainers() < 2) {
                sleep(500);
            }
            long nanoTime = System.nanoTime();
            while (System.nanoTime() - nanoTime < TimeUnit.NANOSECONDS.convert(10L, TimeUnit.SECONDS) && (!verifyStringsInNamedLogFiles(new String[]{"YARN Application Master started"}, "jobmanager.log") || !verifyStringsInNamedLogFiles(new String[]{"Starting TaskManager actor"}, "taskmanager.log"))) {
                LOG.info("Still waiting for JM/TM to initialize...");
                sleep(500);
            }
        }
        startWithArgs(new String[]{"run", "--detached", testJarPath.getAbsolutePath(), "--input", newFile.getAbsoluteFile().toString()}, "Job has been submitted with JobID", YarnTestBase.RunTypes.CLI_FRONTEND).join();
        if (this.isNewMode) {
            LOG.info("Waiting until two containers are running");
            while (getRunningContainers() < 2) {
                sleep(500);
            }
        }
        long nanoTime2 = System.nanoTime();
        while (System.nanoTime() - nanoTime2 < TimeUnit.NANOSECONDS.convert(10L, TimeUnit.SECONDS) && !verifyStringsInNamedLogFiles(new String[]{"switched from state RUNNING to FINISHED"}, "jobmanager.log")) {
            LOG.info("Still waiting for cluster to finish job...");
            sleep(500);
        }
        LOG.info("Two containers are running. Killing the application");
        try {
            try {
                YarnClient createYarnClient = YarnClient.createYarnClient();
                createYarnClient.init(YARN_CONFIGURATION);
                createYarnClient.start();
                List applications = createYarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
                Assert.assertEquals(1L, applications.size());
                ApplicationReport applicationReport = (ApplicationReport) applications.get(0);
                Assert.assertEquals("MyCustomName", applicationReport.getName());
                createYarnClient.killApplication(applicationReport.getApplicationId());
                while (createYarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0 && createYarnClient.getApplications(EnumSet.of(YarnApplicationState.FINISHED)).size() == 0) {
                    sleep(500);
                }
                File file = new File(System.getenv("FLINK_CONF_DIR"));
                LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file.getAbsolutePath());
                LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                try {
                    File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file.getAbsolutePath()).getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                    if (yarnPropertiesLocation.exists()) {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation.getAbsolutePath());
                        yarnPropertiesLocation.delete();
                    }
                } catch (Exception e) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
                }
            } catch (Throwable th) {
                LOG.warn("Killing failed", th);
                Assert.fail();
                File file2 = new File(System.getenv("FLINK_CONF_DIR"));
                LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file2.getAbsolutePath());
                LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                try {
                    File yarnPropertiesLocation2 = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file2.getAbsolutePath()).getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                    if (yarnPropertiesLocation2.exists()) {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation2.getAbsolutePath());
                        yarnPropertiesLocation2.delete();
                    }
                } catch (Exception e2) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e2);
                }
            }
            LOG.info("Finished testDetachedMode()");
        } catch (Throwable th2) {
            File file3 = new File(System.getenv("FLINK_CONF_DIR"));
            LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file3.getAbsolutePath());
            LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
            try {
                File yarnPropertiesLocation3 = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file3.getAbsolutePath()).getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                if (yarnPropertiesLocation3.exists()) {
                    LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation3.getAbsolutePath());
                    yarnPropertiesLocation3.delete();
                }
            } catch (Exception e3) {
                LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e3);
            }
            throw th2;
        }
    }

    @Test
    public void testQueryCluster() throws IOException {
        LOG.info("Starting testQueryCluster()");
        runWithArgs(new String[]{"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testQueryCluster()");
    }

    @Test
    @Ignore("The test is too resource consuming (8.5 GB of memory)")
    public void testResourceComputation() throws IOException {
        UtilsTest.addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
        LOG.info("Starting testResourceComputation()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "5", "-jm", "256", "-tm", "1585"}, "Number of connected TaskManagers changed to", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testResourceComputation()");
        UtilsTest.checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
    }

    @Test
    @Ignore("The test is too resource consuming (8 GB of memory)")
    public void testfullAlloc() throws IOException {
        UtilsTest.addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
        LOG.info("Starting testfullAlloc()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "2", "-jm", "256", "-tm", "3840"}, "Number of connected TaskManagers changed to", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testfullAlloc()");
        UtilsTest.checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\nAfter allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
    }

    @Test
    public void testJavaAPI() throws Exception {
        LOG.info("Starting testJavaAPI()");
        LegacyYarnClusterDescriptor legacyYarnClusterDescriptor = new LegacyYarnClusterDescriptor(GlobalConfiguration.loadConfiguration(), getYarnConfiguration(), System.getenv("FLINK_CONF_DIR"), getYarnClient(), true);
        Throwable th = null;
        try {
            try {
                Assert.assertNotNull("unable to get yarn client", legacyYarnClusterDescriptor);
                legacyYarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
                legacyYarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
                ClusterClient clusterClient = null;
                try {
                    clusterClient = legacyYarnClusterDescriptor.deploySessionCluster(new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(768).setTaskManagerMemoryMB(1024).setNumberTaskManagers(1).setSlotsPerTaskManager(1).createClusterSpecification());
                } catch (Exception e) {
                    LOG.warn("Failing test", e);
                    Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
                }
                GetClusterStatusResponse getClusterStatusResponse = new GetClusterStatusResponse(1, 1);
                int i = 0;
                while (true) {
                    if (i >= 30) {
                        break;
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e2) {
                        LOG.warn("Interrupted", e2);
                    }
                    GetClusterStatusResponse clusterStatus = clusterClient.getClusterStatus();
                    if (clusterStatus != null && clusterStatus.equals(getClusterStatusResponse)) {
                        LOG.info("ClusterClient reached status " + clusterStatus);
                        break;
                    } else {
                        if (i > 15) {
                            Assert.fail("The custer didn't start after 15 seconds");
                        }
                        i++;
                    }
                }
                Assert.assertNotNull(clusterClient.getClusterConnectionInfo());
                Assert.assertNotNull(clusterClient.getWebInterfaceURL());
                LOG.info("Shutting down cluster. All tests passed");
                clusterClient.shutdown();
                if (legacyYarnClusterDescriptor != null) {
                    if (0 != 0) {
                        try {
                            legacyYarnClusterDescriptor.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        legacyYarnClusterDescriptor.close();
                    }
                }
                LOG.info("Finished testJavaAPI()");
            } finally {
            }
        } catch (Throwable th3) {
            if (legacyYarnClusterDescriptor != null) {
                if (th != null) {
                    try {
                        legacyYarnClusterDescriptor.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    legacyYarnClusterDescriptor.close();
                }
            }
            throw th3;
        }
    }
}
