package org.apache.flink.yarn;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Scanner;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MarkerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YarnTestBase.class */
public abstract class YarnTestBase {
    protected static final int NUM_NODEMANAGERS = 2;
    protected static File flinkUberjar;
    private YarnClient yarnClient = null;
    protected static ByteArrayOutputStream outContent;
    protected static ByteArrayOutputStream errContent;
    private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
    protected static final PrintStream originalStdout = System.out;
    protected static final PrintStream originalStderr = System.err;
    protected static String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
    protected static final String[] PROHIBITED_STRINGS = {"Exception", "Started SelectChannelConnector@0.0.0.0:8081"};

    @ClassRule
    public static TemporaryFolder tmp = new TemporaryFolder();
    protected static MiniYARNCluster yarnCluster = null;
    protected static final Configuration yarnConfiguration = new YarnConfiguration();

    /* loaded from: input_file:org/apache/flink/yarn/YarnTestBase$ContainsName.class */
    public static class ContainsName implements FilenameFilter {
        private String[] names;
        private String excludeInPath;

        public ContainsName(String[] strArr) {
            this.excludeInPath = null;
            this.names = strArr;
        }

        public ContainsName(String[] strArr, String str) {
            this.excludeInPath = null;
            this.names = strArr;
            this.excludeInPath = str;
        }

        @Override // java.io.FilenameFilter
        public boolean accept(File file, String str) {
            if (this.excludeInPath == null) {
                for (String str2 : this.names) {
                    if (!str.contains(str2)) {
                        return false;
                    }
                }
                return true;
            }
            for (String str3 : this.names) {
                if (!str.contains(str3)) {
                    return false;
                }
            }
            return !file.toString().contains(this.excludeInPath);
        }
    }

    /* loaded from: input_file:org/apache/flink/yarn/YarnTestBase$RootDirFilenameFilter.class */
    public static class RootDirFilenameFilter implements FilenameFilter {
        @Override // java.io.FilenameFilter
        public boolean accept(File file, String str) {
            return str.startsWith("flink-dist") && str.endsWith(".jar") && file.toString().contains("/lib");
        }
    }

    /* loaded from: input_file:org/apache/flink/yarn/YarnTestBase$RunTypes.class */
    enum RunTypes {
        YARN_SESSION,
        CLI_FRONTEND
    }

    /* loaded from: input_file:org/apache/flink/yarn/YarnTestBase$Runner.class */
    public static class Runner extends Thread {
        private final String[] args;
        private int returnValue;
        private RunTypes type;
        private FlinkYarnSessionCli yCli;

        public Runner(String[] strArr, RunTypes runTypes) {
            this.args = strArr;
            this.type = runTypes;
        }

        public int getReturnValue() {
            return this.returnValue;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            switch (this.type) {
                case YARN_SESSION:
                    this.yCli = new FlinkYarnSessionCli("", "");
                    this.returnValue = this.yCli.run(this.args);
                    break;
                case CLI_FRONTEND:
                    try {
                        this.returnValue = new CliFrontend().parseParameters(this.args);
                        break;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                default:
                    throw new RuntimeException("Unknown type " + this.type);
            }
            if (this.returnValue != 0) {
                Assert.fail("The YARN session returned with non-null value=" + this.returnValue);
            }
        }

        public void sendStop() {
            if (this.yCli != null) {
                this.yCli.stop();
            }
        }
    }

    @After
    public void sleep() {
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            Assert.fail("Should not happen");
        }
    }

    @Before
    public void checkClusterEmpty() throws IOException, YarnException {
        if (this.yarnClient == null) {
            this.yarnClient = YarnClient.createYarnClient();
            this.yarnClient.init(yarnConfiguration);
            this.yarnClient.start();
        }
        for (ApplicationReport applicationReport : this.yarnClient.getApplications()) {
            if (applicationReport.getYarnApplicationState() != YarnApplicationState.FINISHED && applicationReport.getYarnApplicationState() != YarnApplicationState.KILLED && applicationReport.getYarnApplicationState() != YarnApplicationState.FAILED) {
                Assert.fail("There is at least one application on the cluster is not finished.App " + applicationReport.getApplicationId() + " is in state " + applicationReport.getYarnApplicationState());
            }
        }
    }

