package org.apache.flink.yarn;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.jar.JarFile;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

/* loaded from: input_file:org/apache/flink/yarn/Client.class */
public class Client {
    /** Class-wide logger (commons-logging). */
    private static final Log LOG = LogFactory.getLog(Client.class);

    // ---- Command line options understood by run(). The third constructor argument tells
    // ---- commons-cli whether the option takes a value.
    private static final Option QUERY = new Option("q", "query", false, "Display available YARN resources (memory, cores)");
    private static final Option VERBOSE = new Option("v", "verbose", false, "Verbose debug mode");
    private static final Option GEN_CONF = new Option("g", "generateConf", false, "Place default configuration file in current directory");
    private static final Option QUEUE = new Option("qu", "queue", true, "Specify YARN queue.");
    private static final Option SHIP_PATH = new Option("s", "ship", true, "Ship files in the specified directory");
    private static final Option FLINK_CONF_DIR = new Option("c", "confDir", true, "Path to Flink configuration directory");
    private static final Option FLINK_JAR = new Option("j", "jar", true, "Path to Flink jar file");
    private static final Option JM_MEMORY = new Option("jm", "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
    private static final Option TM_MEMORY = new Option("tm", "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
    private static final Option TM_CORES = new Option("tmc", "taskManagerCores", true, "Virtual CPU cores per TaskManager");
    private static final Option CONTAINER = new Option("n", "container", true, "Number of Yarn container to allocate (=Number of TaskTrackers)");

    // ---- Keys of the environment variables that run() puts into the ApplicationMaster
    // ---- container's launch environment.
    /** TaskManager memory as passed with -tm (default 1024). */
    public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
    /** Virtual cores per TaskManager as passed with -tmc (default 1). */
    public static final String ENV_TM_CORES = "_CLIENT_TM_CORES";
    /** Number of containers as passed with -n. */
    public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
    /** String form of the YARN application id. */
    public static final String ENV_APP_ID = "_APP_ID";
    /** Path returned by Utils.setupLocalResource for the Flink jar (presumably its uploaded location -- confirm in Utils). */
    public static final String FLINK_JAR_PATH = "_FLINK_JAR_PATH";
    /** Home directory reported by the Hadoop FileSystem on the client. */
    public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
    /** Comma-separated list of the paths returned by Utils.setupLocalResource for the shipped files. */
    public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
    /** Short user name of the current Hadoop user on the client. */
    public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";

    /** File name of the Flink configuration, both on disk and as a local-resource key. */
    private static final String CONFIG_FILE_NAME = "flink-conf.yaml";

    /** Hadoop/YARN configuration; assigned in run() from Utils.initializeYarnConfiguration(). */
    private Configuration conf;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/Client$ClusterResourceDescription.class */
    /**
     * Plain holder for the result of {@code getCurrentFreeClusterResources}: a snapshot of
     * the memory that is currently unused in the cluster.
     */
    public static class ClusterResourceDescription {

        /** Only the enclosing client creates instances. */
        private ClusterResourceDescription() {
            super();
        }

        /** Sum of the unused memory over all reported nodes. */
        public int totalFreeMemory;

        /** Largest amount of unused memory found on a single node. */
        public int containerLimit;
    }

    /**
     * Starts a Flink session on YARN and blocks until the YARN application terminates.
     *
     * <p>Steps, in order: parse the command line, set up console logging (unless
     * {@code log4j.configuration} is set), locate the Flink jar and {@code flink-conf.yaml},
     * collect files to ship, check the request against the cluster's maximum and currently
     * free resources, register jar/config/ship files as local resources of the
     * ApplicationMaster container, submit the application and poll its state.
     *
     * <p>The method terminates the JVM via {@code System.exit} for {@code -g} (after writing
     * the default configuration), for {@code -q} (after printing cluster metrics) and for
     * every validation failure. A shutdown hook kills the YARN application and removes the
     * staging directory and the {@code .yarn-jobmanager} address file.
     *
     * @param strArr the command line arguments
     * @throws Exception on any parse, I/O or YARN failure that is not handled by exiting
     */
    public void run(String[] strArr) throws Exception {
        if (UserGroupInformation.isSecurityEnabled()) {
            throw new RuntimeException("Flink YARN client does not have security support right now.File a bug, we will fix it asap");
        }

        // ------------------------------------------------------------------ command line
        Options options = new Options();
        options.addOption(VERBOSE);
        options.addOption(FLINK_CONF_DIR);
        options.addOption(FLINK_JAR);
        options.addOption(JM_MEMORY);
        options.addOption(TM_MEMORY);
        options.addOption(TM_CORES);
        options.addOption(CONTAINER);
        options.addOption(GEN_CONF);
        options.addOption(QUEUE);
        options.addOption(QUERY);
        options.addOption(SHIP_PATH);
        CommandLine cmd = null;
        try {
            cmd = new PosixParser().parse(options, strArr);
        } catch (MissingOptionException e) {
            System.out.println(e.getMessage());
            printUsage();
            System.exit(1);
        }

        // ------------------------------------------------------------------ logging
        // Only install the console appender when the user did not bring a log4j config.
        if (System.getProperty("log4j.configuration") == null) {
            Logger rootLogger = Logger.getRootLogger();
            rootLogger.removeAllAppenders();
            rootLogger.addAppender(new ConsoleAppender(new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"), "System.err"));
            if (cmd.hasOption(VERBOSE.getOpt())) {
                rootLogger.setLevel(Level.DEBUG);
                LOG.debug("CLASSPATH: " + System.getProperty("java.class.path"));
            } else {
                rootLogger.setLevel(Level.INFO);
            }
        }

        // ------------------------------------------------------------------ Flink jar
        Path localJarPath;
        if (cmd.hasOption(FLINK_JAR.getOpt())) {
            String userJar = cmd.getOptionValue(FLINK_JAR.getOpt());
            if (!userJar.startsWith("file://")) {
                userJar = "file://" + userJar;
            }
            localJarPath = new Path(userJar);
        } else {
            // Fall back to the location this class was loaded from.
            localJarPath = new Path("file://" + Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        }

        if (cmd.hasOption(GEN_CONF.getOpt())) {
            LOG.info("Placing default configuration in current directory");
            LOG.info("File written to " + generateDefaultConf(localJarPath).getAbsolutePath());
            System.exit(0);
        }

        // ------------------------------------------------------------------ configuration file
        Path confPath = null;
        String confDirPath = "";
        if (cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
            confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt()) + "/";
            File confFile = new File(confDirPath + CONFIG_FILE_NAME);
            if (!confFile.exists()) {
                LOG.fatal("Unable to locate configuration file in " + confFile);
                System.exit(1);
            }
            confPath = new Path(confFile.getAbsolutePath());
        } else {
            System.out.println("No configuration file has been specified");
            // Look for exactly one *.yaml file in the working directory.
            File[] yamlFiles = new File(".").listFiles(new FilenameFilter() {
                @Override
                public boolean accept(File dir, String name) {
                    return name != null && name.endsWith(".yaml");
                }
            });
            if (yamlFiles == null || yamlFiles.length == 0) {
                System.out.println("No configuration file has been found in current directory.\nCopying default.");
                confPath = new Path(generateDefaultConf(localJarPath).toURI());
            } else if (yamlFiles.length > 1) {
                System.out.println("Multiple .yaml configuration files were found in the current directory\nPlease specify one explicitly");
                System.exit(1);
            } else {
                confPath = new Path(yamlFiles[0].toURI());
            }
        }

        // ------------------------------------------------------------------ files to ship
        List<File> shipFiles = new ArrayList<File>();
        if (cmd.hasOption(SHIP_PATH.getOpt())) {
            File shipDir = new File(cmd.getOptionValue(SHIP_PATH.getOpt()));
            if (shipDir.isDirectory()) {
                File[] shipDirContent = shipDir.listFiles(new FilenameFilter() {
                    @Override
                    public boolean accept(File dir, String name) {
                        return !(name.equals(".") || name.equals(".."));
                    }
                });
                // listFiles returns null on an I/O error; the original passed that straight
                // to Arrays.asList and died with a NullPointerException.
                if (shipDirContent != null) {
                    shipFiles.addAll(Arrays.asList(shipDirContent));
                } else {
                    LOG.warn("Unable to list the files in ship directory " + shipDir);
                }
            } else {
                LOG.warn("Ship directory is not a directory!");
            }
        }

        // A log4j.properties next to the configuration is shipped too and switches the
        // ApplicationMaster to file logging (see the AM command below).
        boolean hasLog4j = false;
        if (confDirPath.length() > 0) {
            File log4jFile = new File(confDirPath + "/log4j.properties");
            if (log4jFile.exists()) {
                shipFiles.add(log4jFile);
                hasLog4j = true;
            }
        }

        // ------------------------------------------------------------------ sizing parameters
        String queue = cmd.hasOption(QUEUE.getOpt()) ? cmd.getOptionValue(QUEUE.getOpt()) : "default";
        int jmMemory = cmd.hasOption(JM_MEMORY.getOpt()) ? Integer.parseInt(cmd.getOptionValue(JM_MEMORY.getOpt())) : 512;
        int tmMemory = cmd.hasOption(TM_MEMORY.getOpt()) ? Integer.parseInt(cmd.getOptionValue(TM_MEMORY.getOpt())) : 1024;
        int tmCores = cmd.hasOption(TM_CORES.getOpt()) ? Integer.parseInt(cmd.getOptionValue(TM_CORES.getOpt())) : 1;

        Utils.getFlinkConfiguration(confPath.toUri().getPath());
        int jmPort = GlobalConfiguration.getInteger("jobmanager.rpc.port", 0);
        if (jmPort == 0) {
            LOG.warn("Unable to find job manager port in configuration!");
            jmPort = 6123;
        }

        // ------------------------------------------------------------------ Hadoop / YARN handles
        this.conf = Utils.initializeYarnConfiguration();
        LOG.info("Copy App Master jar from local filesystem and add to local environment");
        FileSystem fs = FileSystem.get(this.conf);
        if (fs.getScheme().startsWith("file")) {
            LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values.The Flink YARN client needs to store its files in a distributed file system");
        }
        final YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(this.conf);
        yarnClient.start();

        if (cmd.hasOption(QUERY.getOpt())) {
            // Prints the metrics and exits the JVM.
            showClusterMetrics(yarnClient);
        }
        if (!cmd.hasOption(CONTAINER.getOpt())) {
            LOG.fatal("Missing required argument " + CONTAINER.getOpt());
            printUsage();
            yarnClient.stop();
            System.exit(1);
        }
        int taskManagerCount = Integer.parseInt(cmd.getOptionValue(CONTAINER.getOpt()));

        System.out.println("Using values:");
        System.out.println("\tContainer Count = " + taskManagerCount);
        System.out.println("\tJar Path = " + localJarPath.toUri().getPath());
        System.out.println("\tConfiguration file = " + confPath.toUri().getPath());
        System.out.println("\tJobManager memory = " + jmMemory);
        System.out.println("\tTaskManager memory = " + tmMemory);
        System.out.println("\tTaskManager cores = " + tmCores);

        // ------------------------------------------------------------------ resource checks
        YarnClientApplication app = yarnClient.createApplication();
        Resource maxRes = app.getNewApplicationResponse().getMaximumResourceCapability();
        if (tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
            // Report the cluster's maximum core count (the original echoed the requested value).
            LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\nMaximum Memory: " + maxRes.getMemory() + ", Maximum Cores: " + maxRes.getVirtualCores());
            yarnClient.stop();
            System.exit(1);
        }
        if (jmMemory > maxRes.getMemory()) {
            LOG.fatal("The cluster does not have the requested resources for the JobManager available!\nMaximum Memory: " + maxRes.getMemory());
            yarnClient.stop();
            System.exit(1);
        }
        int totalMemoryRequired = jmMemory + (tmMemory * taskManagerCount);
        ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
            LOG.fatal("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. There are currently only " + freeClusterMem.totalFreeMemory + "MB available.");
            yarnClient.stop();
            System.exit(1);
        }
        if (tmMemory > freeClusterMem.containerLimit) {
            LOG.fatal("The requested amount of memory for the TaskManagers (" + tmMemory + "MB) is more than the largest possible YARN container: " + freeClusterMem.containerLimit);
            yarnClient.stop();
            System.exit(1);
        }
        if (jmMemory > freeClusterMem.containerLimit) {
            LOG.fatal("The requested amount of memory for the JobManager (" + jmMemory + "MB) is more than the largest possible YARN container: " + freeClusterMem.containerLimit);
            yarnClient.stop();
            System.exit(1);
        }

        // ------------------------------------------------------------------ ApplicationMaster command
        String javaOpts = GlobalConfiguration.getString("env.java.opts", "");
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        String amCommand = "$JAVA_HOME/bin/java -Xmx" + Utils.calculateHeapSize(jmMemory) + "M " + javaOpts;
        if (hasLog4j) {
            amCommand = amCommand + " -Dlog.file=\"<LOG_DIR>/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
        }
        amCommand = amCommand + " org.apache.flink.yarn.ApplicationMaster  1><LOG_DIR>/jobmanager-stdout.log 2><LOG_DIR>/jobmanager-stderr.log";
        amContainer.setCommands(Collections.singletonList(amCommand));
        System.err.println("amCommand=" + amCommand);

        // ------------------------------------------------------------------ local resources
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        final ApplicationId appId = appContext.getApplicationId();
        // Queried once while the file system is still open; the original asked the
        // already-closed FileSystem for it again further down.
        final Path homeDir = fs.getHomeDirectory();

        LocalResource jarResource = Records.newRecord(LocalResource.class);
        LocalResource confResource = Records.newRecord(LocalResource.class);
        Path remoteJarPath = Utils.setupLocalResource(this.conf, fs, appId.toString(), localJarPath, jarResource, homeDir);
        Path remoteConfPath = Utils.setupLocalResource(this.conf, fs, appId.toString(), confPath, confResource, homeDir);
        HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
        localResources.put("flink.jar", jarResource);
        localResources.put(CONFIG_FILE_NAME, confResource);

        // Slots 0..2 hold jar, config and staging directory; ship files follow from index 3.
        final Path[] paths = new Path[3 + shipFiles.size()];
        StringBuilder envShipFileList = new StringBuilder();
        for (int i = 0; i < shipFiles.size(); i++) {
            File shipFile = shipFiles.get(i);
            LocalResource shipResource = Records.newRecord(LocalResource.class);
            Path localShipPath = new Path("file://" + shipFile.getAbsolutePath());
            paths[3 + i] = Utils.setupLocalResource(this.conf, fs, appId.toString(), localShipPath, shipResource, homeDir);
            localResources.put(shipFile.getName(), shipResource);
            envShipFileList.append(paths[3 + i]);
            if (i + 1 < shipFiles.size()) {
                envShipFileList.append(',');
            }
        }
        paths[0] = remoteJarPath;
        paths[1] = remoteConfPath;
        paths[2] = new Path(homeDir, ".flink/" + appId.toString() + "/");
        // NOTE(review): this grants rwx to user, group and others on the staging directory.
        fs.setPermission(paths[2], new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
        Utils.setTokensFor(amContainer, paths, this.conf);
        amContainer.setLocalResources(localResources);
        fs.close();

        // ------------------------------------------------------------------ AM environment
        HashMap<String, String> appMasterEnv = new HashMap<String, String>();
        Utils.setupEnv(this.conf, appMasterEnv);
        appMasterEnv.put(ENV_TM_COUNT, String.valueOf(taskManagerCount));
        appMasterEnv.put(ENV_TM_CORES, String.valueOf(tmCores));
        appMasterEnv.put(ENV_TM_MEMORY, String.valueOf(tmMemory));
        appMasterEnv.put(FLINK_JAR_PATH, remoteJarPath.toString());
        appMasterEnv.put(ENV_APP_ID, appId.toString());
        appMasterEnv.put(ENV_CLIENT_HOME_DIR, homeDir.toString());
        appMasterEnv.put(ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
        appMasterEnv.put(ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
        amContainer.setEnvironment(appMasterEnv);

        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(jmMemory);
        capability.setVirtualCores(1);
        appContext.setApplicationName("Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        appContext.setQueue(queue);

        // ------------------------------------------------------------------ cleanup on JVM exit
        final File addrFile = new File(confDirPath + ".yarn-jobmanager");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    Client.LOG.info("Killing the Flink-YARN application.");
                    yarnClient.killApplication(appId);
                    Client.LOG.info("Deleting files in " + paths[2]);
                    FileSystem shutdownFs = FileSystem.get(Client.this.conf);
                    shutdownFs.delete(paths[2], true);
                    shutdownFs.close();
                } catch (Exception e) {
                    Client.LOG.warn("Exception while killing the YARN application", e);
                }
                try {
                    addrFile.delete();
                } catch (Exception e) {
                    Client.LOG.warn("Exception while deleting the jobmanager address file", e);
                }
                Client.LOG.info("YARN Client is shutting down");
                yarnClient.stop();
            }
        });

        // ------------------------------------------------------------------ submit and monitor
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);
        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
        YarnApplicationState appState = appReport.getYarnApplicationState();
        boolean addressReported = false;
        char[] spinner = {'/', '|', '\\', '-'};
        int spinnerPos = 0;
        while (appState != YarnApplicationState.FINISHED && appState != YarnApplicationState.KILLED && appState != YarnApplicationState.FAILED) {
            if (!addressReported && appState == YarnApplicationState.RUNNING) {
                System.err.println("Flink JobManager is now running on " + appReport.getHost() + ":" + jmPort);
                System.err.println("JobManager Web Interface: " + appReport.getTrackingUrl());
                // Publish host:port in a world-readable file next to the configuration.
                PrintWriter addrWriter = new PrintWriter(addrFile);
                try {
                    addrWriter.println(appReport.getHost() + ":" + jmPort);
                } finally {
                    addrWriter.close();
                }
                addrFile.setReadable(true, false);
                addressReported = true;
            }
            if (addressReported) {
                Thread.sleep(5000L);
            } else {
                // Still waiting for the AM: animate a spinner on stderr and poll faster.
                System.err.print(spinner[spinnerPos++] + "\r");
                if (spinnerPos == spinner.length) {
                    spinnerPos = 0;
                }
                Thread.sleep(500L);
            }
            appReport = yarnClient.getApplicationReport(appId);
            appState = appReport.getYarnApplicationState();
        }
        LOG.info("Application " + appId + " finished with state " + appState + " at " + appReport.getFinishTime());
        if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) {
            LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
        }
    }

    /**
     * Sums up the memory that is currently unused on the RUNNING nodes of the cluster.
     *
     * @param yarnClient started YARN client used to fetch the node reports
     * @return the total unused memory and the largest unused amount on any single node
     * @throws YarnException if the node reports cannot be obtained
     * @throws IOException if the node reports cannot be obtained
     */
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
        int freeTotal = 0;
        int largestFreeOnNode = 0;

        List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
        for (NodeReport node : runningNodes) {
            int capacity = node.getCapability().getMemory();
            // A node may report no usage at all; treat that as zero.
            Resource used = node.getUsed();
            int free = capacity - (used == null ? 0 : used.getMemory());

            freeTotal += free;
            largestFreeOnNode = Math.max(largestFreeOnNode, free);
        }

        ClusterResourceDescription description = new ClusterResourceDescription();
        description.totalFreeMemory = freeTotal;
        description.containerLimit = largestFreeOnNode;
        return description;
    }

