package org.apache.zeppelin.interpreter.launcher;

import com.google.common.base.CharMatcher;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.class */
/**
 * Interpreter launcher that prepares the environment variables used to start the
 * Flink interpreter process.
 *
 * <p>On top of the environment built by the parent launcher it resolves
 * {@code FLINK_HOME}, derives the Flink conf/lib/plugins directories from it, validates
 * {@code flink.execution.mode}, selects the {@code FLINK_APP_JAR} and, for the
 * application modes, assembles the {@code -D} options passed through
 * {@code ZEPPELIN_FLINK_APPLICATION_MODE_CONF}.
 */
public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreterLauncher.class);

    /** Values accepted for the {@code flink.execution.mode} interpreter property. */
    private static final Set<String> FLINK_EXECUTION_MODES = Sets.newHashSet("local", "remote", "yarn", "yarn-application", "kubernetes-application");

    public FlinkInterpreterLauncher(ZeppelinConfiguration zeppelinConfiguration, RecoveryStorage recoveryStorage) {
        super(zeppelinConfiguration, recoveryStorage);
    }

    /**
     * Builds the environment variables for the Flink interpreter process.
     *
     * @param interpreterLaunchContext launch context whose properties drive the environment;
     *     its properties are modified in place (normalized Flink keys are added and, in
     *     kubernetes-application mode, {@code zeppelin.interpreter.forceShutdown} is set
     *     to {@code "false"})
     * @return the environment produced by the parent launcher, extended with the Flink
     *     specific variables
     * @throws IOException if {@code FLINK_HOME} is missing or invalid, the execution mode is
     *     not one of the supported modes, {@code flink.app.jar} is missing in
     *     kubernetes-application mode, or no unique application jar can be chosen
     */
    public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext interpreterLaunchContext) throws IOException {
        Map<String, String> env = super.buildEnvFromProperties(interpreterLaunchContext);
        String flinkHome = getFlinkHome(interpreterLaunchContext);
        // A FLINK_CONF_DIR that is already present in the environment takes precedence.
        if (!env.containsKey("FLINK_CONF_DIR")) {
            env.put("FLINK_CONF_DIR", flinkHome + "/conf");
        }
        env.put("FLINK_LIB_DIR", flinkHome + "/lib");
        env.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");

        normalizeConfiguration(interpreterLaunchContext);

        Properties properties = interpreterLaunchContext.getProperties();
        String executionMode = properties.getProperty("flink.execution.mode");
        if (!FLINK_EXECUTION_MODES.contains(executionMode)) {
            throw new IOException("Not valid flink.execution.mode: " + executionMode + ", valid modes ares: " + String.join(", ", FLINK_EXECUTION_MODES));
        }
        if (isApplicationMode(executionMode)) {
            updateEnvsForApplicationMode(executionMode, env, interpreterLaunchContext);
        }

        if (isK8sApplicationMode(executionMode)) {
            // In kubernetes-application mode the jar location must be supplied by the user.
            String flinkAppJar = properties.getProperty("flink.app.jar");
            if (StringUtils.isBlank(flinkAppJar)) {
                throw new IOException("flink.app.jar is not specified for kubernetes-application mode");
            }
            env.put("FLINK_APP_JAR", flinkAppJar);
            LOGGER.info("K8s application's FLINK_APP_JAR : {}", flinkAppJar);
            properties.put("zeppelin.interpreter.forceShutdown", "false");
        } else {
            String flinkAppJar = chooseFlinkAppJar(flinkHome);
            LOGGER.info("Choose FLINK_APP_JAR for non k8s-application mode: {}", flinkAppJar);
            env.put("FLINK_APP_JAR", flinkAppJar);
        }

        if ("yarn".equalsIgnoreCase(executionMode) || "yarn-application".equalsIgnoreCase(executionMode)) {
            boolean runAsLoginUser = Boolean.parseBoolean(properties.getProperty("zeppelin.flink.run.asLoginUser", "true"));
            String userName = interpreterLaunchContext.getUserName();
            // The "anonymous" user is never exported as HADOOP_USER_NAME.
            if (runAsLoginUser && !"anonymous".equals(userName)) {
                env.put("HADOOP_USER_NAME", userName);
            }
        }
        return env;
    }

    /**
     * Copies the Zeppelin-style {@code flink.*} properties to the configuration keys listed
     * below, unless the target key is already present.
     */
    private void normalizeConfiguration(InterpreterLaunchContext interpreterLaunchContext) {
        Properties properties = interpreterLaunchContext.getProperties();
        setNewProperty(properties, "flink.jm.memory", "jobmanager.memory.process.size", true);
        setNewProperty(properties, "flink.tm.memory", "taskmanager.memory.process.size", true);
        setNewProperty(properties, "flink.tm.slot", "taskmanager.numberOfTaskSlots", false);
        setNewProperty(properties, "flink.yarn.appName", "yarn.application.name", false);
        setNewProperty(properties, "flink.yarn.queue", "yarn.application.queue", false);
    }

    /**
     * Sets {@code newKey} to the value of {@code oldKey} when {@code oldKey} has a non-blank
     * value and {@code newKey} is not yet present.
     *
     * @param properties properties to read from and write to
     * @param oldKey key whose value is copied
     * @param newKey key that receives the value
     * @param isMemoryProperty when {@code true}, the suffix {@code "mb"} is appended to the
     *     copied value (presumably the old value is a plain number of megabytes; verify
     *     against the interpreter setting documentation)
     */
    private void setNewProperty(Properties properties, String oldKey, String newKey, boolean isMemoryProperty) {
        String oldValue = properties.getProperty(oldKey);
        if (StringUtils.isBlank(oldValue) || properties.containsKey(newKey)) {
            return;
        }
        properties.put(newKey, isMemoryProperty ? oldValue + "mb" : oldValue);
    }

    /**
     * Picks the Flink interpreter jar whose file name contains the Scala version of the
     * flink-dist jar found under {@code <flinkHome>/lib}.
     *
     * <p>The Scala version is taken to be {@code 2.11} when the flink-dist file name contains
     * {@code "2.11"}, and {@code 2.12} otherwise.
     *
     * @param flinkHome Flink installation directory
     * @return absolute path of the single matching jar in the {@code flink} sub-directory of
     *     the Zeppelin interpreter directory
     * @throws IOException if there is not exactly one flink-dist file, or not exactly one
     *     matching interpreter jar (this includes directories that are missing or unreadable)
     */
    private String chooseFlinkAppJar(String flinkHome) throws IOException {
        // File.listFiles returns null when the path is not a directory or cannot be read;
        // treat that the same as "nothing found" instead of failing with an NPE.
        File[] flinkDistFiles = new File(flinkHome, "lib").listFiles(file -> file.getName().contains("flink-dist"));
        if (flinkDistFiles != null && flinkDistFiles.length > 1) {
            throw new IOException("More than 1 flink-dist files: " + joinAbsolutePaths(flinkDistFiles));
        }
        if (flinkDistFiles == null || flinkDistFiles.length == 0) {
            throw new IOException(String.format("No flink-dist jar found under %s", flinkHome + "/lib"));
        }

        final String scalaVersion = flinkDistFiles[0].getName().contains("2.11") ? "2.11" : "2.12";
        File flinkInterpreterDir = new File(ZeppelinConfiguration.create().getInterpreterDir(), "flink");
        File[] flinkScalaJars = flinkInterpreterDir.listFiles(file -> file.getName().endsWith(".jar") && file.getName().contains(scalaVersion));
        if (flinkScalaJars == null || flinkScalaJars.length == 0) {
            throw new IOException("No flink scala jar file is found");
        }
        if (flinkScalaJars.length > 1) {
            throw new IOException("More than 1 flink scala jar files: " + joinAbsolutePaths(flinkScalaJars));
        }
        return flinkScalaJars[0].getAbsolutePath();
    }

    /** Joins the absolute paths of the given files with commas, for use in error messages. */
    private static String joinAbsolutePaths(File[] files) {
        return Arrays.stream(files).map(File::getAbsolutePath).collect(Collectors.joining(","));
    }

    private boolean isApplicationMode(String mode) {
        return isYarnApplicationMode(mode) || isK8sApplicationMode(mode);
    }

    private boolean isYarnApplicationMode(String mode) {
        return "yarn-application".equals(mode);
    }

    private boolean isK8sApplicationMode(String mode) {
        return "kubernetes-application".equals(mode);
    }

    /**
     * Resolves the Flink installation directory from the {@code FLINK_HOME} interpreter
     * property, falling back to the {@code FLINK_HOME} environment variable.
     *
     * @return the configured {@code FLINK_HOME} value
     * @throws IOException if no value is configured, or it does not point to an existing
     *     directory
     */
    private String getFlinkHome(InterpreterLaunchContext interpreterLaunchContext) throws IOException {
        String flinkHome = interpreterLaunchContext.getProperties().getProperty("FLINK_HOME");
        if (StringUtils.isBlank(flinkHome)) {
            flinkHome = System.getenv("FLINK_HOME");
        }
        if (StringUtils.isBlank(flinkHome)) {
            throw new IOException("FLINK_HOME is not specified");
        }
        File flinkHomeFile = new File(flinkHome);
        if (!flinkHomeFile.exists()) {
            throw new IOException(String.format("FLINK_HOME '%s' doesn't exist", flinkHome));
        }
        if (!flinkHomeFile.isDirectory()) {
            throw new IOException(String.format("FLINK_HOME '%s' is a file, but should be directory", flinkHome));
        }
        return flinkHome;
    }

    /**
     * Adds the environment variables specific to the application modes:
     * {@code ZEPPELIN_FLINK_APPLICATION_MODE} (the mode itself) and
     * {@code ZEPPELIN_FLINK_APPLICATION_MODE_CONF}, a {@code '|'}-separated sequence of
     * {@code -D} / {@code key=value} tokens built from the interpreter properties.
     *
     * @param mode the application execution mode
     * @param envs environment map to update
     * @param interpreterLaunchContext launch context supplying the properties
     * @throws IOException declared for the ship-file lookup
     */
    private void updateEnvsForApplicationMode(String mode, Map<String, String> envs, InterpreterLaunchContext interpreterLaunchContext) throws IOException {
        envs.put("ZEPPELIN_FLINK_APPLICATION_MODE", mode);

        StringJoiner flinkConf = new StringJoiner("|");
        List<String> yarnShipFiles = getYarnShipFiles(interpreterLaunchContext);
        if (!yarnShipFiles.isEmpty()) {
            flinkConf.add("-D");
            flinkConf.add("yarn.ship-files=" + String.join(";", yarnShipFiles));
        }

        String yarnAppName = interpreterLaunchContext.getProperties().getProperty("flink.yarn.appName");
        if (StringUtils.isNotBlank(yarnAppName)) {
            flinkConf.add("-D");
            flinkConf.add("yarn.application.name=" + yarnAppName);
        }

        // NOTE(review): a "yarn.application.name" property, if present, is also emitted by
        // this loop in addition to the option added above - confirm that is intended.
        for (Map.Entry<Object, Object> entry : interpreterLaunchContext.getProperties().entrySet()) {
            String key = entry.getKey().toString();
            String value = entry.getValue().toString();
            // These two keys were already handled explicitly above.
            if (key.equalsIgnoreCase("yarn.ship-files") || key.equalsIgnoreCase("flink.yarn.appName")) {
                continue;
            }
            // Properties whose value contains any whitespace are not forwarded.
            if (CharMatcher.whitespace().matchesAnyOf(value)) {
                LOGGER.warn("flink configuration key {} is skipped because it contains white space", key);
            } else {
                flinkConf.add("-D");
                flinkConf.add(key + "=" + value);
            }
        }
        envs.put("ZEPPELIN_FLINK_APPLICATION_MODE_CONF", flinkConf.toString());
    }

    /**
     * Collects the entries for the {@code yarn.ship-files} option.
     *
     * <p>The list contains {@code hive-site.xml} from {@code HIVE_CONF_DIR} when
     * {@code zeppelin.flink.enableHive} is {@code true} and the file exists, followed by the
     * value of the {@code yarn.ship-files} property when that property is present.
     *
     * @return the ship-file entries, possibly empty
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private List<String> getYarnShipFiles(InterpreterLaunchContext interpreterLaunchContext) throws IOException {
        Properties properties = interpreterLaunchContext.getProperties();
        List<String> yarnShipFiles = new ArrayList<>();

        String hiveConfDir = properties.getProperty("HIVE_CONF_DIR");
        boolean hiveEnabled = Boolean.parseBoolean(properties.getProperty("zeppelin.flink.enableHive", "false"));
        if (StringUtils.isNotBlank(hiveConfDir) && hiveEnabled) {
            File hiveSiteFile = new File(hiveConfDir, "hive-site.xml");
            // isFile() is only true for an existing regular file, so no separate exists() check.
            if (hiveSiteFile.isFile()) {
                yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
            } else {
                LOGGER.warn("Hive site file: {} doesn't exist or is not a file", hiveSiteFile);
            }
        }
        if (properties.containsKey("yarn.ship-files")) {
            yarnShipFiles.add(properties.getProperty("yarn.ship-files"));
        }
        return yarnShipFiles;
    }
}
