/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.launcher;

import cn.tenmg.flink.jobs.FlinkJobsLauncher;
import cn.tenmg.flink.jobs.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.launcher.AbstractFlinkJobsLauncher;
import cn.tenmg.flink.jobs.launcher.context.FlinkJobsLauncherContext;
import cn.tenmg.flink.jobs.launcher.utils.HttpClientUtils;
import cn.tenmg.flink.jobs.launcher.utils.Sets;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandLineFlinkJobsLauncher
extends AbstractFlinkJobsLauncher {
    private static final Logger log = LoggerFactory.getLogger(CommandLineFlinkJobsLauncher.class);

    /** {@code true} when the {@code os.name} system property contains "windows" (lower-cased comparison). */
    private static final boolean isWindows = System.getProperty("os.name", "").toLowerCase().contains("windows");

    private static final char BLANK_SPACE = ' ';
    private static final char EQUAL = '=';
    private static final String LINE_SEPARATOR = System.lineSeparator();

    // Markers searched for in the output of the flink command line client.
    /** Text preceding the id in a "Submitting application master ..." line. */
    private static final String APPLICATION_ID_PREFFIX = "Submitting application master";
    /** Text preceding the job id in a "Job has been submitted with JobID ..." line. */
    private static final String JOB_ID_PREFIX = "Job has been submitted with JobID ";
    private static final String JOB_FINISHED_PREFIX = "Job with JobID ";
    private static final String JOB_FINISHED_SUFFIX = " has finished";
    /** Text preceding the state name in a "Deploying cluster, current state ..." line. */
    private static final String CURRENT_STATE_PREFFIX = "Deploying cluster, current state";
    private static final String RUNNING_LOG = "Starting execution of program";
    private static final String YARN_RUNNING_LOG = "YARN application has been deployed successfully";
    private static final String FINISHED_LOG = "Program execution finished";
    /** Options starting with this prefix are written as {@code key=value} instead of {@code key value}. */
    private static final String PROPERTIES_KEY_PREFIXX = "-D";
    private static final String YARN_APPLICATION_NAME_KEY = "-Dyarn.application.name";
    private static final String YARN_FINISHED_LOG = "YARN application has been finished successfully";
    /** A line containing this text is treated as the start of a failure message. */
    private static final String EXCEPTION = "Exception";
    /** Text preceding the savepoint path in the output of {@code flink stop}. */
    private static final String SAVEPOINT_PATH_PREFIX = "Savepoint completed. Path: ";
    private static final Pattern JOB_FINISHED_PATTERN = Pattern.compile("Job with JobID [\\S]+ has finished");

    // Marker lengths, derived from the markers themselves so the two can never drift apart.
    private static final int JOB_ID_PREFIX_LEN = JOB_ID_PREFIX.length();
    private static final int APPLICATION_ID_PREFFIX_LEN = APPLICATION_ID_PREFFIX.length();
    private static final int JOB_FINISHED_PREFIX_LEN = JOB_FINISHED_PREFIX.length();
    private static final int JOB_FINISHED_SUFFIX_LEN = JOB_FINISHED_SUFFIX.length();
    private static final int CURRENT_STATE_PREFFIX_LEN = CURRENT_STATE_PREFFIX.length();
    private static final int SAVEPOINT_PATH_PREFIX_LEN = SAVEPOINT_PATH_PREFIX.length();

    /** Option keys that select the deployment target. */
    private static final Set<String> TARGET = Sets.as("-t", "--target");
    /** Option keys that name the savepoint to restore from. */
    private static final Set<String> FROM_SAVEPOINT = Sets.as("-s", "--fromSavepoint");

    // Instance settings; defaults are read from the launcher context and may be overridden through the setters.
    private String flinkHome = FlinkJobsLauncherContext.getProperty("commandline.flink.home");
    private Action action = Action.fromString(FlinkJobsLauncherContext.getProperty("commandline.launch.action", "run"));
    private String tempFilePrefix = FlinkJobsLauncherContext.getProperty("commandline.launch.temp_file_prefix", "flink-jobs_");
    private String yarnRest = FlinkJobsLauncherContext.getProperty("commandline.yarn.rest");
    private int yarnApplicationCheckAttempts = Integer.parseInt(FlinkJobsLauncherContext.getProperty("commandline.yarn.application_check_attempts", "60"));
    private int timeMillisBetweenYarnApplicationCheckAttempts = Integer.parseInt(FlinkJobsLauncherContext.getProperty("commandline.yarn.time_millis_between_application_check_attempts", "3000"));
    private String yarnApplicationIdPrefix = FlinkJobsLauncherContext.getProperty("commandline.yarn.application_id_prefix", "application_");

    /**
     * Returns the configured Flink home directory.
     *
     * @return the Flink home directory, or {@code null} when none is configured
     */
    public String getFlinkHome() {
        return flinkHome;
    }

    /**
     * Sets the Flink home directory.
     *
     * @param flinkHome the Flink home directory, may be {@code null}
     */
    public void setFlinkHome(final String flinkHome) {
        this.flinkHome = flinkHome;
    }

    /**
     * Returns the Flink command line action used when launching.
     *
     * @return the configured action
     */
    public Action getAction() {
        return action;
    }

    /**
     * Sets the Flink command line action used when launching.
     *
     * @param action the action to use
     */
    public void setAction(final Action action) {
        this.action = action;
    }

    /**
     * Returns the base address of the YARN REST service.
     *
     * @return the YARN REST base address, or {@code null} when none is configured
     */
    public String getYarnRest() {
        return yarnRest;
    }

    /**
     * Sets the base address of the YARN REST service.
     *
     * @param yarnRest the YARN REST base address, may be {@code null}
     */
    public void setYarnRest(final String yarnRest) {
        this.yarnRest = yarnRest;
    }

    /**
     * Returns the file name prefix used for the temporary launch script.
     *
     * @return the temporary file prefix
     */
    public String getTempFilePrefix() {
        return tempFilePrefix;
    }

    /**
     * Sets the file name prefix used for the temporary launch script.
     *
     * @param tempFilePrefix the temporary file prefix
     */
    public void setTempFilePrefix(final String tempFilePrefix) {
        this.tempFilePrefix = tempFilePrefix;
    }

    /**
     * Returns the number of YARN application check attempts.
     *
     * @return the same value as {@link #getYarnApplicationCheckAttempts()}
     * @deprecated reads the same field as {@link #getYarnApplicationCheckAttempts()}
     */
    @Deprecated
    public int getYarnRestAttempts() {
        return yarnApplicationCheckAttempts;
    }

    /**
     * Sets the number of YARN application check attempts.
     *
     * @param yarnRestAttempts the number of attempts
     * @deprecated writes the same field as {@link #setYarnApplicationCheckAttempts(int)}
     */
    @Deprecated
    public void setYarnRestAttempts(final int yarnRestAttempts) {
        this.yarnApplicationCheckAttempts = yarnRestAttempts;
    }

    /**
     * Returns how many times the YARN REST service is polled for the submitted application.
     *
     * @return the maximum number of check attempts
     */
    public int getYarnApplicationCheckAttempts() {
        return yarnApplicationCheckAttempts;
    }

    /**
     * Sets how many times the YARN REST service is polled for the submitted application.
     *
     * @param yarnApplicationCheckAttempts the maximum number of check attempts
     */
    public void setYarnApplicationCheckAttempts(final int yarnApplicationCheckAttempts) {
        this.yarnApplicationCheckAttempts = yarnApplicationCheckAttempts;
    }

    /**
     * Returns the pause, in milliseconds, between two YARN application checks.
     *
     * @return the pause between check attempts in milliseconds
     */
    public int getTimeMillisBetweenYarnApplicationCheckAttempts() {
        return timeMillisBetweenYarnApplicationCheckAttempts;
    }

    /**
     * Sets the pause, in milliseconds, between two YARN application checks.
     *
     * @param timeMillisBetweenYarnApplicationCheckAttempts the pause between check attempts in milliseconds
     */
    public void setTimeMillisBetweenYarnApplicationCheckAttempts(final int timeMillisBetweenYarnApplicationCheckAttempts) {
        this.timeMillisBetweenYarnApplicationCheckAttempts = timeMillisBetweenYarnApplicationCheckAttempts;
    }

    /**
     * Returns the prefix by which a job id is recognised as a YARN application id.
     *
     * @return the YARN application id prefix
     */
    public String getYarnApplicationIdPrefix() {
        return yarnApplicationIdPrefix;
    }

    /**
     * Sets the prefix by which a job id is recognised as a YARN application id.
     *
     * @param yarnApplicationIdPrefix the YARN application id prefix
     */
    public void setYarnApplicationIdPrefix(final String yarnApplicationIdPrefix) {
        this.yarnApplicationIdPrefix = yarnApplicationIdPrefix;
    }

    /**
     * Launches the given job through the {@code flink} command line client.
     *
     * <p>The command line is written to a temporary script (a {@code .bat} file on Windows,
     * otherwise a {@code .sh} file that starts with a copy of {@code /etc/profile}), the script
     * is executed, and the client's standard output and error streams are scanned by
     * {@link InputStreamCatcher} and {@link ErrorStreamCatcher} for the job id, state and error
     * message. If no job id was found in the output and an application name was generated, the
     * YARN REST service is polled for an application with that name.
     *
     * @param flinkJobs the job description to launch
     * @return the collected job information; on a failure that carries a message the state is
     *         {@code FAILED} and the message is copied into the result
     * @throws Exception if preparing or running the command fails with an exception that has no
     *         message
     */
    @Override
    public AbstractFlinkJobsLauncher.FlinkJobsInfo launch(FlinkJobs flinkJobs) throws Exception {
        String jar = CommandLineFlinkJobsLauncher.getJar(flinkJobs);
        int jarIndex = CommandLineFlinkJobsLauncher.validateJar(jar);
        String defaultAppName = flinkJobs.getServiceName();
        if (defaultAppName == null) {
            // No service name: use the part of the jar path between the last path separator and jarIndex.
            int begin = jar.lastIndexOf('/');
            if (begin < 0) {
                begin = jar.lastIndexOf('\\');
            }
            defaultAppName = jar.substring(begin + 1, jarIndex);
        }
        StringBuilder commandBuilder = new StringBuilder();
        // NOTE(review): the working directory is the file-system root, or "bin" directly under it
        // when flinkHome is set -- it is not derived from flinkHome. Confirm this is intended.
        String dirPath = File.separator;
        if (this.flinkHome == null) {
            commandBuilder.append("flink");
        } else {
            dirPath = dirPath + "bin";
            commandBuilder.append(this.flinkHome + File.separator + "bin" + File.separator + "flink");
        }
        commandBuilder.append(' ').append(this.action.getName());
        String appName = this.attachOptions(commandBuilder, defaultAppName, flinkJobs.getOptions(), flinkJobs.isAllwaysNewJob());
        String entryPointClassName = CommandLineFlinkJobsLauncher.getEntryPointClassName(flinkJobs);
        if (entryPointClassName != null) {
            commandBuilder.append(' ').append("-c").append(' ').append(entryPointClassName);
        }
        commandBuilder.append(' ').append(jar);
        String arguments = CommandLineFlinkJobsLauncher.getArguments(flinkJobs);
        if (!CommandLineFlinkJobsLauncher.isEmptyArguments(arguments).booleanValue()) {
            commandBuilder.append(' ').append(CommandLineFlinkJobsLauncher.encode(arguments));
        }
        String command = commandBuilder.toString();
        log.info("Execute command: {}", command);

        // Upper bound for waiting on the stream catcher threads once the process has exited, so a
        // stream that is kept open by some other process cannot block the launcher forever.
        final long catcherJoinTimeoutMillis = 10000L;
        Process p = null;
        File temp = null;
        AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo = new AbstractFlinkJobsLauncher.FlinkJobsInfo();
        try {
            temp = File.createTempFile(this.tempFilePrefix, isWindows ? ".bat" : ".sh");
            temp.setExecutable(true);
            String[] cmdarray = this.writeLaunchScript(temp, command);
            p = Runtime.getRuntime().exec(cmdarray, null, new File(dirPath));
            Thread outputCatcher = new InputStreamCatcher(p.getInputStream(), appInfo);
            Thread errorCatcher = new ErrorStreamCatcher(p.getErrorStream(), appInfo);
            outputCatcher.start();
            errorCatcher.start();
            p.waitFor();
            // The catchers publish the job id and state only after they finish reading, so wait
            // for them before inspecting appInfo.
            outputCatcher.join(catcherJoinTimeoutMillis);
            errorCatcher.join(catcherJoinTimeoutMillis);
            if (appInfo.getJobId() == null && appName != null) {
                if (this.yarnRest == null) {
                    log.warn("Please set the yarnRest for the launcher to get applicationId");
                } else {
                    this.lookupYarnApplication(appName.trim(), appInfo);
                }
            }
            return appInfo;
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller even when the exception is absorbed below.
                Thread.currentThread().interrupt();
            }
            String message = e.getMessage();
            if (message == null) {
                throw e;
            }
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.FAILED);
            appInfo.setMessage(message);
        }
        finally {
            if (p != null) {
                p.destroy();
            }
            // Remove the script right away; fall back to removal at JVM exit if it cannot be deleted now.
            if (temp != null && !temp.delete()) {
                temp.deleteOnExit();
            }
        }
        return appInfo;
    }

    /**
     * Writes the launch script and returns the command array that executes it.
     *
     * @param script the temporary script file to write
     * @param command the flink command line to place in the script
     * @return {@code cmd /C <script>} on Windows, otherwise {@code /bin/sh -c <script>}
     * @throws IOException if the script cannot be written or {@code /etc/profile} cannot be read
     */
    private String[] writeLaunchScript(File script, String command) throws IOException {
        try (FileWriter w = new FileWriter(script)) {
            if (!isWindows) {
                // Prepend /etc/profile (when present) so the script runs with its content applied.
                File profile = new File("/etc/profile");
                if (profile.exists()) {
                    try (FileReader fr = new FileReader(profile)) {
                        char[] buffer = new char[4096];
                        int count;
                        while ((count = fr.read(buffer)) != -1) {
                            w.write(buffer, 0, count);
                        }
                    }
                }
                w.write(LINE_SEPARATOR);
            }
            w.write(command);
        }
        if (isWindows) {
            return new String[]{"cmd", "/C", script.getAbsolutePath()};
        }
        return new String[]{"/bin/sh", "-c", script.getAbsolutePath()};
    }

    /**
     * Polls the YARN REST service for an application named {@code appName} and, when found,
     * copies its id and state into {@code appInfo}. Gives up silently after
     * {@code yarnApplicationCheckAttempts} attempts.
     *
     * @param appName the application name to look for
     * @param appInfo the job information to update
     * @throws Exception if the REST call or the wait between attempts fails
     */
    private void lookupYarnApplication(String appName, AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo) throws Exception {
        String appsUrl = this.yarnRest + (this.yarnRest.endsWith("/") ? "" : "/") + "apps";
        int attemptsLeft = this.yarnApplicationCheckAttempts;
        while (attemptsLeft-- > 0) {
            String result = HttpClientUtils.get(appsUrl);
            if (result == null) {
                Thread.sleep(this.timeMillisBetweenYarnApplicationCheckAttempts);
                continue;
            }
            long millisBegin = System.currentTimeMillis();
            JSONObject json = JSON.parseObject(result);
            // Either level may be absent or null in the response, so guard each step.
            JSONObject apps = json == null ? null : json.getJSONObject("apps");
            JSONArray appArr = apps == null ? null : apps.getJSONArray("app");
            if (appArr != null) {
                int size = appArr.size();
                for (int i = 0; i < size; ++i) {
                    JSONObject app = appArr.getJSONObject(i);
                    if (!appName.equals(app.getString("name"))) {
                        continue;
                    }
                    appInfo.setJobId(app.getString("id"));
                    String yarnState = app.getString("state");
                    if (yarnState != null) {
                        try {
                            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.valueOf(yarnState));
                        }
                        catch (IllegalArgumentException e) {
                            // An unknown state name must not turn a found application into a failure.
                            log.warn("Unrecognized state '{}' of yarn application '{}'", yarnState, appName);
                        }
                    }
                    return;
                }
            }
            // Wait out what is left of the configured interval before the next attempt.
            long millisLeft = this.timeMillisBetweenYarnApplicationCheckAttempts - (System.currentTimeMillis() - millisBegin);
            if (millisLeft > 0L) {
                Thread.sleep(millisLeft);
            }
        }
    }

    /**
     * Stops the job with the given id.
     *
     * <p>An id that starts with {@code yarnApplicationIdPrefix} is treated as a YARN application
     * and is killed by a PUT of state {@code KILLED} to the YARN REST service. Any other id is
     * passed to {@code flink stop}, whose standard output is scanned for the savepoint path.
     *
     * @param jobId the job id or YARN application id, must not be {@code null}
     * @return the savepoint path found in the output of {@code flink stop}, or {@code null} when
     *         none was found or the job was a YARN application
     * @throws IllegalArgumentException if {@code jobId} is {@code null}, or if it denotes a YARN
     *         application while {@code yarnRest} is not set
     * @throws Exception if the REST call or the command execution fails
     */
    @Override
    public String stop(String jobId) throws Exception {
        if (jobId == null) {
            throw new IllegalArgumentException("jobId must be not null");
        }
        if (jobId.startsWith(this.yarnApplicationIdPrefix)) {
            if (this.yarnRest == null) {
                throw new IllegalArgumentException("Job with jobId starting with prefix '" + this.yarnApplicationIdPrefix + "' are considered as a yarn application, yarnRest must be set when using yarn");
            }
            HttpClientUtils.put(this.yarnRest + (this.yarnRest.endsWith("/") ? "" : "/") + "apps/" + jobId + "/state", "{\"state\":\"KILLED\"}");
            return null;
        }
        StringBuilder commandBuilder = new StringBuilder();
        String dirPath = File.separator;
        if (this.flinkHome == null) {
            commandBuilder.append("flink");
        } else {
            dirPath = dirPath + "bin";
            commandBuilder.append(this.flinkHome + File.separator + "bin" + File.separator + "flink");
        }
        // NOTE(review): jobId is inserted into a shell command line unquoted; callers must not
        // pass untrusted values here.
        commandBuilder.append(' ').append("stop").append(' ').append(jobId);
        String command = commandBuilder.toString();
        String[] cmdarray = isWindows ? new String[]{"cmd", "/C", command} : new String[]{"/bin/sh", "-c", command};
        log.info("Execute command: {}", command);
        // Upper bound for waiting on the output reader once the process has exited.
        final long catcherJoinTimeoutMillis = 10000L;
        Process p = null;
        try {
            p = Runtime.getRuntime().exec(cmdarray, null, new File(dirPath));
            // NOTE(review): the error stream of the process is not consumed here.
            SavepointPathCatcher savepointPathCatcher = new SavepointPathCatcher(p.getInputStream());
            savepointPathCatcher.start();
            p.waitFor();
            // The reader thread may still be processing buffered output after the process exits;
            // wait for it so the savepoint path is not read too early.
            savepointPathCatcher.join(catcherJoinTimeoutMillis);
            return savepointPathCatcher.getSavepointPath();
        }
        finally {
            if (p != null) {
                p.destroy();
            }
        }
    }

    /**
     * Appends the command line options to {@code commandBuilder} and returns the YARN
     * application name that was put on the command line, if any.
     *
     * <p>Keys that do not start with {@code -} are prefixed with {@code --}. Target options
     * ({@code -t}/{@code --target}) are held back and appended after all other options. Savepoint
     * options ({@code -s}/{@code --fromSavepoint}) are written as {@code key=value}, or discarded
     * when {@code allwaysNewJob} is set or no path is given. Keys starting with {@code -D} are
     * written as {@code key=value}; everything else as {@code key value}. When the target starts
     * with {@code yarn}, or no target is given and the action is {@code RUN_APPLICATION}, a
     * generated {@code -Dyarn.application.name} is added unless the options already supplied one.
     *
     * @param commandBuilder the command line being built; options are appended to it
     * @param defaultAppName the name used to generate the YARN application name when the options
     *        give none
     * @param options the options to append, may be {@code null}
     * @param allwaysNewJob whether savepoint options are discarded
     * @return the generated YARN application name, or {@code null} when none was added
     */
    protected String attachOptions(StringBuilder commandBuilder, String defaultAppName, Map<String, String> options, boolean allwaysNewJob) {
        String targetKey = null;
        String target = null;
        String appName = null;
        if (options != null) {
            for (Map.Entry<String, String> entry : options.entrySet()) {
                String key = entry.getKey();
                if (!key.startsWith("-")) {
                    key = "--" + key;
                }
                String value = entry.getValue();
                boolean hasValue = value != null && !value.isEmpty();
                if (TARGET.contains(key)) {
                    // Remember the target; it is appended after the loop.
                    if (hasValue) {
                        targetKey = key;
                        target = value;
                    }
                } else if (FROM_SAVEPOINT.contains(key)) {
                    if (allwaysNewJob) {
                        log.info("Submit a new job, discard the option '" + key + "'");
                    } else if (hasValue) {
                        commandBuilder.append(' ').append(key).append('=').append(value);
                    } else {
                        // Without a path the option would otherwise be emitted as "key=null".
                        log.warn("Discard the option '{}' because it has no savepoint path", key);
                    }
                } else {
                    commandBuilder.append(' ').append(key);
                    if (YARN_APPLICATION_NAME_KEY.equals(key)) {
                        appName = this.generateApplicationName(value == null ? defaultAppName : value);
                        commandBuilder.append('=').append(appName);
                    } else if (hasValue) {
                        commandBuilder.append(key.startsWith(PROPERTIES_KEY_PREFIXX) ? '=' : ' ').append(value);
                    }
                }
            }
        }
        boolean needsYarnName;
        if (target == null) {
            needsYarnName = Action.RUN_APPLICATION.equals(this.action);
            if (needsYarnName) {
                commandBuilder.append(' ').append("-t").append(' ').append("yarn-application");
            }
        } else {
            commandBuilder.append(' ').append(targetKey).append(' ').append(target);
            needsYarnName = target.startsWith("yarn");
        }
        if (needsYarnName && appName == null) {
            appName = this.generateApplicationName(defaultAppName);
            commandBuilder.append(' ').append(YARN_APPLICATION_NAME_KEY).append('=').append(appName);
        }
        return appName;
    }

    /**
     * Wraps the program arguments in double quotes for use on the generated command line.
     *
     * <p>On non-Windows systems the script is run by {@code /bin/sh}, where backslash, double
     * quote, backquote and dollar sign keep a special meaning inside double quotes; all four are
     * escaped so the arguments reach the program unchanged. On Windows only double quotes and
     * backquotes are prefixed with a backslash.
     *
     * @param original the raw arguments, must not be {@code null}
     * @return the quoted and escaped arguments
     */
    private static String encode(String original) {
        String escaped;
        if (isWindows) {
            escaped = original.replace("\"", "\\\"").replace("`", "\\`");
        } else {
            // Backslashes first, so the backslashes added by the later replacements stay single.
            escaped = original.replace("\\", "\\\\").replace("\"", "\\\"").replace("`", "\\`").replace("$", "\\$");
        }
        return '"' + escaped + '"';
    }

    /**
     * Builds an application name of the form
     * {@code <yarnApplicationIdPrefix><serviceName>_<current time in milliseconds>}.
     *
     * @param serviceName the name to embed
     * @return the generated application name
     */
    private String generateApplicationName(String serviceName) {
        StringBuilder name = new StringBuilder();
        name.append(this.yarnApplicationIdPrefix).append(serviceName);
        name.append("_").append(System.currentTimeMillis());
        return name.toString();
    }

    /**
     * Thread that echoes every line of a stream to standard output and remembers the text that
     * follows the last "Savepoint completed. Path: " marker it sees.
     */
    public static class SavepointPathCatcher
    extends Thread {
        private final InputStream is;
        // Written by this thread and read by the thread that started it, hence volatile.
        private volatile String savepointPath;

        /**
         * Returns the savepoint path found so far.
         *
         * @return the trimmed text after the marker, or {@code null} if no marker has been seen
         */
        public String getSavepointPath() {
            return this.savepointPath;
        }

        /**
         * @param is the stream to read; it is closed when {@link #run()} finishes
         */
        public SavepointPathCatcher(InputStream is) {
            this.is = is;
        }

        /**
         * Reads the stream line by line until it ends. Failures are printed and otherwise ignored.
         */
        @Override
        public void run() {
            try (BufferedReader r = new BufferedReader(new InputStreamReader(this.is))) {
                String line;
                while ((line = r.readLine()) != null) {
                    System.out.println(line);
                    int index = line.indexOf(CommandLineFlinkJobsLauncher.SAVEPOINT_PATH_PREFIX);
                    if (index >= 0) {
                        this.savepointPath = line.substring(index + SAVEPOINT_PATH_PREFIX_LEN).trim();
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Thread that scans an error stream for the first line containing "Exception". From that
     * line on, everything up to the end of the stream is collected as the failure message and
     * the job state is set to {@code FAILED}. Lines before it are read and dropped.
     */
    public static class ErrorStreamCatcher
    extends Thread {
        private final InputStream is;
        private final AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo;

        /**
         * @param is the stream to read; it is closed when {@link #run()} finishes
         * @param appInfo the job information that receives the failure state and message
         */
        public ErrorStreamCatcher(InputStream is, AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo) {
            this.is = is;
            this.appInfo = appInfo;
        }

        /**
         * Reads the stream line by line until it ends. Failures while reading are printed and
         * otherwise ignored.
         */
        @Override
        public void run() {
            try (BufferedReader r = new BufferedReader(new InputStreamReader(this.is))) {
                String line;
                while ((line = r.readLine()) != null) {
                    if (!line.contains(CommandLineFlinkJobsLauncher.EXCEPTION)) {
                        continue;
                    }
                    // InputStreamCatcher sets its state under this monitor only while the state
                    // is still null; taking the same monitor keeps FAILED from being overwritten.
                    synchronized (this.appInfo) {
                        this.appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.FAILED);
                    }
                    StringBuilder message = new StringBuilder();
                    message.append(line).append(LINE_SEPARATOR);
                    while ((line = r.readLine()) != null) {
                        System.out.println(line);
                        message.append(line).append(LINE_SEPARATOR);
                    }
                    this.appInfo.setMessage(message.toString());
                    return;
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Thread that echoes the standard output of the flink client and derives the job id and
     * job state from well-known lines in it.
     */
    public static class InputStreamCatcher
    extends Thread {
        private final InputStream is;
        private final AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo;

        /**
         * @param is the stream to read; it is closed when {@link #run()} finishes
         * @param appInfo the job information that receives the job id, state and message
         */
        public InputStreamCatcher(InputStream is, AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo) {
            this.is = is;
            this.appInfo = appInfo;
        }

        /**
         * Reads lines until the stream ends or the job state has been determined.
         *
         * <p>Lines carrying a job or application id only update the id and reading continues.
         * Reading stops at the first line that reports a state (deploying, running, finished) or,
         * while the state is still {@code SUBMITTED}, contains "Exception"; in the latter case the
         * rest of the stream becomes the failure message. Afterwards the id (if any) is stored,
         * and the state is stored unless another thread has already set one.
         */
        @Override
        public void run() {
            FlinkJobsLauncher.FlinkJobsInfo.State state = FlinkJobsLauncher.FlinkJobsInfo.State.SUBMITTED;
            String jobId = null;
            try (BufferedReader r = new BufferedReader(new InputStreamReader(this.is))) {
                String line;
                while ((line = r.readLine()) != null) {
                    System.out.println(line);
                    int index = line.indexOf(CommandLineFlinkJobsLauncher.JOB_ID_PREFIX);
                    if (index >= 0) {
                        // Offset from where the marker was found, not from the start of the line.
                        jobId = line.substring(index + JOB_ID_PREFIX_LEN).trim();
                        continue;
                    }
                    index = line.indexOf(CommandLineFlinkJobsLauncher.APPLICATION_ID_PREFFIX);
                    if (index >= 0) {
                        jobId = line.substring(index + APPLICATION_ID_PREFFIX_LEN).trim();
                        continue;
                    }
                    index = line.indexOf(CommandLineFlinkJobsLauncher.CURRENT_STATE_PREFFIX);
                    if (index >= 0) {
                        try {
                            state = FlinkJobsLauncher.FlinkJobsInfo.State.valueOf(line.substring(index + CURRENT_STATE_PREFFIX_LEN).trim());
                        }
                        catch (Exception e) {
                            // Unknown state name: keep reporting the job as submitted.
                            state = FlinkJobsLauncher.FlinkJobsInfo.State.SUBMITTED;
                        }
                        break;
                    }
                    if (line.contains(CommandLineFlinkJobsLauncher.RUNNING_LOG) || line.contains(CommandLineFlinkJobsLauncher.YARN_RUNNING_LOG)) {
                        state = FlinkJobsLauncher.FlinkJobsInfo.State.RUNNING;
                        break;
                    }
                    if (line.contains(CommandLineFlinkJobsLauncher.FINISHED_LOG) || line.contains(CommandLineFlinkJobsLauncher.YARN_FINISHED_LOG)) {
                        state = FlinkJobsLauncher.FlinkJobsInfo.State.FINISHED;
                        break;
                    }
                    Matcher m = JOB_FINISHED_PATTERN.matcher(line);
                    if (m.find()) {
                        String group = m.group();
                        jobId = group.substring(JOB_FINISHED_PREFIX_LEN, group.length() - JOB_FINISHED_SUFFIX_LEN);
                        continue;
                    }
                    if (FlinkJobsLauncher.FlinkJobsInfo.State.SUBMITTED.equals(state) && line.contains(CommandLineFlinkJobsLauncher.EXCEPTION)) {
                        state = FlinkJobsLauncher.FlinkJobsInfo.State.FAILED;
                        StringBuilder message = new StringBuilder();
                        message.append(line).append(LINE_SEPARATOR);
                        while ((line = r.readLine()) != null) {
                            System.out.println(line);
                            message.append(line).append(LINE_SEPARATOR);
                        }
                        this.appInfo.setMessage(message.toString());
                        break;
                    }
                }
                // NOTE(review): leaving the loop at the first state line closes the stream, so an
                // id printed after that line is never seen -- confirm this early exit is intended.
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            // Only publish an id that was actually found, so an id set elsewhere is not erased.
            if (jobId != null) {
                this.appInfo.setJobId(jobId);
            }
            // A state set by another thread (for example FAILED from the error stream) wins.
            synchronized (this.appInfo) {
                if (this.appInfo.getState() == null) {
                    this.appInfo.setState(state);
                }
            }
        }
    }

    /**
     * Actions of the flink command line client supported by the launcher.
     */
    public enum Action {
        RUN("run"),
        RUN_APPLICATION("run-application");

        /** Lookup from command line name (for example {@code run-application}) to constant. */
        private static final Map<String, Action> ACTIONS = new HashMap<String, Action>();

        static {
            for (Action candidate : values()) {
                ACTIONS.put(candidate.name, candidate);
            }
        }

        private final String name;

        private Action(String name) {
            this.name = name;
        }

        /**
         * Returns the name of this action as written on the command line.
         *
         * @return the command line name
         */
        public String getName() {
            return this.name;
        }

        /**
         * Resolves an action from its command line name or, failing that, from its constant name.
         *
         * @param action the command line name (for example {@code run}) or the constant name
         *        (for example {@code RUN})
         * @return the matching action
         * @throws IllegalArgumentException if {@code action} matches neither
         * @throws NullPointerException if {@code action} is {@code null}
         */
        public static Action fromString(String action) {
            Action byName = ACTIONS.get(action);
            return byName != null ? byName : Action.valueOf(action);
        }
    }
}

