/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.launcher;

import cn.tenmg.flink.jobs.FlinkJobsLauncher;
import cn.tenmg.flink.jobs.launcher.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.launcher.config.model.Option;
import cn.tenmg.flink.jobs.launcher.config.model.Options;
import cn.tenmg.flink.jobs.launcher.config.model.Params;
import cn.tenmg.flink.jobs.launcher.utils.Sets;
import com.alibaba.fastjson.JSON;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CommandLineFlinkJobsLauncher
implements FlinkJobsLauncher {
    private static final boolean isWindows = System.getProperty("os.name", "").toLowerCase().contains("windows");
    private static final char BLANK_SPACE = ' ';
    private static final String APPLICATION_ID_PREFFIX = "Submitting application master";
    private static final String JOB_ID_PREFIX = "Job with JobID ";
    private static final String JOB_ID_SUFFIX = " has finished";
    private static final String CURRENT_STATE_PREFFIX = "Deploying cluster, current state";
    private static final String RUNNING_LOG = "Starting execution of program";
    private static final String YARN_RUNNING_LOG = "YARN application has been deployed successfully";
    private static final String FINISHED_LOG = "Program execution finished";
    private static final String YARN_FINISHED_LOG = "YARN application has been finished successfully";
    private static final String EXCEPTION = "Exception";
    private static final Pattern JOB_ID_PATTERN = Pattern.compile("Job with JobID [\\S]+ has finished");
    private static int APPLICATION_ID_PREFFIX_LEN = "Submitting application master".length();
    private static int JOB_ID_PREFIX_LEN = "Job with JobID ".length();
    private static int JOB_ID_SUFFIX_LEN = " has finished".length();
    private static int CURRENT_STATE_PREFFIX_LEN = "Deploying cluster, current state".length();
    private static final Logger log = LogManager.getLogger(CommandLineFlinkJobsLauncher.class);
    private static Set<String> TARGETS = Sets.as("-t", "--target");
    private String flinkHome;
    private Action action = Action.RUN_APPLICATION;

    public String getFlinkHome() {
        return this.flinkHome;
    }

    public void setFlinkHome(String flinkHome) {
        this.flinkHome = flinkHome;
    }

    public Action getAction() {
        return this.action;
    }

    public void setAction(Action action) {
        this.action = action;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public FlinkJobsApplicationInfo launch(FlinkJobs config) throws Exception {
        StringBuilder commandBuilder = new StringBuilder();
        commandBuilder.append(this.flinkHome + "/bin/flink").append(' ').append(this.action.getName());
        this.attachOptions(commandBuilder, config.getOptions());
        String mainClass = config.getMainClass();
        if (mainClass != null) {
            commandBuilder.append(' ').append("-c").append(' ').append(mainClass);
        }
        commandBuilder.append(' ').append(config.getJar());
        Params params = config.getParams();
        if (params != null) {
            commandBuilder.append(' ').append(JSON.toJSONString((Object)params).replaceAll("\"", "\\\""));
        }
        String command = commandBuilder.toString();
        log.info("Execute command: " + command);
        Process p = null;
        InputStream is = null;
        BufferedReader r = null;
        File temp = null;
        try {
            String[] cmdarray;
            temp = File.createTempFile("flink-jobs/" + UUID.randomUUID().toString().replaceAll("-", ""), isWindows ? ".bat" : ".sh");
            temp.setExecutable(true);
            try (FileWriter w = null;){
                w = new FileWriter(temp);
                if (isWindows) {
                    cmdarray = new String[]{"cmd", "/C", temp.getAbsolutePath()};
                } else {
                    File profile = new File("/etc/profile");
                    if (profile.exists()) {
                        try (FileReader fr = null;){
                            fr = new FileReader(profile);
                            int ch = 0;
                            while ((ch = fr.read()) != -1) {
                                w.write(ch);
                            }
                        }
                    }
                    w.write("\n");
                    cmdarray = new String[]{"/bin/sh", "-c", temp.getAbsolutePath()};
                }
                w.write(command);
            }
            p = Runtime.getRuntime().exec(cmdarray, null, new File(this.flinkHome.concat("/bin")));
            new ErrorStreamCatcher(p.getErrorStream()).start();
            FlinkJobsApplicationInfo appInfo = new FlinkJobsApplicationInfo();
            new InputStreamCatcher(p.getInputStream(), appInfo).start();
            p.waitFor();
            FlinkJobsApplicationInfo flinkJobsApplicationInfo = appInfo;
            return flinkJobsApplicationInfo;
        }
        catch (Exception e) {
            throw e;
        }
        finally {
            if (is != null) {
                is.close();
            }
            if (r != null) {
                r.close();
            }
            if (p != null) {
                p.destroy();
            }
            if (temp != null) {
                temp.deleteOnExit();
            }
        }
    }

    protected void attachOptions(StringBuilder commandBuilder, Options options) {
        String targetKey = null;
        String target = null;
        if (options != null) {
            String prefix = options.getKeyPrefix();
            List<Option> option = options.getOption();
            if (option != null) {
                for (Option entry : option) {
                    String key = entry.getKey();
                    if (!key.startsWith("-")) {
                        key = prefix + key;
                    }
                    String value = entry.getValue();
                    if (TARGETS.contains(key)) {
                        if (value == null || value.isEmpty()) continue;
                        targetKey = key;
                        target = value;
                        continue;
                    }
                    commandBuilder.append(' ').append(key);
                    if (value == null || value.isEmpty()) continue;
                    commandBuilder.append(' ').append(value.replaceAll("\"", "\\\""));
                }
            }
        }
        if (target == null) {
            if (Action.RUN_APPLICATION.equals((Object)this.action)) {
                commandBuilder.append(' ').append("-t");
                commandBuilder.append(' ').append("yarn-application");
            }
        } else {
            commandBuilder.append(' ').append(targetKey);
            commandBuilder.append(' ').append(target.replaceAll("\"", "\\\""));
        }
    }

    protected static class FlinkJobsApplicationInfo
    implements FlinkJobsLauncher.FlinkJobsApplicationInfo {
        private String applicationId;
        private FlinkJobsLauncher.FlinkJobsApplicationInfo.State state;

        protected FlinkJobsApplicationInfo() {
        }

        @Override
        public String getApplicationId() {
            return this.applicationId;
        }

        public void setApplicationId(String applicationId) {
            this.applicationId = applicationId;
        }

        @Override
        public FlinkJobsLauncher.FlinkJobsApplicationInfo.State getState() {
            return this.state;
        }

        public void setState(FlinkJobsLauncher.FlinkJobsApplicationInfo.State state) {
            this.state = state;
        }
    }

    public static class ErrorStreamCatcher
    extends Thread {
        private InputStream is;

        public ErrorStreamCatcher(InputStream is) {
            this.is = is;
        }

        @Override
        public void run() {
            BufferedReader r = null;
            try {
                String line;
                r = new BufferedReader(new InputStreamReader(this.is));
                while ((line = r.readLine()) != null) {
                    log.info(line);
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                if (r != null) {
                    try {
                        r.close();
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static class InputStreamCatcher
    extends Thread {
        private InputStream is;
        private FlinkJobsApplicationInfo appInfo;

        public InputStreamCatcher(InputStream is, FlinkJobsApplicationInfo appInfo) {
            this.is = is;
            this.appInfo = appInfo;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            BufferedReader r = null;
            FlinkJobsLauncher.FlinkJobsApplicationInfo.State state = FlinkJobsLauncher.FlinkJobsApplicationInfo.State.SUBMITTED;
            String applicationId = null;
            try {
                String line;
                r = new BufferedReader(new InputStreamReader(this.is));
                while ((line = r.readLine()) != null) {
                    log.info(line);
                    int index = line.indexOf(CommandLineFlinkJobsLauncher.APPLICATION_ID_PREFFIX);
                    if (index >= 0) {
                        applicationId = line.substring(index + APPLICATION_ID_PREFFIX_LEN).trim();
                        continue;
                    }
                    index = line.indexOf(CommandLineFlinkJobsLauncher.CURRENT_STATE_PREFFIX);
                    if (index >= 0) {
                        try {
                            state = FlinkJobsLauncher.FlinkJobsApplicationInfo.State.valueOf(line.substring(index + CURRENT_STATE_PREFFIX_LEN).trim());
                        }
                        catch (Exception e) {
                            state = FlinkJobsLauncher.FlinkJobsApplicationInfo.State.SUBMITTED;
                        }
                        continue;
                    }
                    if (line.contains(CommandLineFlinkJobsLauncher.RUNNING_LOG) || line.contains(CommandLineFlinkJobsLauncher.YARN_RUNNING_LOG)) {
                        state = FlinkJobsLauncher.FlinkJobsApplicationInfo.State.RUNNING;
                        continue;
                    }
                    if (line.contains(CommandLineFlinkJobsLauncher.FINISHED_LOG) || line.contains(CommandLineFlinkJobsLauncher.YARN_FINISHED_LOG)) {
                        state = FlinkJobsLauncher.FlinkJobsApplicationInfo.State.FINISHED;
                        continue;
                    }
                    Matcher m = JOB_ID_PATTERN.matcher(line);
                    if (m.find()) {
                        String group = m.group();
                        applicationId = group.substring(JOB_ID_PREFIX_LEN, group.length() - JOB_ID_SUFFIX_LEN);
                        continue;
                    }
                    if (!FlinkJobsLauncher.FlinkJobsApplicationInfo.State.SUBMITTED.equals((Object)state) || (index = line.indexOf(CommandLineFlinkJobsLauncher.EXCEPTION)) < 0) continue;
                    throw new Exception(line);
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                if (r != null) {
                    try {
                        r.close();
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            this.appInfo.setApplicationId(applicationId);
            this.appInfo.setState(state);
        }
    }

    public static enum Action {
        RUN("run"),
        RUN_APPLICATION("run-application");

        private String name;

        private Action(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }
    }
}