    /**
     * Prints the command line help to stdout in two sections: the required option
     * ({@code -n}) followed by a selection of the optional ones.
     */
    private void printUsage() {
        System.out.println("Usage:");

        HelpFormatter formatter = new HelpFormatter();
        formatter.setWidth(200);
        formatter.setLeftPadding(5);

        // Section 1: required options.
        formatter.setSyntaxPrefix("   Required");
        formatter.printHelp(" ", new Options().addOption(CONTAINER));

        // Section 2: optional options.
        Options optional = new Options();
        for (Option option : new Option[]{VERBOSE, JM_MEMORY, TM_MEMORY, TM_CORES, QUERY, QUEUE}) {
            optional.addOption(option);
        }
        formatter.setSyntaxPrefix("   Optional");
        formatter.printHelp(" ", optional);
    }

    /**
     * Prints a per-node table (id, memory, vcores, health report, container count) for all
     * RUNNING nodes, a memory/core summary and one line per queue, then stops the client
     * and exits the JVM with status 0.
     *
     * @param yarnClient started YARN client; it is stopped by this method
     * @throws YarnException if cluster information cannot be obtained
     * @throws IOException if cluster information cannot be obtained
     */
    private void showClusterMetrics(YarnClient yarnClient) throws YarnException, IOException {
        final String row = "|%-16s |%-16s %n";
        final String separator = "+---------------------------------------+";

        int nodeManagerCount = yarnClient.getYarnClusterMetrics().getNumNodeManagers();
        System.out.println("NodeManagers in the Cluster " + nodeManagerCount);

        List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
        System.out.printf("|Property         |Value          %n");
        System.out.println(separator);

        int memorySum = 0;
        int coreSum = 0;
        for (NodeReport node : runningNodes) {
            Resource nodeCapacity = node.getCapability();
            memorySum += nodeCapacity.getMemory();
            coreSum += nodeCapacity.getVirtualCores();

            System.out.printf(row, "NodeID", node.getNodeId());
            System.out.printf(row, "Memory", nodeCapacity.getMemory() + " MB");
            System.out.printf(row, "vCores", nodeCapacity.getVirtualCores());
            System.out.printf(row, "HealthReport", node.getHealthReport());
            System.out.printf(row, "Containers", node.getNumContainers());
            System.out.println(separator);
        }
        System.out.println("Summary: totalMemory " + memorySum + " totalCores " + coreSum);

        for (QueueInfo queue : yarnClient.getAllQueues()) {
            String queueLine = "Queue: " + queue.getQueueName()
                    + ", Current Capacity: " + queue.getCurrentCapacity()
                    + " Max Capacity: " + queue.getMaximumCapacity()
                    + " Applications: " + queue.getApplications().size();
            System.out.println(queueLine);
        }

        // Query mode is terminal: release the client and leave.
        yarnClient.stop();
        System.exit(0);
    }