    public static File findFile(String str, FilenameFilter filenameFilter) {
        String[] list = new File(str).list();
        if (list == null) {
            return null;
        }
        for (String str2 : list) {
            File file = new File(str + File.separator + str2);
            if (file.isDirectory()) {
                File findFile = findFile(file.getAbsolutePath(), filenameFilter);
                if (findFile != null) {
                    return findFile;
                }
            } else if (filenameFilter.accept(file.getParentFile(), file.getName())) {
                return file;
            }
        }
        return null;
    }

    public static File writeYarnSiteConfigXML(Configuration configuration) throws IOException {
        tmp.create();
        File file = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
        FileWriter fileWriter = new FileWriter(file);
        configuration.writeXml(fileWriter);
        fileWriter.flush();
        fileWriter.close();
        return file;
    }

    public static void ensureNoProhibitedStringInLogFiles(final String[] strArr) {
        File file = new File("target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
        Assert.assertTrue("Expecting directory " + file.getAbsolutePath() + " to exist", file.exists());
        Assert.assertTrue("Expecting directory " + file.getAbsolutePath() + " to be a directory", file.isDirectory());
        File findFile = findFile(file.getAbsolutePath(), new FilenameFilter() { // from class: org.apache.flink.yarn.YarnTestBase.1
            @Override // java.io.FilenameFilter
            public boolean accept(File file2, String str) {
                File file3 = new File(file2.getAbsolutePath() + "/" + str);
                try {
                    Scanner scanner = new Scanner(file3);
                    while (scanner.hasNextLine()) {
                        String nextLine = scanner.nextLine();
                        for (String str2 : strArr) {
                            if (nextLine.contains(str2)) {
                                YarnTestBase.LOG.error(MarkerFactory.getMarker("FATAL"), "Prohibited String '{}' in line '{}'", str2, nextLine);
                                return true;
                            }
                        }
                    }
                    return false;
                } catch (FileNotFoundException e) {
                    YarnTestBase.LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + file3.getAbsolutePath());
                    return false;
                }
            }
        });
        if (findFile != null) {
            Scanner scanner = null;
            try {
                scanner = new Scanner(findFile);
            } catch (FileNotFoundException e) {
                Assert.fail("Unable to locate file: " + e.getMessage() + " file: " + findFile.getAbsolutePath());
            }
            LOG.warn("Found a file with a prohibited string. Printing contents:");
            while (scanner.hasNextLine()) {
                LOG.warn("LINE: " + scanner.nextLine());
            }
            Assert.fail("Found a file " + findFile + " with a prohibited string: " + Arrays.toString(strArr));
        }
    }

