package org.apache.flink.yarn;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.YarnTestBase;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.util.YarnTestUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.class */
public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);

    @BeforeClass
    public static void setup() {
        YARN_CONFIGURATION.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        YARN_CONFIGURATION.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
        YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
        YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", "flink-yarn-tests-capacityscheduler");
        startYARNWithConfig(YARN_CONFIGURATION);
    }

    @Test
    public void testClientStartup() throws IOException {
        Assume.assumeTrue("The new mode does not start TMs upfront.", !this.isNewMode);
        LOG.info("Starting testClientStartup()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", "-jm", "768", "-tm", "1024", "-qu", "qa-team"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testClientStartup()");
    }

    @Test
    public void perJobYarnCluster() throws IOException {
        LOG.info("Starting perJobYarnCluster()");
        UtilsTest.addTestAppender(JobClient.class, Level.INFO);
        runWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-ys", "2", "-yjm", "768", "-ytm", "1024", YarnTestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath()}, "Program execution finished", new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, YarnTestBase.RunTypes.CLI_FRONTEND, 0, true);
        LOG.info("Finished perJobYarnCluster()");
    }

    @Test
    public void perJobYarnClusterOffHeap() throws IOException {
        LOG.info("Starting perJobYarnCluster()");
        UtilsTest.addTestAppender(JobClient.class, Level.INFO);
        runWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-ys", "2", "-yjm", "768", "-ytm", String.valueOf(1024L), "-yD", "taskmanager.memory.off-heap=true", "-yD", "taskmanager.memory.size=" + (((1024 - ((Integer) ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.defaultValue()).intValue()) - (TaskManagerServices.calculateNetworkBufferMemory((1024 - ((Integer) ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.defaultValue()).intValue()) << 20, new Configuration()) >> 20)) - 100), "-yD", "taskmanager.memory.preallocate=true", YarnTestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath()}, "Program execution finished", new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, YarnTestBase.RunTypes.CLI_FRONTEND, 0, true);
        LOG.info("Finished perJobYarnCluster()");
    }

    @Test(timeout = 100000)
    public void testTaskManagerFailure() throws Exception {
        String str;
        Assume.assumeTrue("The new mode does not start TMs upfront.", !this.isNewMode);
        LOG.info("Starting testTaskManagerFailure()");
        YarnTestBase.Runner startWithArgs = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", "-jm", "768", "-tm", "1024", "-s", "3", "-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, "Number of connected TaskManagers changed to 1. Slots available: 3", YarnTestBase.RunTypes.YARN_SESSION);
        Assert.assertEquals(2L, getRunningContainers());
        YarnClient createYarnClient = YarnClient.createYarnClient();
        createYarnClient.init(YARN_CONFIGURATION);
        createYarnClient.start();
        List applications = createYarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
        Assert.assertEquals(1L, applications.size());
        ApplicationReport applicationReport = (ApplicationReport) applications.get(0);
        Assert.assertEquals("customName", applicationReport.getName());
        String trackingUrl = applicationReport.getTrackingUrl();
        if (!trackingUrl.endsWith("/")) {
            trackingUrl = trackingUrl + "/";
        }
        if (!trackingUrl.startsWith("http://")) {
            trackingUrl = "http://" + trackingUrl;
        }
        LOG.info("Got application URL from YARN {}", trackingUrl);
        Assert.assertNotNull(new ObjectMapper().readTree(TestBaseUtils.getFromHTTP(trackingUrl + "taskmanagers/")).get("taskmanagers"));
        Assert.assertEquals(1L, r0.size());
        Assert.assertEquals(3L, r0.get(0).get("slotsNumber").asInt());
        String fromHTTP = TestBaseUtils.getFromHTTP(trackingUrl + "jobmanager/config");
        Map fromKeyValueJsonArray = WebMonitorUtils.fromKeyValueJsonArray(fromHTTP);
        Assert.assertEquals("veryFancy", fromKeyValueJsonArray.get("fancy-configuration-value"));
        Assert.assertEquals("3", fromKeyValueJsonArray.get("yarn.maximum-failed-containers"));
        Assert.assertEquals("2", fromKeyValueJsonArray.get(YarnConfigOptions.VCORES.key()));
        Matcher matcher = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)").matcher(outContent.toString());
        String str2 = null;
        String str3 = null;
        while (true) {
            str = str3;
            if (!matcher.find()) {
                break;
            }
            str2 = matcher.group(1).toLowerCase();
            str3 = matcher.group(2);
        }
        LOG.info("Extracted hostname:port: {} {}", str2, str);
        Assert.assertEquals("unable to find hostname in " + fromHTTP, str2, fromKeyValueJsonArray.get(JobManagerOptions.ADDRESS.key()));
        Assert.assertEquals("unable to find port in " + fromHTTP, str, fromKeyValueJsonArray.get(JobManagerOptions.PORT.key()));
        String fromHTTP2 = TestBaseUtils.getFromHTTP(trackingUrl + "jobmanager/log");
        Assert.assertTrue(fromHTTP2.contains("Starting YARN ApplicationMaster"));
        Assert.assertTrue(fromHTTP2.contains("Starting JobManager"));
        Assert.assertTrue(fromHTTP2.contains("Starting JobManager Web Frontend"));
        ContainerId containerId = null;
        NodeManager nodeManager = null;
        UserGroupInformation userGroupInformation = null;
        TokenIdentifier tokenIdentifier = null;
        try {
            userGroupInformation = UserGroupInformation.getCurrentUser();
        } catch (IOException e) {
            LOG.warn("Unable to get curr user", e);
            Assert.fail();
        }
        for (int i = 0; i < 2; i++) {
            NodeManager nodeManager2 = yarnCluster.getNodeManager(i);
            for (Map.Entry entry : nodeManager2.getNMContext().getContainers().entrySet()) {
                if (StringUtils.join(((Container) entry.getValue()).getLaunchContext().getCommands(), " ").contains(YarnTaskManager.class.getSimpleName())) {
                    containerId = (ContainerId) entry.getKey();
                    nodeManager = nodeManager2;
                    tokenIdentifier = new NMTokenIdentifier(containerId.getApplicationAttemptId(), (NodeId) null, "", 0);
                    userGroupInformation.addTokenIdentifier(tokenIdentifier);
                }
            }
            sleep(500);
        }
        Assert.assertNotNull("Unable to find container with TaskManager", containerId);
        Assert.assertNotNull("Illegal state", nodeManager);
        createYarnClient.stop();
        LinkedList linkedList = new LinkedList();
        linkedList.add(containerId);
        try {
            nodeManager.getNMContext().getContainerManager().stopContainers(StopContainersRequest.newInstance(linkedList));
        } catch (Throwable th) {
            LOG.warn("Error stopping container", th);
            Assert.fail("Error stopping container: " + th.getMessage());
        }
        boolean z = false;
        do {
            LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
            String byteArrayOutputStream = errContent.toString();
            int indexOf = byteArrayOutputStream.indexOf("Container killed by the ApplicationMaster");
            if (indexOf != -1) {
                z = byteArrayOutputStream.substring(indexOf).indexOf("Launching TaskManager") > 0;
            }
            sleep(1000);
        } while (!z);
        startWithArgs.sendStop();
        try {
            startWithArgs.join(1000L);
        } catch (InterruptedException e2) {
            LOG.warn("Interrupted while stopping runner", e2);
        }
        LOG.warn("stopped");
        System.setOut(ORIGINAL_STDOUT);
        System.setErr(ORIGINAL_STDERR);
        String byteArrayOutputStream2 = outContent.toString();
        String byteArrayOutputStream3 = errContent.toString();
        LOG.info("Sending stdout content through logger: \n\n{}\n\n", byteArrayOutputStream2);
        LOG.info("Sending stderr content through logger: \n\n{}\n\n", byteArrayOutputStream3);
        Assert.assertTrue("Expect to see failed container", byteArrayOutputStream3.contains("New messages from the YARN cluster"));
        Assert.assertTrue("Expect to see failed container", byteArrayOutputStream3.contains("Container killed by the ApplicationMaster"));
        Assert.assertTrue("Expect to see new container started", byteArrayOutputStream3.contains("Launching TaskManager") && byteArrayOutputStream3.contains("on host"));
        userGroupInformation.getTokenIdentifiers().remove(tokenIdentifier);
        LOG.info("Finished testTaskManagerFailure()");
    }

    @Test
    public void testNonexistingQueueWARNmessage() throws IOException {
        LOG.info("Starting testNonexistingQueueWARNmessage()");
        UtilsTest.addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
        try {
            runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", "-jm", "768", "-tm", "1024", "-qu", "doesntExist"}, "to unknown queue: doesntExist", null, YarnTestBase.RunTypes.YARN_SESSION, 1);
        } catch (Exception e) {
            TestCase.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "to unknown queue: doesntExist").isPresent());
        }
        UtilsTest.checkForLogString("The specified queue 'doesntExist' does not exist. Available queues");
        LOG.info("Finished testNonexistingQueueWARNmessage()");
    }

    @Test
    public void perJobYarnClusterWithParallelism() throws IOException {
        LOG.info("Starting perJobYarnClusterWithParallelism()");
        UtilsTest.addTestAppender(JobClient.class, Level.INFO);
        runWithArgs(new String[]{"run", "-p", "2", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-ys", "2", "-yjm", "768", "-ytm", "1024", YarnTestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath()}, "Program execution finished", new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, YarnTestBase.RunTypes.CLI_FRONTEND, 0, true);
        LOG.info("Finished perJobYarnClusterWithParallelism()");
    }

    @Test(timeout = 60000)
    public void testDetachedPerJobYarnCluster() throws Exception {
        LOG.info("Starting testDetachedPerJobYarnCluster()");
        testDetachedPerJobYarnClusterInternal(YarnTestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath());
        LOG.info("Finished testDetachedPerJobYarnCluster()");
    }

    @Test(timeout = 60000)
    public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception {
        LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
        testDetachedPerJobYarnClusterInternal(YarnTestUtils.getTestJarPath("StreamingWordCount.jar").getAbsolutePath());
        LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
    }

    private void testDetachedPerJobYarnClusterInternal(String str) throws Exception {
        ApplicationId applicationId;
        ApplicationReport applicationReport;
        YarnClient createYarnClient = YarnClient.createYarnClient();
        createYarnClient.init(YARN_CONFIGURATION);
        createYarnClient.start();
        try {
            File newFolder = tmp.newFolder();
            try {
                File newFile = tmp.newFile();
                FileUtils.writeStringToFile(newFile, "Goethe - Faust: Der Tragoedie erster Teil\nProlog im Himmel.\nDer Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\nErzengel treten vor.\nRAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\nUnd ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\ngibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\nhohen Werke Sind herrlich wie am ersten Tag.\nGABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\nPracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\nschaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\nFels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\nMICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\naufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\nDa flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\ndeine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\nZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden\nmag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.\nMEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie\nalles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So\nsiehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte\nmachen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte\ndich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von\nSonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die\nMenschen plagen. Der kleine Gott der Welt bleibt stets von gleichem\nSchlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser\nwuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;\nEr nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier\nzu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der\nlangbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im\nGras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In\njeden Quark begraebt er seine Nase.\nDER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer\nanzuklagen? Ist auf der Erde ewig dir nichts recht?\nMEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich\nschlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar\ndie armen selbst nicht plagen.\nDER HERR: Kennst du den Faust?\nMEPHISTOPHELES: Den Doktor?\nDER HERR: Meinen Knecht!\nMEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch\nist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er\nist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten\nSterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne\nBefriedigt nicht die tiefbewegte Brust.\nDER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in\ndie Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das\nBluet und Frucht die kuenft'gen Jahre zieren.\nMEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr\nmir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.\nDER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,\nEs irrt der Mensch so lang er strebt.\nMEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals\ngern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer\neinem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.\nDER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem\nUrquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit\nherab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in\nseinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.\nMEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine\nWette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir\nTriumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine\nMuhme, die beruehmte Schlange.\nDER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen\nnie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am\nwenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,\ner liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen\nzu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten\nGoettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das\newig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was\nin schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!\n(Der Himmel schliesst, die Erzengel verteilen sich.)\nMEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und\nhuete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,\nSo menschlich mit dem Teufel selbst zu sprechen.");
                YarnTestBase.Runner startWithArgs = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-yjm", "768", "-yD", "yarn.heap-cutoff-ratio=0.7", "-yD", "yarn.tags=test-tag", "-ytm", "1024", "-ys", "2", "-p", "2", "--detached", str, "--input", newFile.getAbsoluteFile().toString(), "--output", newFolder.getAbsoluteFile().toString()}, "Job has been submitted with JobID", YarnTestBase.RunTypes.CLI_FRONTEND);
                Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
                for (int i = 0; startWithArgs.isAlive() && i < 5; i++) {
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                    }
                }
                Assert.assertFalse("The runner should detach.", startWithArgs.isAlive());
                LOG.info("CLI Frontend has returned, so the job is running");
                try {
                    List applications = createYarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
                    if (applications.size() == 1) {
                        applicationId = ((ApplicationReport) applications.get(0)).getApplicationId();
                        LOG.info("waiting for the job with appId {} to finish", applicationId);
                        while (createYarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
                            sleep(500);
                        }
                    } else {
                        List applications2 = createYarnClient.getApplications();
                        Collections.sort(applications2, new Comparator<ApplicationReport>() { // from class: org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.1
                            @Override // java.util.Comparator
                            public int compare(ApplicationReport applicationReport2, ApplicationReport applicationReport3) {
                                return applicationReport2.getApplicationId().compareTo(applicationReport3.getApplicationId()) * (-1);
                            }
                        });
                        applicationId = ((ApplicationReport) applications2.get(0)).getApplicationId();
                        LOG.info("Selected {} as the last appId from {}", applicationId, Arrays.toString(applications2.toArray()));
                    }
                    final ApplicationId applicationId2 = applicationId;
                    File[] listFiles = newFolder.listFiles();
                    Assert.assertNotNull("Taskmanager output not found", listFiles);
                    LOG.info("The job has finished. TaskManager output files found in {}", newFolder);
                    String str2 = "";
                    for (File file : listFiles) {
                        if (file.isFile()) {
                            str2 = str2 + FileUtils.readFileToString(file) + "\n";
                        }
                    }
                    Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '" + str2 + "'", str2.contains("da 5") || str2.contains("(da,5)") || str2.contains("(all,2)"));
                    Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'" + str2 + "'", str2.contains("der 29") || str2.contains("(der,29)") || str2.contains("(mind,1)"));
                    File findFile = YarnTestBase.findFile("..", new FilenameFilter() { // from class: org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.2
                        @Override // java.io.FilenameFilter
                        public boolean accept(File file2, String str3) {
                            return str3.contains("jobmanager.log") && file2.getAbsolutePath().contains(applicationId2.toString());
                        }
                    });
                    Assert.assertNotNull("Unable to locate JobManager log", findFile);
                    String readFileToString = FileUtils.readFileToString(findFile);
                    Assert.assertTrue("Expected string 'Starting TaskManagers' not found in JobManager log: '" + findFile + "'", readFileToString.contains("Starting TaskManagers"));
                    Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log.This string checks that the job has been started with a parallelism of 2. Log contents: '" + findFile + "'", readFileToString.contains(" (2/2) (attempt #0) to "));
                    LOG.info("Checking again that app has finished");
                    do {
                        sleep(500);
                        applicationReport = createYarnClient.getApplicationReport(applicationId2);
                        LOG.info("Got report {}", applicationReport);
                    } while (applicationReport.getYarnApplicationState() == YarnApplicationState.RUNNING);
                    verifyApplicationTags(applicationReport);
                    File file2 = new File(System.getenv("FLINK_CONF_DIR"));
                    LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file2.getAbsolutePath());
                    LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                    try {
                        File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file2.getAbsolutePath()).getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                        if (yarnPropertiesLocation.exists()) {
                            LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation.getAbsolutePath());
                            yarnPropertiesLocation.delete();
                        }
                    } catch (Exception e2) {
                        LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e2);
                    }
                    try {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Closing the yarn client");
                        createYarnClient.stop();
                    } catch (Exception e3) {
                        LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while close the yarn client", e3);
                    }
                } catch (Throwable th) {
                    File file3 = new File(System.getenv("FLINK_CONF_DIR"));
                    LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file3.getAbsolutePath());
                    LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                    try {
                        File yarnPropertiesLocation2 = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file3.getAbsolutePath()).getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                        if (yarnPropertiesLocation2.exists()) {
                            LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation2.getAbsolutePath());
                            yarnPropertiesLocation2.delete();
                        }
                    } catch (Exception e4) {
                        LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e4);
                    }
                    try {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Closing the yarn client");
                        createYarnClient.stop();
                    } catch (Exception e5) {
                        LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while close the yarn client", e5);
                    }
                    throw th;
                }
            } catch (IOException e6) {
                throw new RuntimeException(e6);
            }
        } catch (IOException e7) {
            throw new RuntimeException(e7);
        }
    }

    private void verifyApplicationTags(ApplicationReport applicationReport) throws InvocationTargetException, IllegalAccessException {
        try {
            Assert.assertEquals(Collections.singleton("test-tag"), (Set) ApplicationReport.class.getMethod("getApplicationTags", new Class[0]).invoke(applicationReport, new Object[0]));
        } catch (NoSuchMethodException e) {
        }
    }

    @After
    public void checkForProhibitedLogContents() {
        ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
    }
}
