package org.apache.flink.yarn.cli;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.shaded.com.google.common.base.Ascii;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/cli/FlinkYarnSessionCli.class */
public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> {
    // Logger used for diagnostics; user-facing messages are additionally printed to stdout/stderr.
    private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
    // File names of the logging configuration files; not referenced elsewhere in this class.
    public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
    public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
    // Polling interval of the interactive CLI; presumably in seconds (the interactive loop waits 3000 ms per round) - confirm.
    private static final int CLIENT_POLLING_INTERVALL = 3;
    // Identifier of this command line: returned by getId() and compared with the address option in isActive().
    private static final String ID = "yarn-cluster";
    // File-name prefix of the YARN properties file; getYarnPropertiesLocation() appends the user name.
    private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
    // Keys used inside the YARN properties file.
    static final String YARN_APPLICATION_ID_KEY = "applicationID";
    private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
    private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
    // Separator between the individual key=value pairs of the encoded dynamic properties string.
    private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
    // Command line options. Their short and long names are built in the constructor from the given prefixes.
    private final Option QUERY;
    private final Option APPLICATION_ID;
    private final Option QUEUE;
    private final Option SHIP_PATH;
    private final Option FLINK_JAR;
    private final Option JM_MEMORY;
    private final Option TM_MEMORY;
    private final Option CONTAINER;
    private final Option SLOTS;
    private final Option DETACHED;
    private final Option ZOOKEEPER_NAMESPACE;

    // Still registered in ALL_OPTIONS so it is accepted by the parser, but never read in this class.
    @Deprecated
    private final Option STREAMING;
    private final Option NAME;
    // Every option above except APPLICATION_ID is added here in the constructor order; APPLICATION_ID is added last.
    private final Options ALL_OPTIONS;
    private final Option DYNAMIC_PROPERTIES;
    // Passed to runInteractiveCli() by run() after a new cluster was deployed.
    private final boolean acceptInteractiveInput;
    // Client of the cluster that run() deployed or attached to; null until then. Used by stop().
    private YarnClusterClient yarnCluster;
    // Set to true by createDescriptor() when a detached option is present on the command line.
    private boolean detachedMode;

    /**
     * Creates a command line that accepts interactive console input (delegates with {@code true}).
     *
     * @param str prefix prepended to the short name of every option
     * @param str2 prefix prepended to the long name of every option
     */
    public FlinkYarnSessionCli(String str, String str2) {
        this(str, str2, true);
    }

    public FlinkYarnSessionCli(String str, String str2, boolean z) {
        this.detachedMode = false;
        this.acceptInteractiveInput = z;
        this.QUERY = new Option(str + "q", str2 + "query", false, "Display available YARN resources (memory, cores)");
        this.APPLICATION_ID = new Option(str + "id", str2 + "applicationId", true, "Attach to running YARN session");
        this.QUEUE = new Option(str + "qu", str2 + "queue", true, "Specify YARN queue.");
        this.SHIP_PATH = new Option(str + "t", str2 + "ship", true, "Ship files in the specified directory (t for transfer)");
        this.FLINK_JAR = new Option(str + "j", str2 + "jar", true, "Path to Flink jar file");
        this.JM_MEMORY = new Option(str + "jm", str2 + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
        this.TM_MEMORY = new Option(str + "tm", str2 + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
        this.CONTAINER = new Option(str + "n", str2 + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
        this.SLOTS = new Option(str + "s", str2 + "slots", true, "Number of slots per TaskManager");
        this.DYNAMIC_PROPERTIES = new Option(str + "D", true, "Dynamic properties");
        this.DETACHED = new Option(str + "d", str2 + "detached", false, "Start detached");
        this.STREAMING = new Option(str + "st", str2 + "streaming", false, "Start Flink in streaming mode");
        this.NAME = new Option(str + "nm", str2 + "name", true, "Set a custom name for the application on YARN");
        this.ZOOKEEPER_NAMESPACE = new Option(str + "z", str2 + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
        this.ALL_OPTIONS = new Options();
        this.ALL_OPTIONS.addOption(this.FLINK_JAR);
        this.ALL_OPTIONS.addOption(this.JM_MEMORY);
        this.ALL_OPTIONS.addOption(this.TM_MEMORY);
        this.ALL_OPTIONS.addOption(this.CONTAINER);
        this.ALL_OPTIONS.addOption(this.QUEUE);
        this.ALL_OPTIONS.addOption(this.QUERY);
        this.ALL_OPTIONS.addOption(this.SHIP_PATH);
        this.ALL_OPTIONS.addOption(this.SLOTS);
        this.ALL_OPTIONS.addOption(this.DYNAMIC_PROPERTIES);
        this.ALL_OPTIONS.addOption(this.DETACHED);
        this.ALL_OPTIONS.addOption(this.STREAMING);
        this.ALL_OPTIONS.addOption(this.NAME);
        this.ALL_OPTIONS.addOption(this.APPLICATION_ID);
        this.ALL_OPTIONS.addOption(this.ZOOKEEPER_NAMESPACE);
    }

    /**
     * Tries to resume a YARN session from the YARN properties file.
     *
     * <p>Returns {@code null} without touching the file when the address option is set, when any
     * option from {@code ALL_OPTIONS} other than the detached flag is present, or when the
     * properties file does not exist. Otherwise the file is read, the application id is validated,
     * and the stored parallelism and dynamic properties are written into {@code configuration}.
     *
     * @param commandLine the parsed command line
     * @param configuration configuration used to locate the properties file; receives the
     *     parallelism and dynamic properties found in it
     * @return the application id stored in the properties file, or {@code null} if the file is not used
     * @throws IllegalConfigurationException if the file contains no application id
     * @throws RuntimeException if the file cannot be read, or contains an invalid application id
     *     or a non-integer parallelism
     */
    private String loadYarnPropertiesFile(CommandLine commandLine, Configuration configuration) {
        if (commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt(), (String) null) != null) {
            return null;
        }
        // Any YARN option besides "detached" disables resuming from the properties file.
        for (Option option : commandLine.getOptions()) {
            if (this.ALL_OPTIONS.hasOption(option.getOpt()) && !option.getOpt().equals(this.DETACHED.getOpt())) {
                return null;
            }
        }
        File yarnPropertiesLocation = getYarnPropertiesLocation(configuration);
        if (!yarnPropertiesLocation.exists()) {
            return null;
        }
        logAndSysout("Found YARN properties file " + yarnPropertiesLocation.getAbsolutePath());

        Properties properties = new Properties();
        // try-with-resources closes the stream even when load() fails.
        try (FileInputStream in = new FileInputStream(yarnPropertiesLocation)) {
            properties.load(in);
        } catch (IOException e) {
            throw new RuntimeException("Cannot read the YARN properties file", e);
        }

        String applicationId = properties.getProperty(YARN_APPLICATION_ID_KEY);
        if (applicationId == null) {
            throw new IllegalConfigurationException("Yarn properties file found but doesn't contain a Yarn applicaiton id. Please delete the file at " + yarnPropertiesLocation.getAbsolutePath());
        }
        // Only the id conversion is guarded here, so later failures are not misreported
        // as an invalid application id.
        try {
            ConverterUtils.toApplicationId(applicationId);
        } catch (Exception e) {
            throw new RuntimeException("YARN properties contains an invalid entry for application id: " + applicationId, e);
        }
        logAndSysout("Using Yarn application id from YARN properties " + applicationId);

        String storedParallelism = properties.getProperty(YARN_PROPERTIES_PARALLELISM);
        if (storedParallelism != null) {
            final int parallelism;
            try {
                parallelism = Integer.parseInt(storedParallelism);
            } catch (NumberFormatException e) {
                throw new RuntimeException("Error while parsing the YARN properties: Property parallelism is not an integer.", e);
            }
            configuration.setInteger("parallelism.default", parallelism);
            logAndSysout("YARN properties set default parallelism to " + parallelism);
        }

        Map<String, String> dynamicProperties = getDynamicProperties(properties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING));
        for (Map.Entry<String, String> entry : dynamicProperties.entrySet()) {
            configuration.setString(entry.getKey(), entry.getValue());
        }
        return applicationId;
    }

    /**
     * Creates a cluster descriptor and configures it from the command line.
     *
     * <p>Side effect: sets {@code detachedMode} when a detached option is present.
     *
     * @param str default application name, used only when the name option is absent; may be {@code null}
     * @param commandLine the parsed command line
     * @return the configured descriptor
     * @throws IllegalArgumentException if the container option is missing (usage is printed first);
     *     a non-numeric value for a numeric option surfaces as {@link NumberFormatException}
     * @throws RuntimeException if the location of the Flink jar cannot be decoded
     */
    public AbstractYarnClusterDescriptor createDescriptor(String str, CommandLine commandLine) {
        AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor();
        if (!commandLine.hasOption(this.CONTAINER.getOpt())) {
            LOG.error("Missing required argument {}", this.CONTAINER.getOpt());
            printUsage();
            throw new IllegalArgumentException("Missing required argument " + this.CONTAINER.getOpt());
        }
        clusterDescriptor.setTaskManagerCount(Integer.parseInt(commandLine.getOptionValue(this.CONTAINER.getOpt())));

        Path localJarPath;
        if (commandLine.hasOption(this.FLINK_JAR.getOpt())) {
            String userPath = commandLine.getOptionValue(this.FLINK_JAR.getOpt());
            if (!userPath.startsWith("file://")) {
                userPath = "file://" + userPath;
            }
            localJarPath = new Path(userPath);
        } else {
            LOG.info("No path for the flink jar passed. Using the location of {} to locate the jar", clusterDescriptor.getClass());
            String encodedPath = clusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
            try {
                localJarPath = new Path(new File(URLDecoder.decode(encodedPath, Charset.defaultCharset().name())).toURI());
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedPath + " Please supply a path manually via the -" + this.FLINK_JAR.getOpt() + " option.", e);
            }
        }
        clusterDescriptor.setLocalJarPath(localJarPath);

        List<File> shipFiles = new ArrayList<>();
        if (commandLine.hasOption(this.SHIP_PATH.getOpt())) {
            File shipDir = new File(commandLine.getOptionValue(this.SHIP_PATH.getOpt()));
            if (shipDir.isDirectory()) {
                shipFiles.add(shipDir);
            } else {
                LOG.warn("Ship directory is not a directory. Ignoring it.");
            }
        }
        clusterDescriptor.addShipFiles(shipFiles);

        if (commandLine.hasOption(this.QUEUE.getOpt())) {
            clusterDescriptor.setQueue(commandLine.getOptionValue(this.QUEUE.getOpt()));
        }
        if (commandLine.hasOption(this.JM_MEMORY.getOpt())) {
            clusterDescriptor.setJobManagerMemory(Integer.parseInt(commandLine.getOptionValue(this.JM_MEMORY.getOpt())));
        }
        if (commandLine.hasOption(this.TM_MEMORY.getOpt())) {
            clusterDescriptor.setTaskManagerMemory(Integer.parseInt(commandLine.getOptionValue(this.TM_MEMORY.getOpt())));
        }
        if (commandLine.hasOption(this.SLOTS.getOpt())) {
            clusterDescriptor.setTaskManagerSlots(Integer.parseInt(commandLine.getOptionValue(this.SLOTS.getOpt())));
        }

        // Without any -D option the array stays null and StringUtils.join(null, ..) yields null.
        String[] dynamicProperties = null;
        if (commandLine.hasOption(this.DYNAMIC_PROPERTIES.getOpt())) {
            dynamicProperties = commandLine.getOptionValues(this.DYNAMIC_PROPERTIES.getOpt());
        }
        clusterDescriptor.setDynamicPropertiesEncoded(StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR));

        if (commandLine.hasOption(this.DETACHED.getOpt()) || commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
            this.detachedMode = true;
            clusterDescriptor.setDetachedMode(true);
        }
        if (commandLine.hasOption(this.NAME.getOpt())) {
            clusterDescriptor.setName(commandLine.getOptionValue(this.NAME.getOpt()));
        } else if (str != null) {
            clusterDescriptor.setName(str);
        }
        if (commandLine.hasOption(this.ZOOKEEPER_NAMESPACE.getOpt())) {
            clusterDescriptor.setZookeeperNamespace(commandLine.getOptionValue(this.ZOOKEEPER_NAMESPACE.getOpt()));
        }

        // A slot count of -1 is replaced by one slot per TaskManager.
        int slotsPerTaskManager = clusterDescriptor.getTaskManagerSlots();
        if (slotsPerTaskManager == -1) {
            slotsPerTaskManager = 1;
            clusterDescriptor.setTaskManagerSlots(1);
        }
        int maxSlots = slotsPerTaskManager * clusterDescriptor.getTaskManagerCount();
        int userParallelism = Integer.parseInt(commandLine.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1"));
        if (userParallelism != -1) {
            // Divide as double so that ceil() really rounds up; integer division would truncate
            // first and could yield fewer slots than the requested parallelism needs.
            int requiredSlotsPerTaskManager = (int) Math.ceil((double) userParallelism / clusterDescriptor.getTaskManagerCount());
            logAndSysout("The YARN cluster has " + maxSlots + " slots available, but the user requested a parallelism of " + userParallelism + " on YARN. Each of the " + clusterDescriptor.getTaskManagerCount() + " TaskManagers will get " + requiredSlotsPerTaskManager + " slots.");
            clusterDescriptor.setTaskManagerSlots(requiredSlotsPerTaskManager);
        }
        return clusterDescriptor;
    }

    /**
     * Prints the usage text to stdout: first the required container option, then the options
     * contributed by {@link #addGeneralOptions(Options)} and {@link #addRunOptions(Options)}.
     */
    private void printUsage() {
        System.out.println("Usage:");

        final HelpFormatter formatter = new HelpFormatter();
        formatter.setWidth(200);
        formatter.setLeftPadding(5);

        final Options required = new Options();
        required.addOption(this.CONTAINER);
        formatter.setSyntaxPrefix("   Required");
        formatter.printHelp(" ", required);

        final Options optional = new Options();
        addGeneralOptions(optional);
        addRunOptions(optional);
        formatter.setSyntaxPrefix("   Optional");
        formatter.printHelp(" ", optional);
    }

    /**
     * Stores the given properties in the given file and makes the file readable for all users.
     *
     * @param properties the properties to store
     * @param file the target file; overwritten if it already exists
     * @throws RuntimeException if the file cannot be written
     */
    private static void writeYarnProperties(Properties properties, File file) {
        // try-with-resources closes the stream even when store() fails.
        try (FileOutputStream out = new FileOutputStream(file)) {
            properties.store(out, "Generated YARN properties file");
        } catch (IOException e) {
            throw new RuntimeException("Error writing the properties file", e);
        }
        // ownerOnly=false: readable for everybody. A failure is reported but is not fatal.
        if (!file.setReadable(true, false)) {
            LOG.warn("Could not make the YARN properties file {} readable for all users", file.getAbsolutePath());
        }
    }

    /**
     * Runs the interactive command line for a YARN session until the cluster is shut down.
     *
     * <p>Each round prints changes in the number of TaskManagers and new cluster messages to
     * stderr, then waits up to {@code CLIENT_POLLING_INTERVALL} seconds, or until console input
     * is available. The commands {@code stop} and {@code quit} shut the cluster down and end the
     * loop; {@code help} prints the command list. Exceptions are logged and end the loop.
     *
     * @param yarnClusterClient client of the cluster to monitor and control
     * @param z whether commands are read from the console ({@code System.in})
     */
    public static void runInteractiveCli(YarnClusterClient yarnClusterClient, boolean z) {
        final String help = "Available commands:\nhelp - show these commands\nstop - stop the YARN session";
        int knownTaskManagers = 0;
        try {
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                GetClusterStatusResponse clusterStatus = yarnClusterClient.getClusterStatus();
                LOG.debug("Received status message: {}", clusterStatus);
                if (clusterStatus != null && knownTaskManagers != clusterStatus.numRegisteredTaskManagers()) {
                    System.err.println("Number of connected TaskManagers changed to " + clusterStatus.numRegisteredTaskManagers() + ". Slots available: " + clusterStatus.totalNumberOfSlots());
                    knownTaskManagers = clusterStatus.numRegisteredTaskManagers();
                }

                List<String> newMessages = yarnClusterClient.getNewMessages();
                if (newMessages != null && !newMessages.isEmpty()) {
                    System.err.println("New messages from the YARN cluster: ");
                    for (String message : newMessages) {
                        System.err.println(message);
                    }
                }

                if (yarnClusterClient.getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
                    System.err.println("The YARN cluster has failed");
                    yarnClusterClient.shutdown();
                }

                // Wait until the polling interval is over or the user has entered something.
                long waitStart = System.currentTimeMillis();
                while (System.currentTimeMillis() - waitStart < CLIENT_POLLING_INTERVALL * 1000L && (!z || !console.ready())) {
                    Thread.sleep(200L);
                }

                if (z && console.ready()) {
                    String command = console.readLine();
                    if (command == null) {
                        // End of stream: no further commands can arrive.
                        LOG.warn("Console input was closed, stopping the interactive command line interface.");
                        return;
                    }
                    switch (command) {
                        case "quit":
                        case "stop":
                            yarnClusterClient.shutdownCluster();
                            // The session is over; leave the loop instead of polling a stopped cluster.
                            return;
                        case "help":
                            System.err.println(help);
                            break;
                        default:
                            System.err.println("Unknown command '" + command + "'. Showing help: \n" + help);
                            break;
                    }
                }

                if (yarnClusterClient.hasBeenShutdown()) {
                    LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
                    return;
                }
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            LOG.warn("Exception while running the interactive command line interface", e);
        } catch (Exception e) {
            LOG.warn("Exception while running the interactive command line interface", e);
        }
    }

    /**
     * Entry point: runs a session CLI created with empty option prefixes and exits the JVM with
     * the code returned by {@link #run(String[])}.
     *
     * @param strArr the command line arguments
     */
    public static void main(String[] strArr) {
        final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
        final int exitCode = cli.run(strArr);
        System.exit(exitCode);
    }

    /**
     * Decides whether this command line handles the given invocation.
     *
     * @param commandLine the parsed command line
     * @param configuration configuration handed to the YARN properties file lookup
     * @return {@code true} if the address option equals this command line's id, if the application
     *     id option is present, or if an application id could be loaded from the YARN properties file
     */
    public boolean isActive(CommandLine commandLine, Configuration configuration) {
        final String address = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt(), (String) null);
        if (ID.equals(address)) {
            return true;
        }
        if (commandLine.hasOption(this.APPLICATION_ID.getOpt())) {
            return true;
        }
        // The properties file is consulted only as a last resort.
        return loadYarnPropertiesFile(commandLine, configuration) != null;
    }

    /**
     * Returns the identifier of this command line.
     *
     * @return the constant {@code "yarn-cluster"}
     */
    public String getId() {
        return ID;
    }

    /**
     * Adds every option registered in {@code ALL_OPTIONS} to the given options.
     *
     * @param options the options collection to extend
     */
    public void addRunOptions(Options options) {
        for (Object registered : this.ALL_OPTIONS.getOptions()) {
            options.addOption((Option) registered);
        }
    }

    /**
     * Adds the application id option to the given options.
     *
     * @param options the options collection to extend
     */
    public void addGeneralOptions(Options options) {
        options.addOption(this.APPLICATION_ID);
    }

    /* renamed from: retrieveCluster, reason: merged with bridge method [inline-methods] */
    /**
     * Retrieves a client for an already running YARN session.
     *
     * <p>The application id comes from the application id option if present, otherwise from the
     * YARN properties file. The key {@code recovery.zookeeper.path.namespace} in
     * {@code configuration} is set to the ZooKeeper namespace option if given, otherwise to its
     * current value with the application id as the default.
     *
     * @param commandLine the parsed command line
     * @param configuration the Flink configuration; modified as described above
     * @return the client returned by the cluster descriptor for the application id
     * @throws UnsupportedOperationException if no application id could be determined
     */
    public YarnClusterClient m256retrieveCluster(CommandLine commandLine, Configuration configuration) throws UnsupportedOperationException {
        final String applicationId;
        if (commandLine.hasOption(this.APPLICATION_ID.getOpt())) {
            applicationId = commandLine.getOptionValue(this.APPLICATION_ID.getOpt());
        } else {
            applicationId = loadYarnPropertiesFile(commandLine, configuration);
        }
        if (applicationId == null) {
            throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
        }

        final String zkNamespace;
        if (commandLine.hasOption(this.ZOOKEEPER_NAMESPACE.getOpt())) {
            zkNamespace = commandLine.getOptionValue(this.ZOOKEEPER_NAMESPACE.getOpt());
        } else {
            zkNamespace = configuration.getString("recovery.zookeeper.path.namespace", applicationId);
        }
        configuration.setString("recovery.zookeeper.path.namespace", zkNamespace);

        final AbstractYarnClusterDescriptor descriptor = getClusterDescriptor();
        descriptor.setFlinkConfiguration(configuration);
        return descriptor.m206retrieve(applicationId);
    }

    /* renamed from: createCluster, reason: merged with bridge method [inline-methods] */
    /**
     * Deploys a new YARN cluster configured from the command line.
     *
     * @param str default application name, forwarded to {@link #createDescriptor(String, CommandLine)}
     * @param commandLine the parsed command line
     * @param configuration the Flink configuration set on the descriptor before deploying
     * @return the client of the deployed cluster
     * @throws RuntimeException if the deployment fails; the original exception is the cause
     */
    public YarnClusterClient m255createCluster(String str, CommandLine commandLine, Configuration configuration) {
        final AbstractYarnClusterDescriptor descriptor = createDescriptor(str, commandLine);
        descriptor.setFlinkConfiguration(configuration);

        final YarnClusterClient client;
        try {
            client = descriptor.m205deploy();
        } catch (Exception deployFailure) {
            throw new RuntimeException("Error deploying the YARN cluster", deployFailure);
        }
        return client;
    }

    /**
     * Runs the YARN session command line.
     *
     * <p>Depending on the arguments this either prints the cluster description (query option),
     * attaches to a running session (application id option), or deploys a new session and writes
     * the YARN properties file. Unless detached, the interactive CLI is run afterwards.
     *
     * @param strArr the command line arguments
     * @return {@code 0} on success, {@code 1} if the arguments could not be parsed or any step failed
     */
    public int run(String[] strArr) {
        Options options = new Options();
        addGeneralOptions(options);
        addRunOptions(options);

        // Only parse errors are usage errors; everything else is reported with its stack trace.
        final CommandLine parse;
        try {
            parse = new PosixParser().parse(options, strArr);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            printUsage();
            return 1;
        }

        if (parse.hasOption(this.QUERY.getOpt())) {
            try {
                System.out.println(getClusterDescriptor().getClusterDescription());
                return 0;
            } catch (Exception e) {
                System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
                e.printStackTrace(System.err);
                return 1;
            }
        }

        if (parse.hasOption(this.APPLICATION_ID.getOpt())) {
            final String applicationId = parse.getOptionValue(this.APPLICATION_ID.getOpt());
            try {
                this.yarnCluster = getClusterDescriptor().m206retrieve(applicationId);
                // createDescriptor() is not called on this path, so the detached flag has to be
                // read from the command line here.
                if (this.detachedMode || parse.hasOption(this.DETACHED.getOpt())) {
                    LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill {}", applicationId);
                    this.yarnCluster.disconnect();
                } else {
                    runInteractiveCli(this.yarnCluster, true);
                }
                return 0;
            } catch (Exception e) {
                System.err.println("Could not retrieve existing Yarn application: " + e.getMessage());
                e.printStackTrace(System.err);
                return 1;
            }
        }

        final AbstractYarnClusterDescriptor createDescriptor;
        try {
            createDescriptor = createDescriptor(null, parse);
        } catch (Exception e) {
            System.err.println("Error while starting the YARN Client: " + e.getMessage());
            e.printStackTrace(System.err);
            return 1;
        }

        try {
            this.yarnCluster = createDescriptor.m205deploy();
            System.out.println("Flink JobManager is now running on " + (this.yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + this.yarnCluster.getJobManagerAddress().getPort()));
            System.out.println("JobManager Web Interface: " + this.yarnCluster.getWebInterfaceURL());

            // Record application id, parallelism and dynamic properties so that later
            // invocations can resume this session from the properties file.
            File yarnPropertiesLocation = getYarnPropertiesLocation(this.yarnCluster.getFlinkConfiguration());
            Properties properties = new Properties();
            properties.setProperty(YARN_APPLICATION_ID_KEY, this.yarnCluster.getApplicationId().toString());
            if (createDescriptor.getTaskManagerSlots() != -1) {
                properties.setProperty(YARN_PROPERTIES_PARALLELISM, Integer.toString(createDescriptor.getTaskManagerSlots() * createDescriptor.getTaskManagerCount()));
            }
            if (createDescriptor.getDynamicPropertiesEncoded() != null) {
                properties.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, createDescriptor.getDynamicPropertiesEncoded());
            }
            writeYarnProperties(properties, yarnPropertiesLocation);

            if (this.detachedMode) {
                LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + this.yarnCluster.getApplicationId() + System.lineSeparator() + "Please also note that the temporary files of the YARN session in {} will not be removed.", createDescriptor.getSessionFilesDir());
                this.yarnCluster.waitForClusterToBeReady();
                this.yarnCluster.disconnect();
            } else {
                runInteractiveCli(this.yarnCluster, this.acceptInteractiveInput);
            }
            return 0;
        } catch (Exception e) {
            System.err.println("Error while deploying YARN cluster: " + e.getMessage());
            e.printStackTrace(System.err);
            return 1;
        }
    }

    /**
     * Shuts down the cluster client held by this command line, if there is one.
     */
    public void stop() {
        if (this.yarnCluster == null) {
            return;
        }
        LOG.info("Command line interface is shutting down the yarnCluster");
        this.yarnCluster.shutdown();
    }

    /**
     * Writes the message to the log at INFO level and then prints it to stdout.
     *
     * @param str the message
     */
    private void logAndSysout(String str) {
        LOG.info(str);
        System.out.println(str);
    }

    /**
     * Decodes an encoded dynamic properties string into key/value pairs.
     *
     * <p>The input consists of {@code key=value} entries joined by
     * {@code YARN_DYNAMIC_PROPERTIES_SEPARATOR}. Each entry is split at its <em>first</em>
     * {@code '='}, so values may themselves contain {@code '='}. Entries without {@code '='}, with
     * an empty key, or with an empty value are skipped. For duplicate keys the last entry wins.
     *
     * @param str the encoded properties; may be {@code null} or empty
     * @return the decoded properties; an empty map for {@code null} or empty input
     */
    public static Map<String, String> getDynamicProperties(String str) {
        if (str == null || str.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, String> decoded = new HashMap<>();
        for (String entry : str.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR)) {
            int separator = entry.indexOf('=');
            if (separator <= 0) {
                // No '=' at all, or nothing in front of it.
                continue;
            }
            String value = entry.substring(separator + 1);
            if (value.isEmpty()) {
                // "key=" is ignored, as it always has been.
                continue;
            }
            decoded.put(entry.substring(0, separator), value);
        }
        return decoded;
    }

    /**
     * Determines where the YARN properties file of the current user is located.
     *
     * @param configuration configuration providing {@code yarn.properties-file.location}; the
     *     {@code java.io.tmpdir} system property is the default directory
     * @return the file {@code .yarn-properties-<user.name>} inside that directory
     */
    public static File getYarnPropertiesLocation(Configuration configuration) {
        final String defaultDirectory = System.getProperty("java.io.tmpdir");
        final String directory = configuration.getString("yarn.properties-file.location", defaultDirectory);
        final String fileName = YARN_PROPERTIES_FILE + System.getProperty("user.name");
        return new File(directory, fileName);
    }

    /**
     * Creates the cluster descriptor used by this command line. Being {@code protected}, it can
     * be overridden by subclasses to supply a different descriptor.
     *
     * @return a new {@link YarnClusterDescriptor}
     */
    protected AbstractYarnClusterDescriptor getClusterDescriptor() {
        return new YarnClusterDescriptor();
    }
}