    public static void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            LOG.warn("Interruped", (Throwable) e);
        }
    }

    public static int getRunningContainers() {
        int i = 0;
        for (int i2 = 0; i2 < 2; i2++) {
            i += yarnCluster.getNodeManager(i2).getNMContext().getContainers().size();
        }
        return i;
    }

    public static void startYARNWithConfig(Configuration configuration) {
        File file = null;
        try {
            file = tmp.newFolder();
        } catch (IOException e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
        System.setProperty("user.home", file.getAbsolutePath());
        LOG.info("Trying to locate uberjar in {}", new File(DefaultExpressionEngine.DEFAULT_ESCAPED_DELIMITER));
        flinkUberjar = findFile(DefaultExpressionEngine.DEFAULT_ESCAPED_DELIMITER, new RootDirFilenameFilter());
        Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
        String parent = flinkUberjar.getParentFile().getParent();
        if (!flinkUberjar.exists()) {
            Assert.fail("Unable to locate yarn-uberjar.jar");
        }
        try {
            LOG.info("Starting up MiniYARNCluster");
            if (yarnCluster == null) {
                yarnCluster = new MiniYARNCluster(configuration.get(TEST_CLUSTER_NAME_KEY), 2, 1, 1);
                yarnCluster.init(configuration);
                yarnCluster.start();
            }
            HashMap hashMap = new HashMap(System.getenv());
            File findFile = findFile(parent, new ContainsName(new String[]{"flink-conf.yaml"}));
            Assert.assertNotNull(findFile);
            hashMap.put("FLINK_CONF_DIR", findFile.getParent());
            hashMap.put("YARN_CONF_DIR", writeYarnSiteConfigXML(configuration).getParentFile().getAbsolutePath());
            hashMap.put("IN_TESTS", "yes we are in tests");
            TestBaseUtils.setEnv(hashMap);
            Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
        } catch (Exception e2) {
            e2.printStackTrace();
            LOG.error("setup failure", (Throwable) e2);
            Assert.fail();
        }
    }

    @BeforeClass
    public static void setup() {
        startYARNWithConfig(yarnConfiguration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Runner startWithArgs(String[] strArr, String str, RunTypes runTypes) {
        LOG.info("Running with args {}", Arrays.toString(strArr));
        outContent = new ByteArrayOutputStream();
        errContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(outContent));
        System.setErr(new PrintStream(errContent));
        Runner runner = new Runner(strArr, runTypes);
        runner.setName("Frontend (CLI/YARN Client) runner thread (runWithArgs()).");
        runner.start();
        for (int i = 0; i < 60; i++) {
            sleep(1000);
            if (outContent.toString().contains(str) || errContent.toString().contains(str)) {
                LOG.info("Found expected output in redirected streams");
                return runner;
            }
            if (!runner.isAlive()) {
                sendOutput();
                Assert.fail("Runner thread died before the test was finished. Return value = " + runner.getReturnValue());
            }
        }
        sendOutput();
        Assert.fail("During the timeout period of 60 seconds the expected string did not show up");
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runWithArgs(String[] strArr, String str, String[] strArr2, RunTypes runTypes, int i) {
        LOG.info("Running with args {}", Arrays.toString(strArr));
        outContent = new ByteArrayOutputStream();
        errContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(outContent));
        System.setErr(new PrintStream(errContent));
        Runner runner = new Runner(strArr, runTypes);
        runner.start();
        boolean z = false;
        for (int i2 = 0; i2 < 60; i2++) {
            sleep(1000);
            String byteArrayOutputStream = outContent.toString();
            String byteArrayOutputStream2 = errContent.toString();
            if (strArr2 != null) {
                for (String str2 : strArr2) {
                    if (byteArrayOutputStream.contains(str2) || byteArrayOutputStream2.contains(str2)) {
                        LOG.warn("Failing test. Output contained illegal string '" + str2 + "'");
                        sendOutput();
                        runner.sendStop();
                        Assert.fail("Output contained illegal string '" + str2 + "'");
                    }
                }
            }
            if (byteArrayOutputStream.contains(str) || byteArrayOutputStream2.contains(str)) {
                z = true;
                LOG.info("Found expected output in redirected streams");
                LOG.info("RunWithArgs: request runner to stop");
                runner.sendStop();
                try {
                    runner.join(10000L);
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted while stopping runner", (Throwable) e);
                }
                LOG.warn("RunWithArgs runner stopped.");
                break;
            }
            if (!runner.isAlive()) {
                sendOutput();
                Assert.fail("Runner thread died before the test was finished. Return value = " + runner.getReturnValue());
            }
        }
        sendOutput();
        Assert.assertTrue("During the timeout period of 60 seconds the expected string did not show up", z);
        Assert.assertTrue("Expecting return value == " + i, runner.getReturnValue() == i);
        LOG.info("Test was successful");
    }

    protected static void sendOutput() {
        System.setOut(originalStdout);
        System.setErr(originalStderr);
        LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
        LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
    }

    @AfterClass
    public static void tearDown() {
        if (yarnCluster != null) {
            LOG.info("Shutting down MiniYarn cluster");
            yarnCluster.stop();
            yarnCluster = null;
        }
        if (isOnTravis()) {
            File file = new File("../target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
            if (!file.mkdirs()) {
                LOG.warn("Error creating dirs to {}", file);
            }
            File root = tmp.getRoot();
            LOG.info("copying the final files from {} to {}", root.getAbsolutePath(), file.getAbsolutePath());
            try {
                FileUtils.copyDirectoryToDirectory(root, file);
            } catch (IOException e) {
                LOG.warn("Error copying the final files from {} to {}: msg: {}", root.getAbsolutePath(), file.getAbsolutePath(), e.getMessage(), e);
            }
        }
    }

    public static boolean isOnTravis() {
        return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
    }

    static {
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096);
        yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
        yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
        yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
        yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
        yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
        yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
        yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666);
        yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000);
    }
}