    /**
     * Writes a default {@code flink-conf.yaml} into the current working directory.
     *
     * <p>The content is taken from the {@code flink-conf.yaml} entry of the given jar. If
     * the jar has no such entry, the resource of the same name is loaded relative to this
     * class instead.
     *
     * <p>If the jar file cannot be found, the error is logged and the JVM exits with
     * status 1.
     *
     * @param path location of the Flink jar; only the path component of its URI is used
     * @return the newly written configuration file
     * @throws IOException if reading the jar or writing the file fails
     * @throws RuntimeException if no default configuration can be found, or if
     *         {@code flink-conf.yaml} already exists in the working directory
     */
    private File generateDefaultConf(Path path) throws IOException, FileNotFoundException {
        JarFile jarFile = null;
        try {
            jarFile = new JarFile(path.toUri().getPath());
        } catch (FileNotFoundException e) {
            LOG.fatal("Unable to access jar file. Specify jar file or configuration file.", e);
            System.exit(1);
        }

        InputStream inputStream = null;
        try {
            // getEntry returns null for a missing entry and getInputStream(null) throws a
            // NullPointerException, so the entry has to be checked before it is opened.
            // Without this check the classpath fallback below could never be reached.
            if (jarFile.getEntry(CONFIG_FILE_NAME) != null) {
                inputStream = jarFile.getInputStream(jarFile.getEntry(CONFIG_FILE_NAME));
            }
            if (inputStream == null) {
                LOG.warn("Given jar file does not contain yaml conf.");
                inputStream = getClass().getResourceAsStream(CONFIG_FILE_NAME);
                if (inputStream == null) {
                    throw new RuntimeException("Unable to find flink-conf in jar file");
                }
            }

            File file = new File(CONFIG_FILE_NAME);
            if (file.exists()) {
                // Never overwrite a configuration the user may have edited.
                throw new RuntimeException("File unexpectedly exists");
            }

            FileOutputStream fileOutputStream = new FileOutputStream(file);
            try {
                byte[] buffer = new byte[1024];
                int read;
                while ((read = inputStream.read(buffer)) != -1) {
                    fileOutputStream.write(buffer, 0, read);
                }
            } finally {
                fileOutputStream.close();
            }
            return file;
        } finally {
            // Release the stream and the jar on every path, including the failures above.
            try {
                if (inputStream != null) {
                    inputStream.close();
                }
            } finally {
                jarFile.close();
            }
        }
    }

    /**
     * Command line entry point: creates a client and hands the arguments to
     * {@link #run(String[])}.
     *
     * @param strArr the command line arguments
     * @throws Exception whatever {@code run} throws
     */
    public static void main(String[] strArr) throws Exception {
        Client client = new Client();
        client.run(strArr);
    }
}
