package org.apache.flink.yarn;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.YarnTestBase;
import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.log4j.Level;
import org.codehaus.jettison.json.JSONObject;
import org.jets3t.service.security.EncryptionUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YARNSessionFIFOITCase.class */
public class YARNSessionFIFOITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

    /**
     * Configures the shared YARN configuration for this test class and starts the
     * YARN test cluster with it.
     *
     * <p>The FIFO scheduler is selected as the ResourceManager scheduler, the
     * NodeManager physical memory is set to 768 MB and the scheduler's minimum
     * allocation to 512 MB. The cluster name key is set to
     * {@code flink-yarn-tests-fifo}.
     */
    @BeforeClass
    public static void setup() {
        // Name under which this test cluster is registered in the configuration.
        yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");

        // Use the FIFO scheduler implementation for the ResourceManager.
        yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);

        // Memory limits (values are in MB, as the *_MB key names indicate).
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
        yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);

        startYARNWithConfig(yarnConfiguration);
    }

    /**
     * Runs after every test method and hands the inherited {@code PROHIBITED_STRINGS}
     * to {@code ensureNoProhibitedStringInLogFiles}.
     *
     * <p>NOTE(review): the helper is defined outside this file; judging by its name it
     * presumably fails the test when one of those strings shows up in a log file -
     * confirm against the base class.
     */
    @After
    public void checkForProhibitedLogContents() {
        ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS);
    }

    /**
     * Starts a YARN session with one TaskManager ({@code -n 1}) and two slots
     * ({@code -s 2}) and runs it against the expected output
     * "Number of connected TaskManagers changed to 1. Slots available: 2".
     */
    @Test
    public void testClientStartup() {
        LOG.info("Starting testClientStartup()");
        // The slot count is spelled out as a literal so that it visibly matches the
        // "Slots available: 2" expectation below, instead of borrowing the value from
        // an unrelated library constant that merely happens to equal "2".
        final String[] sessionArgs = {
            "-j", flinkUberjar.getAbsolutePath(),
            "-n", "1",
            "-jm", "768",
            "-tm", "1024",
            "-s", "2"
        };
        runWithArgs(sessionArgs, "Number of connected TaskManagers changed to 1. Slots available: 2", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testClientStartup()");
    }

    /**
     * Starts a YARN session in detached mode, verifies that the client runner has
     * terminated while two containers are running, and then kills the application
     * through a {@link YarnClient}.
     *
     * <p>The test polls every 500 ms until two containers are running and, after the
     * kill request, until YARN reports at least one application in state
     * {@code KILLED}. The JUnit timeout of 60 s bounds both loops.
     */
    @Test(timeout = 60000)
    public void testDetachedMode() {
        LOG.info("Starting testDetachedMode()");
        UtilsTest.addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
        YarnTestBase.Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", "-jm", "768", "-tm", "1024", "--detached"}, "Flink JobManager is now running on", YarnTestBase.RunTypes.YARN_SESSION);
        UtilsTest.checkForLogString("The Flink YARN client has been started in detached mode");
        Assert.assertFalse("The runner should detach.", runner.isAlive());

        LOG.info("Waiting until two containers are running");
        while (getRunningContainers() < 2) {
            sleep(500);
        }
        LOG.info("Two containers are running. Killing the application");

        YarnClient yarnClient = null;
        try {
            yarnClient = YarnClient.createYarnClient();
            yarnClient.init(yarnConfiguration);
            yarnClient.start();

            List<ApplicationReport> runningApps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
            Assert.assertEquals(1L, runningApps.size());
            yarnClient.killApplication(runningApps.get(0).getApplicationId());

            // Wait until the kill has actually been registered by YARN.
            while (yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
                sleep(500);
            }
        } catch (Throwable th) {
            LOG.warn("Killing failed", th);
            // Carry the cause into the failure message so the test report is self-explanatory.
            Assert.fail("Killing failed: " + th);
        } finally {
            // Release the client's resources regardless of the outcome.
            if (yarnClient != null) {
                yarnClient.stop();
            }
        }
        LOG.info("Finished testDetachedMode()");
    }

    /**
     * Starts a YARN session, checks the JobManager web interface, then stops the
     * TaskManager container and verifies from the captured stderr that a
     * replacement container is launched.
     *
     * <p>Steps:
     * <ol>
     *   <li>Start a session with one TaskManager and two dynamic properties.</li>
     *   <li>Query the tracking URL reported by YARN and compare the TaskManager/slot
     *       count, the dynamic properties, and the JobManager host/port (extracted from
     *       the captured stdout) with the web interface's answers.</li>
     *   <li>Find the container whose launch command mentions
     *       {@code YarnTaskManagerRunner} and stop it through its NodeManager.</li>
     *   <li>Wait until stderr contains "Container killed by the ApplicationMaster"
     *       followed by "Launching container", stop the runner, and assert on the
     *       captured stderr.</li>
     * </ol>
     * The JUnit timeout of 100 s bounds the waiting loop.
     */
    @Test(timeout = 100000)
    public void testTaskManagerFailure() {
        LOG.info("Starting testTaskManagerFailure()");
        YarnTestBase.Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", "-jm", "768", "-tm", "1024", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3"}, "Number of connected TaskManagers changed to 1. Slots available: 1", YarnTestBase.RunTypes.YARN_SESSION);
        Assert.assertEquals(2L, getRunningContainers());

        // ---- 1) Check the web interface of the running session ----
        YarnClient yarnClient = null;
        try {
            yarnClient = YarnClient.createYarnClient();
            yarnClient.init(yarnConfiguration);
            yarnClient.start();

            List<ApplicationReport> runningApps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
            Assert.assertEquals(1L, runningApps.size());

            // Normalize the tracking URL: trailing slash and explicit scheme.
            String baseUrl = runningApps.get(0).getTrackingUrl();
            if (!baseUrl.endsWith("/")) {
                baseUrl = baseUrl + "/";
            }
            if (!baseUrl.startsWith("http://")) {
                baseUrl = "http://" + baseUrl;
            }
            LOG.info("Got application URL from YARN {}", baseUrl);

            Assert.assertEquals("{\"taskmanagers\": 1, \"slots\": 1}", TestBaseUtils.getFromHTTP(baseUrl + "jobsInfo?get=taskmanagers"));

            JSONObject globalConfig = new JSONObject(TestBaseUtils.getFromHTTP(baseUrl + "setupInfo?get=globalC"));
            Assert.assertEquals("veryFancy", globalConfig.getString("fancy-configuration-value"));
            Assert.assertEquals("3", globalConfig.getString(ConfigConstants.YARN_MAX_FAILED_CONTAINERS));

            // The last "now running on host:port" line in stdout wins.
            Matcher matcher = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)").matcher(outContent.toString());
            String hostname = null;
            String port = null;
            while (matcher.find()) {
                hostname = matcher.group(1).toLowerCase();
                port = matcher.group(2);
            }
            LOG.info("Extracted hostname:port: {} {}", hostname, port);
            Assert.assertEquals("unable to find hostname in " + globalConfig, hostname, globalConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY).toLowerCase());
            Assert.assertEquals("unable to find port in " + globalConfig, port, globalConfig.getString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));

            Assert.assertTrue(TestBaseUtils.getFromHTTP(baseUrl + "logInfo").contains("Starting YARN ApplicationMaster/JobManager (Version"));
        } catch (Throwable th) {
            LOG.warn("Error while running test", th);
            Assert.fail(th.getMessage());
        } finally {
            // Release the client's resources regardless of the outcome.
            if (yarnClient != null) {
                yarnClient.stop();
            }
        }

        // ---- 2) Locate the TaskManager container ----
        ContainerId taskManagerContainer = null;
        NodeManager hostingNodeManager = null;
        UserGroupInformation currentUser = null;
        NMTokenIdentifier nmToken = null;
        try {
            currentUser = UserGroupInformation.getCurrentUser();
        } catch (IOException e) {
            LOG.warn("Unable to get curr user", e);
            Assert.fail();
        }
        // The two NodeManagers with index 0 and 1 are inspected.
        for (int nmIndex = 0; nmIndex < 2; nmIndex++) {
            NodeManager candidate = yarnCluster.getNodeManager(nmIndex);
            for (Map.Entry<ContainerId, Container> entry : candidate.getNMContext().getContainers().entrySet()) {
                String launchCommand = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
                if (launchCommand.contains(YarnTaskManagerRunner.class.getSimpleName())) {
                    taskManagerContainer = entry.getKey();
                    hostingNodeManager = candidate;
                    // Register a token identifier for the application attempt with the current user
                    // before asking the NodeManager to stop the container.
                    nmToken = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
                    currentUser.addTokenIdentifier(nmToken);
                }
            }
            sleep(500);
        }
        Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
        Assert.assertNotNull("Illegal state", hostingNodeManager);

        // ---- 3) Stop the TaskManager container ----
        List<ContainerId> containersToStop = new LinkedList<ContainerId>();
        containersToStop.add(taskManagerContainer);
        try {
            hostingNodeManager.getNMContext().getContainerManager().stopContainers(StopContainersRequest.newInstance(containersToStop));
        } catch (Throwable th) {
            LOG.warn("Error stopping container", th);
            Assert.fail("Error stopping container: " + th.getMessage());
        }

        // ---- 4) Wait for "killed" followed by "launching" in stderr ----
        boolean relaunchSeen = false;
        do {
            LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
            String errSnapshot = errContent.toString();
            int killedIdx = errSnapshot.indexOf("Container killed by the ApplicationMaster");
            if (killedIdx != -1) {
                // A launch message only counts when it appears after the kill message.
                relaunchSeen = errSnapshot.indexOf("Launching container", killedIdx) > killedIdx;
            }
            sleep(1000);
        } while (!relaunchSeen);

        runner.sendStop();
        try {
            runner.join(1000L);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while stopping runner", e);
            // Preserve the interrupt status for whoever interrupted this thread.
            Thread.currentThread().interrupt();
        }
        LOG.warn("stopped");

        // ---- 5) Restore the real streams and assert on the captured output ----
        System.setOut(originalStdout);
        System.setErr(originalStderr);
        String capturedOut = outContent.toString();
        String capturedErr = errContent.toString();
        LOG.info("Sending stdout content through logger: \n\n{}\n\n", capturedOut);
        LOG.info("Sending stderr content through logger: \n\n{}\n\n", capturedErr);

        Assert.assertTrue("Expect to see failed container", capturedErr.contains("New messages from the YARN cluster"));
        Assert.assertTrue("Expect to see failed container", capturedErr.contains("Container killed by the ApplicationMaster"));
        Assert.assertTrue("Expect to see new container started", capturedErr.contains("Launching container") && capturedErr.contains("on host"));

        currentUser.getTokenIdentifiers().remove(nmToken);
        LOG.info("Finished testTaskManagerFailure()");
    }

    /**
     * Runs the YARN session client with only the {@code -q} flag and the expected
     * summary line "Summary: totalMemory 8192 totalCores 1332".
     */
    @Test
    public void testQueryCluster() {
        LOG.info("Starting testQueryCluster()");
        final String[] queryArgs = {"-q"};
        final String expectedSummary = "Summary: totalMemory 8192 totalCores 1332";
        runWithArgs(queryArgs, expectedSummary, null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testQueryCluster()");
    }

    /**
     * Starts a YARN session that names the queue {@code doesntExist} via {@code -qu}
     * and runs it against the expected output
     * "Number of connected TaskManagers changed to 1. Slots available: 1".
     */
    @Test
    public void testNonexistingQueue() {
        LOG.info("Starting testNonexistingQueue()");
        final String[] sessionArgs = {
            "-j", flinkUberjar.getAbsolutePath(),
            "-n", "1",
            "-jm", "768",
            "-tm", "1024",
            "-qu", "doesntExist"
        };
        final String expectedOutput = "Number of connected TaskManagers changed to 1. Slots available: 1";
        runWithArgs(sessionArgs, expectedOutput, null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testNonexistingQueue()");
    }

    /**
     * Requests five TaskManagers of 1585 MB each plus a 256 MB JobManager and then
     * checks that the {@code FlinkYarnClient} logger (captured at WARN level) emitted
     * the "requires 8437MB ... only 8192MB available" message.
     *
     * <p>Currently ignored because of its memory consumption.
     */
    @Test
    @Ignore("The test is too resource consuming (8.5 GB of memory)")
    public void testResourceComputation() {
        UtilsTest.addTestAppender(FlinkYarnClient.class, Level.WARN);
        LOG.info("Starting testResourceComputation()");
        final String[] sessionArgs = {
            "-j", flinkUberjar.getAbsolutePath(),
            "-n", "5",
            "-jm", "256",
            "-tm", "1585"
        };
        runWithArgs(sessionArgs, "Number of connected TaskManagers changed to", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testResourceComputation()");
        final String expectedWarning = "This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.";
        UtilsTest.checkForLogString(expectedWarning);
    }

    /**
     * Requests two TaskManagers of 3840 MB each plus a 256 MB JobManager and then
     * checks that the {@code FlinkYarnClient} logger (captured at WARN level) emitted
     * the "not enough memory available" message, which refers to "(1/2) TaskManagers".
     *
     * <p>Currently ignored because of its memory consumption.
     */
    @Test
    @Ignore("The test is too resource consuming (8 GB of memory)")
    public void testfullAlloc() {
        UtilsTest.addTestAppender(FlinkYarnClient.class, Level.WARN);
        LOG.info("Starting testfullAlloc()");
        // The TaskManager count is spelled out as a literal so that it visibly matches
        // the "(1/2) TaskManagers" text in the expected warning, instead of borrowing
        // the value from an unrelated library constant that merely happens to equal "2".
        final String[] sessionArgs = {
            "-j", flinkUberjar.getAbsolutePath(),
            "-n", "2",
            "-jm", "256",
            "-tm", "3840"
        };
        runWithArgs(sessionArgs, "Number of connected TaskManagers changed to", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testfullAlloc()");
        UtilsTest.checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\nAfter allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
    }

    /**
     * Submits the WordCount example jar through the CLI frontend to a per-job YARN
     * cluster (one TaskManager, two slots) with the expected output
     * "Job execution switched to status FINISHED.".
     *
     * <p>The jar is looked up below the parent directory ({@code ".."}) using a
     * {@code ContainsName} filter built from "-WordCount.jar" and "streaming".
     */
    @Test
    public void perJobYarnCluster() {
        LOG.info("Starting perJobYarnCluster()");
        // Search root and slot count are written as plain literals; they were previously
        // taken from unrelated library constants that merely happen to equal ".." and "2".
        File exampleJar = YarnTestBase.findFile("..", new YarnTestBase.ContainsName(new String[]{"-WordCount.jar"}, "streaming"));
        Assert.assertNotNull("Could not find wordcount jar", exampleJar);
        final String[] cliArgs = {
            "run",
            "-m", CliFrontend.YARN_DEPLOY_JOBMANAGER,
            "-yj", flinkUberjar.getAbsolutePath(),
            "-yn", "1",
            "-ys", "2",
            "-yjm", "768",
            "-ytm", "1024",
            exampleJar.getAbsolutePath()
        };
        runWithArgs(cliArgs, "Job execution switched to status FINISHED.", new String[]{"System.out)(1/1) switched to FINISHED "}, YarnTestBase.RunTypes.CLI_FRONTEND, 0);
        LOG.info("Finished perJobYarnCluster()");
    }

    /**
     * Submits the WordCount example jar through the CLI frontend to a per-job YARN
     * cluster with an explicit parallelism of two ({@code -p 2}) and the expected
     * output "Job execution switched to status FINISHED.".
     *
     * <p>The jar is looked up below the parent directory ({@code ".."}) using a
     * {@code ContainsName} filter built from "-WordCount.jar" and "streaming".
     */
    @Test
    public void perJobYarnClusterWithParallelism() {
        LOG.info("Starting perJobYarnClusterWithParallelism()");
        // Search root and parallelism are written as plain literals; they were previously
        // taken from unrelated library constants that merely happen to equal ".." and "2".
        File exampleJar = YarnTestBase.findFile("..", new YarnTestBase.ContainsName(new String[]{"-WordCount.jar"}, "streaming"));
        Assert.assertNotNull("Could not find wordcount jar", exampleJar);
        final String[] cliArgs = {
            "run",
            "-p", "2",
            "-m", CliFrontend.YARN_DEPLOY_JOBMANAGER,
            "-yj", flinkUberjar.getAbsolutePath(),
            "-yn", "1",
            "-yjm", "768",
            "-ytm", "1024",
            exampleJar.getAbsolutePath()
        };
        runWithArgs(cliArgs, "Job execution switched to status FINISHED.", new String[]{"System.out)(1/1) switched to FINISHED "}, YarnTestBase.RunTypes.CLI_FRONTEND, 0);
        LOG.info("Finished perJobYarnClusterWithParallelism()");
    }

    /**
     * Submits the given job jar in detached per-job mode and verifies the result.
     *
     * <p>The method writes the WordCount sample text to a temporary input file,
     * submits the jar with {@code --yarndetached}, and checks that the CLI runner has
     * returned. It then determines the application id (the single running application,
     * or otherwise the highest id among all known applications), inspects the files in
     * the temporary output folder for expected word counts, inspects the JobManager
     * log of that application for the expected TaskManager heap settings and
     * parallelism, and finally polls until the application no longer reports state
     * {@code RUNNING}.
     *
     * @param str absolute path of the job jar to submit
     * @throws RuntimeException if the temporary input file or output folder cannot be created
     */
    private void testDetachedPerJobYarnClusterInternal(String str) {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfiguration);
        yarnClient.start();
        try {
            // Prepare the job's output folder and input file.
            File outputFolder;
            File inputFile;
            try {
                outputFolder = tmp.newFolder();
                inputFile = tmp.newFile();
                FileUtils.writeStringToFile(inputFile, WordCountData.TEXT);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            // The slot count is a plain literal; it was previously taken from an unrelated
            // library constant that merely happens to equal "2".
            YarnTestBase.Runner runner = startWithArgs(new String[]{"run", "-m", CliFrontend.YARN_DEPLOY_JOBMANAGER, "-yj", flinkUberjar.getAbsolutePath(), "-yn", "1", "-yjm", "768", "-yD", "yarn.heap-cutoff-ratio=0.5", "-ytm", "1024", "-ys", "2", "--yarndetached", str, inputFile.getAbsoluteFile().toString(), outputFolder.getAbsoluteFile().toString()}, "The Job has been submitted with JobID", YarnTestBase.RunTypes.CLI_FRONTEND);
            Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
            Assert.assertFalse("The runner should detach.", runner.isAlive());
            LOG.info("CLI Frontend has returned, so the job is running");

            try {
                ApplicationId selectedAppId;
                List<ApplicationReport> runningApps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
                if (runningApps.size() == 1) {
                    selectedAppId = runningApps.get(0).getApplicationId();
                    LOG.info("waiting for the job with appId {} to finish", selectedAppId);
                    while (yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
                        sleep(500);
                    }
                } else {
                    // Not exactly one running application: pick the application with the highest id.
                    List<ApplicationReport> allApps = yarnClient.getApplications();
                    Collections.sort(allApps, new Comparator<ApplicationReport>() {
                        @Override
                        public int compare(ApplicationReport left, ApplicationReport right) {
                            // Descending order. Swapping the operands avoids negating the
                            // compareTo result, which would overflow for Integer.MIN_VALUE.
                            return right.getApplicationId().compareTo(left.getApplicationId());
                        }
                    });
                    selectedAppId = allApps.get(0).getApplicationId();
                    LOG.info("Selected {} as the last appId from {}", selectedAppId, Arrays.toString(allApps.toArray()));
                }
                final ApplicationId appId = selectedAppId;

                // Collect the contents of all regular files in the output folder.
                File[] outputFiles = outputFolder.listFiles();
                Assert.assertNotNull("Taskmanager output not found", outputFiles);
                LOG.info("The job has finished. TaskManager output files found in {}", outputFolder);
                StringBuilder collected = new StringBuilder();
                for (File outputFile : outputFiles) {
                    if (outputFile.isFile()) {
                        collected.append(FileUtils.readFileToString(outputFile)).append("\n");
                    }
                }
                String content = collected.toString();
                Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '" + content + "'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
                Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'" + content + "'", content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));

                // Locate the JobManager log belonging to the selected application. The search
                // root is the plain literal ".."; it was previously taken from an unrelated
                // library constant that merely happens to have that value.
                File jobManagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
                    @Override
                    public boolean accept(File dir, String name) {
                        return name.contains("jobmanager-main") && dir.getAbsolutePath().contains(appId.toString());
                    }
                });
                Assert.assertNotNull("Unable to locate JobManager log", jobManagerLog);
                String logContent = FileUtils.readFileToString(jobManagerLog);
                Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '" + jobManagerLog + "'", logContent.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
                Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log.This string checks that the job has been started with a parallelism of 2. Log contents: '" + jobManagerLog + "'", logContent.contains(" (2/2) (attempt #0) to "));

                LOG.info("Checking again that app has finished");
                ApplicationReport report;
                do {
                    sleep(500);
                    report = yarnClient.getApplicationReport(appId);
                    LOG.info("Got report {}", report);
                } while (report.getYarnApplicationState() == YarnApplicationState.RUNNING);
            } catch (Throwable th) {
                LOG.warn("Error while detached yarn session was running", th);
                Assert.fail(th.getMessage());
            }
        } finally {
            // Release the client's resources regardless of the outcome.
            yarnClient.stop();
        }
    }

    /**
     * Runs the detached per-job scenario with the WordCount example jar.
     *
     * <p>The jar is looked up below the parent directory ({@code ".."}) using a
     * {@code ContainsName} filter built from "-WordCount.jar" and "streaming".
     */
    @Test(timeout = 60000)
    public void testDetachedPerJobYarnCluster() {
        LOG.info("Starting testDetachedPerJobYarnCluster()");
        // The search root is a plain literal; it was previously taken from an unrelated
        // library constant that merely happens to equal "..".
        File exampleJar = YarnTestBase.findFile("..", new YarnTestBase.ContainsName(new String[]{"-WordCount.jar"}, "streaming"));
        Assert.assertNotNull("Could not find wordcount jar", exampleJar);
        testDetachedPerJobYarnClusterInternal(exampleJar.getAbsolutePath());
        LOG.info("Finished testDetachedPerJobYarnCluster()");
    }

    /**
     * Runs the detached per-job scenario with the streaming WordCount example jar.
     *
     * <p>The jar is looked up below the parent directory ({@code ".."}) using a
     * {@code ContainsName} filter built from "flink-streaming-examples" and
     * "-WordCount.jar".
     */
    @Test(timeout = 60000)
    public void testDetachedPerJobYarnClusterWithStreamingJob() {
        LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
        // The search root is a plain literal; it was previously taken from an unrelated
        // library constant that merely happens to equal "..".
        File streamingJar = YarnTestBase.findFile("..", new YarnTestBase.ContainsName(new String[]{"flink-streaming-examples", "-WordCount.jar"}));
        Assert.assertNotNull("Could not find wordcount jar", streamingJar);
        testDetachedPerJobYarnClusterInternal(streamingJar.getAbsolutePath());
        LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
    }

    /**
     * Deploys a YARN cluster through the programmatic client API instead of the
     * command line, waits until it reports the expected status, and shuts it down.
     *
     * <p>The client is configured with one TaskManager, 768 MB for the JobManager and
     * for the TaskManager, the Flink uber jar, and the configuration found under the
     * {@code FLINK_CONF_DIR} environment variable. After deployment the cluster status
     * is polled once per second, for at most 17 polls, until it equals
     * {@code new FlinkYarnClusterStatus(1, 1)}.
     */
    @Test
    public void testJavaAPI() {
        LOG.info("Starting testJavaAPI()");
        AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
        Assert.assertNotNull("unable to get yarn client", flinkYarnClient);

        flinkYarnClient.setTaskManagerCount(1);
        flinkYarnClient.setJobManagerMemory(768);
        flinkYarnClient.setTaskManagerMemory(768);
        flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
        String confDirPath = System.getenv("FLINK_CONF_DIR");
        flinkYarnClient.setConfigurationDirectory(confDirPath);
        flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
        flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

        AbstractFlinkYarnCluster cluster = null;
        try {
            cluster = flinkYarnClient.deploy(null);
            cluster.connectToCluster();
        } catch (Exception e) {
            System.err.println("Error while deploying YARN cluster: " + e.getMessage());
            LOG.warn("Failing test", e);
            Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
        }

        try {
            final FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
            // One poll per second; the test gives up after the 17th unsuccessful poll.
            final int maxPolls = 17;
            boolean started = false;
            for (int poll = 0; poll < maxPolls && !started; poll++) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted", e);
                    // Restore the interrupt status rather than clearing it, and stop waiting.
                    Thread.currentThread().interrupt();
                    Assert.fail("Interrupted while waiting for the cluster to start");
                }
                FlinkYarnClusterStatus currentStatus = cluster.getClusterStatus();
                if (currentStatus != null && currentStatus.equals(expectedStatus)) {
                    LOG.info("Cluster reached status {}", currentStatus);
                    started = true;
                }
            }
            if (!started) {
                Assert.fail("The custer didn't start after 15 seconds");
            }

            Assert.assertNotNull(cluster.getJobManagerAddress());
            Assert.assertNotNull(cluster.getWebInterfaceURL());
            LOG.info("Shutting down cluster. All tests passed");
        } finally {
            // Always shut the deployed cluster down, so that a failed assertion does not
            // leave it running.
            cluster.shutdown(false);
        }
        LOG.info("Finished testJavaAPI()");
    }
}
