package org.apache.kylin.engine.spark.job;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.ParameterFilter;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.tool.shaded.org.apache.commons.lang.StringUtils;
import org.apache.kylin.tool.shaded.org.apache.http.cookie.ClientCookie;
import org.apache.log4j.spi.Configurator;
import org.apache.spark.deploy.SparkApplicationClient;
import org.apache.spark.utils.SparkVersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.support.LocalizedResourceHelper;
import org.springframework.util.AntPathMatcher;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkExecutable.class */
public class NSparkExecutable extends AbstractExecutable {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) NSparkExecutable.class);
    /** Spark configuration key whose value this class inspects to tell yarn, standalone and local masters apart. */
    protected static final String SPARK_MASTER = "spark.master";
    /** Spark configuration key whose value this class compares against {@code "cluster"}. */
    protected static final String DEPLOY_MODE = "spark.submit.deployMode";
    /** Jar name placed first on the driver class path when the step is submitted in yarn cluster mode. */
    private static final String APP_JAR_NAME = "__app__.jar";
    /** Set to {@code true} by {@code getSparkConfigOverride} when the master is "yarn" and the deploy mode is "cluster". */
    private volatile boolean isYarnCluster = false;
    /** Set to {@code true} by {@code getSparkConfigOverride} when the master starts with "spark" and the deploy mode is "cluster". */
    private volatile boolean isStandaloneCluster = false;

    /**
     * Records the class name of the Spark application under the
     * {@link MetadataConstants#P_CLASS_NAME} job parameter.
     *
     * @param str class name to store; it is later read back by {@link #getSparkSubmitClassName()}
     */
    public void setSparkSubmitClassName(String str) {
        this.setParam(MetadataConstants.P_CLASS_NAME, str);
    }

    /**
     * Reads the Spark application class name from the job parameters.
     *
     * @return the value stored under {@link MetadataConstants#P_CLASS_NAME}
     */
    public String getSparkSubmitClassName() {
        final String className = getParam(MetadataConstants.P_CLASS_NAME);
        return className;
    }

    /**
     * Reads the extra jars configured for this step from the job parameters.
     *
     * @return the value stored under {@link MetadataConstants#P_JARS}
     */
    public String getJars() {
        final String jars = getParam(MetadataConstants.P_JARS);
        return jars;
    }

    /**
     * Stores the string form of the given storage URL under the
     * {@link MetadataConstants#P_DIST_META_URL} job parameter.
     *
     * <p>The URL is first copied with a fresh map of its parameters, and the copy is what gets
     * serialized.
     *
     * @param storageURL URL to record for this step
     */
    public void setDistMetaUrl(StorageURL storageURL) {
        final Map<String, String> copiedParameters = Maps.newHashMap(storageURL.getAllParameters());
        final StorageURL urlCopy = storageURL.copy(copiedParameters);
        setParam(MetadataConstants.P_DIST_META_URL, urlCopy.toString());
    }

    /**
     * Reads the distributed metadata URL from the job parameters.
     *
     * @return the value stored under {@link MetadataConstants#P_DIST_META_URL}
     */
    public String getDistMetaUrl() {
        final String distMetaUrl = getParam(MetadataConstants.P_DIST_META_URL);
        return distMetaUrl;
    }

    /**
     * Executes this Spark step.
     *
     * <p>Sequence: set the driver log path, validate cube/project override properties and wrap the
     * cube config (only when a cube name is set), verify the Spark home, job jar and Hadoop conf
     * directory, clean the job tmp directory, dump metadata and arguments, then either run the
     * application in-process (UT / local / ZK-local environments) or submit it via spark-submit.
     *
     * @param executableContext context supplying the base {@link KylinConfig}
     * @return the result produced by the local run or by spark-submit
     * @throws ExecuteException if dumping metadata or job arguments fails
     * @throws RuntimeException if a required environment setting (spark home, job jar,
     *                          Hadoop conf dir, hive-site.xml) is missing outside test environments
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    public ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
        KylinConfig config = executableContext.getConfig();
        setLogPath(getSparkDriverLogHdfsPath(config));
        if (getCubeName() != null) {
            CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(getCubeName());
            KylinConfig cubeConfig = cube.getConfig();
            // Project-level overrides first, cube-level overrides win on key collisions.
            Map<String, String> overrideProps = new HashMap<>();
            LinkedHashMap<String, String> cubeOverrides = cube.getDescriptor().getOverrideKylinProps();
            overrideProps.putAll(cube.getProjectInstance().getOverrideKylinProps());
            overrideProps.putAll(cubeOverrides);
            // Both keys and values end up on a shell command line, so both are checked.
            for (Map.Entry<String, String> entry : overrideProps.entrySet()) {
                ParameterFilter.checkSparkConf(entry.getKey());
                ParameterFilter.checkSparkConf(entry.getValue());
            }
            config = wrapConfig(cubeConfig);
        }
        if (StringUtils.isEmpty(KylinConfig.getSparkHome()) && !config.isUTEnv()) {
            throw new RuntimeException("Missing spark home");
        }
        String kylinParquetJobJarPath = config.getKylinParquetJobJarPath();
        if (!config.isUTEnv() && StringUtils.isEmpty(kylinParquetJobJarPath)) {
            throw new RuntimeException("Missing kylin parquet job jar");
        }

        // The build-specific conf dir, when configured, takes precedence over the system property.
        String hadoopConfDir = System.getProperty("kylin.hadoop.conf.dir");
        String buildConf = config.getBuildConf();
        logger.info("write hadoop conf is {} ", buildConf);
        if (!buildConf.isEmpty()) {
            hadoopConfDir = buildConf;
        }
        if (StringUtils.isEmpty(hadoopConfDir) && !config.isUTEnv() && !config.isZKLocal()) {
            throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
        }
        if (!new File(hadoopConfDir, "hive-site.xml").exists() && !config.isUTEnv() && !config.isZKLocal()) {
            throw new RuntimeException("Cannot find hive-site.xml in kylin_hadoop_conf_dir: " + hadoopConfDir + ". In order to enable spark cubing, you must set kylin.env.hadoop-conf-dir to a dir which contains at least core-site.xml, hdfs-site.xml, hive-site.xml, mapred-site.xml, yarn-site.xml");
        }

        // Fall back to the job jar when no explicit jar list was configured.
        String jars = getJars();
        if (StringUtils.isEmpty(jars)) {
            jars = kylinParquetJobJarPath;
        }
        deleteJobTmpDirectoryOnExists();
        onExecuteStart(executableContext);
        try {
            attachMetadataAndKylinProps(config);
            String dumpedArgsPath = dumpArgs();
            if (config.isUTEnv() || config.isLocalEnv() || config.isZKLocal()) {
                return runLocalMode(dumpedArgsPath, config);
            }
            logger.info("Task id: {}", getId());
            if ("yarn".equals(config.getSparkEngineConfigOverrideWithSpecificName(SPARK_MASTER))) {
                logger.info("Try to kill orphan application on yarn.");
                killOrphanApplicationIfExists(config, getId());
            }
            String appArgs = "-className " + getSparkSubmitClassName() + " " + dumpedArgsPath;
            return runSparkSubmit(config, hadoopConfDir, jars, kylinParquetJobJarPath, appArgs, getParent().getId());
        } catch (IOException e) {
            throw new ExecuteException("meta dump failed", e);
        }
    }

    /**
     * Dumps the metadata selected by {@link #getMetadataDumpList(KylinConfig)} together with the
     * Kylin properties and uploads them to the location given by {@link #getDistMetaUrl()}.
     *
     * @param kylinConfig configuration handed to the dump utility
     * @throws IOException if the dump utility fails
     */
    void attachMetadataAndKylinProps(KylinConfig kylinConfig) throws IOException {
        final Set<String> dumpList = getMetadataDumpList(kylinConfig);
        final String distMetaUrl = getDistMetaUrl();
        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, kylinConfig, distMetaUrl);
    }

    /**
     * Serializes the job parameters as JSON into a file under the project's job tmp directory and
     * returns the URI of that file.
     *
     * <p>The file name is the step id, {@code LocalizedResourceHelper.DEFAULT_SEPARATOR} and
     * {@link MetadataConstants#P_JOB_ID} concatenated.
     *
     * @return string form of the URI of the written arguments file
     * @throws ExecuteException if the file cannot be created or written
     */
    String dumpArgs() throws ExecuteException {
        try {
            Path argsPath = new Path(getConfig().getJobTmpDir(getParams().get("project")), getId() + LocalizedResourceHelper.DEFAULT_SEPARATOR + MetadataConstants.P_JOB_ID);
            FileSystem fileSystem = FileSystem.get(argsPath.toUri(), HadoopUtil.getCurrentConfiguration());
            // try-with-resources guarantees the stream is closed even when the write fails.
            try (BufferedOutputStream out = new BufferedOutputStream(fileSystem.create(argsPath))) {
                out.write(JsonUtil.writeValueAsBytes(getParams()));
            }
            logger.info("Spark job args json is : {}.", JsonUtil.writeValueAsString(getParams()));
            return argsPath.toUri().toString();
        } catch (IOException e) {
            throw new ExecuteException("Write cuboidLayoutIds failed: ", e);
        }
    }

    /**
     * Builds the Spark driver log path for this step as
     * {@code <job output store path>.<current epoch millis>.log}.
     *
     * @param kylinConfig configuration used to resolve the job output store path for this
     *                    step's project and id
     * @return the formatted log path; it differs between calls because it embeds the current time
     */
    public String getSparkDriverLogHdfsPath(KylinConfig kylinConfig) {
        final String outputStorePath = kylinConfig.getJobOutputStorePath(getParam("project"), getId());
        final long timestamp = System.currentTimeMillis();
        return String.format(Locale.ROOT, "%s.%s.log", outputStorePath, timestamp);
    }

    /**
     * Convenience overload that wraps the configuration carried by the given context.
     *
     * @param executableContext context whose {@link KylinConfig} is wrapped
     * @return the result of {@link #wrapConfig(KylinConfig)} for the context's config
     */
    public KylinConfig wrapConfig(ExecutableContext executableContext) {
        final KylinConfig contextConfig = executableContext.getConfig();
        return wrapConfig(contextConfig);
    }

    /**
     * Creates a {@link KylinConfigExt} on top of the given config carrying job-scoped overrides:
     * job id, project, optional step id, time zone, HDFS working directory and the driver log file.
     *
     * <p>When this step has a non-blank parent id, the parent id becomes {@code job.id} and this
     * step's id is stored as {@code job.stepId}; otherwise this step's id is the {@code job.id}.
     *
     * @param kylinConfig base configuration to extend
     * @return a new extended configuration
     * @throws IllegalStateException if the "project" job parameter is blank
     */
    protected KylinConfig wrapConfig(KylinConfig kylinConfig) {
        final String project = getParam("project");
        Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");

        final String parentId = getParentId();
        final Map<String, String> jobOverrides = new HashMap<>();
        jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
        jobOverrides.put("job.project", project);
        if (StringUtils.isNotBlank(parentId)) {
            jobOverrides.put("job.stepId", getId());
        }

        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
        jobOverrides.put("user.timezone", envConfig.getTimeZone());
        jobOverrides.put("hdfs.working.dir", KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());

        // An unset log path is stored as the Configurator.NULL placeholder instead of a null value.
        final String logPath = getLogPath();
        if (logPath == null) {
            jobOverrides.put("spark.driver.log4j.appender.hdfs.File", Configurator.NULL);
        } else {
            jobOverrides.put("spark.driver.log4j.appender.hdfs.File", logPath);
        }
        return KylinConfigExt.createInstance(kylinConfig, jobOverrides);
    }

    /**
     * Best-effort cleanup of YARN applications left over from an earlier attempt of this step.
     *
     * <p>Lists SPARK applications in the NEW, NEW_SAVING, SUBMITTED, ACCEPTED or RUNNING state and
     * runs {@code yarn application -kill} for each one whose name equals {@code job_step_<id>}
     * (case-insensitive). Failures are logged and never propagated.
     *
     * @param kylinConfig configuration providing the CLI command executor
     * @param str         job id passed to the command executor along with the kill command
     */
    private void killOrphanApplicationIfExists(KylinConfig kylinConfig, String str) {
        PatternedLogger patternedLogger = new PatternedLogger(logger);
        try (YarnClient yarnClient = YarnClient.createYarnClient()) {
            YarnConfiguration yarnConfiguration = new YarnConfiguration();
            yarnConfiguration.set("yarn.timeline-service.enabled", "false");
            yarnClient.init(yarnConfiguration);
            yarnClient.start();
            List<ApplicationReport> applications = yarnClient.getApplications(Sets.newHashSet("SPARK"), EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING));
            if (CollectionUtils.isEmpty(applications)) {
                return;
            }
            String orphanName = "job_step_" + getId();
            for (ApplicationReport applicationReport : applications) {
                // Constant on the left keeps the comparison safe for reports without a name.
                if (orphanName.equalsIgnoreCase(applicationReport.getName())) {
                    kylinConfig.getCliCommandExecutor().execute("yarn application -kill " + applicationReport.getApplicationId().toString(), patternedLogger, str);
                }
            }
        } catch (YarnException | IOException e) {
            // Keep the cause in the log; without it the failure cannot be diagnosed.
            logger.error("get yarn application failed", e);
        }
    }

    /**
     * Builds the spark-submit command, runs it through a {@link CliCommandExecutor} and converts
     * the outcome into an {@link ExecuteResult}.
     *
     * <p>In standalone cluster mode the method also waits for and checks the application state.
     * On success the job metadata hook is invoked, job metrics are recorded and the extra info
     * extracted by the patterned logger is attached to the result. Any exception is turned into
     * an error result instead of being thrown.
     *
     * @param kylinConfig configuration for this run
     * @param str         Hadoop conf directory forwarded to {@link #generateSparkCmd}
     * @param str2        jar list forwarded to {@link #generateSparkCmd}
     * @param str3        job jar path forwarded to {@link #generateSparkCmd}
     * @param str4        application arguments forwarded to {@link #generateSparkCmd}
     * @param str5        job id passed to the command executor and to the standalone state check
     * @return a succeeded result carrying the buffered log, or an error result wrapping the failure
     */
    private ExecuteResult runSparkSubmit(KylinConfig kylinConfig, String str, String str2, String str3, String str4, String str5) {
        final PatternedLogger patternedLogger;
        if (kylinConfig.isJobLogPrintEnabled()) {
            patternedLogger = new PatternedLogger(logger);
        } else {
            patternedLogger = new PatternedLogger(null);
        }

        try {
            final CliCommandExecutor executor = new CliCommandExecutor();
            final String sparkCmd = generateSparkCmd(kylinConfig, str, str2, str3, str4);
            executor.execute(sparkCmd, patternedLogger, str5);

            if (this.isStandaloneCluster) {
                SparkApplicationClient.awaitAndCheckAppState(SparkApplicationClient.STANDALONE_CLUSTER(), str5);
            }

            updateMetaAfterOperation(kylinConfig);
            getManager().addJobInfo(getId(), getJobMetricsInfo(kylinConfig));

            final Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
            final ExecuteResult succeeded = ExecuteResult.createSucceed(patternedLogger.getBufferedLog());
            succeeded.getExtraInfo().putAll(extraInfo);
            return succeeded;
        } catch (Exception e) {
            return ExecuteResult.createError(e);
        }
    }

    /**
     * Hook invoked after the Spark application finished successfully, both after spark-submit and
     * after an in-process run. The base implementation does nothing.
     *
     * @param kylinConfig configuration used for the run
     * @throws IOException declared for overriding implementations; never thrown here
     */
    protected void updateMetaAfterOperation(KylinConfig kylinConfig) throws IOException {
    }

    /**
     * Supplies the metrics recorded for this job after a successful run.
     * The base implementation reports nothing.
     *
     * @param kylinConfig configuration used for the run (unused here)
     * @return a new, empty, mutable map
     */
    protected Map<String, String> getJobMetricsInfo(KylinConfig kylinConfig) {
        final Map<String, String> noMetrics = new HashMap<>();
        return noMetrics;
    }

    /**
     * Returns the Spark configuration overrides for this step, adjusted for submission.
     *
     * <p>Side effects: sets the yarn-cluster / standalone-cluster flags of this instance from the
     * master and deploy mode (never for {@code NSparkLocalStep}), and mutates the map obtained from
     * the config by adding a driver memory default, the Hive metastore SASL flag when Hadoop
     * security is enabled, disabling adaptive execution on Spark versions below 2.4, and rebuilding
     * the driver's extra Java options.
     *
     * @param kylinConfig configuration supplying the base Spark overrides
     * @return the adjusted override map (the same instance the config returned)
     */
    public Map<String, String> getSparkConfigOverride(KylinConfig kylinConfig) {
        Map<String, String> sparkConfigOverride = kylinConfig.getSparkConfigOverride();

        // spark.master may be absent; a missing master must not raise a NullPointerException
        // here, it simply means neither cluster flag is set.
        String master = sparkConfigOverride.get(SPARK_MASTER);
        boolean clusterDeploy = "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep);
        if (master != null && clusterDeploy) {
            if ("yarn".equals(master)) {
                this.isYarnCluster = true;
            }
            if (master.toLowerCase(Locale.ROOT).startsWith("spark")) {
                this.isStandaloneCluster = true;
            }
        }

        if (!sparkConfigOverride.containsKey(SparkConfHelper.DRIVER_MEMORY)) {
            sparkConfigOverride.put(SparkConfHelper.DRIVER_MEMORY, computeStepDriverMemory() + "m");
        }
        if (UserGroupInformation.isSecurityEnabled()) {
            sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
        }
        if (SparkVersionUtils.isLessThanSparkVersion("2.4", true)) {
            sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
        }
        replaceSparkNodeJavaOpsConfIfNeeded(kylinConfig, sparkConfigOverride);
        return sparkConfigOverride;
    }

    /**
     * Rebuilds {@code spark.driver.extraJavaOptions} in the given map.
     *
     * <p>The new value is the existing option string (if any) followed by the log4j configuration
     * option, Kerberos settings, the HDFS working directory, the driver log file taken from the
     * extended overrides of a {@link KylinConfigExt}, the REST server address, this step's id and
     * the local driver log directory.
     *
     * @param kylinConfig configuration the option values are read from
     * @param map         Spark override map, updated in place
     */
    private void replaceSparkNodeJavaOpsConfIfNeeded(KylinConfig kylinConfig, Map<String, String> map) {
        final String javaOptionsKey = "spark.driver.extraJavaOptions";
        final StringBuilder javaOptions = new StringBuilder();
        if (map.containsKey(javaOptionsKey)) {
            javaOptions.append(map.get(javaOptionsKey));
        }

        final String restAddress = kylinConfig.getServerRestAddress();
        final String workingDir = kylinConfig.getHdfsWorkingDirectory();

        // Stays null (rendered as "null" below) unless the config is an extended one with overrides.
        String driverLogFile = null;
        if (kylinConfig instanceof KylinConfigExt) {
            final Map<String, String> extendedOverrides = ((KylinConfigExt) kylinConfig).getExtendedOverrides();
            if (extendedOverrides != null) {
                driverLogFile = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
            }
        }

        wrapLog4jConf(javaOptions, kylinConfig);

        javaOptions.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", kylinConfig.isKerberosEnabled()));
        if (kylinConfig.isKerberosEnabled()) {
            javaOptions
                    .append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", kylinConfig.getKerberosPrincipal()))
                    .append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", kylinConfig.getKerberosKeytabPath()));
            if (kylinConfig.getPlatformZKEnable()) {
                javaOptions
                        .append(String.format(Locale.ROOT, " -Djava.security.auth.login.config=%s", kylinConfig.getKerberosJaasConfPath()))
                        .append(String.format(Locale.ROOT, " -Djava.security.krb5.conf=%s", kylinConfig.getKerberosKrb5ConfPath()));
            }
        }

        javaOptions
                .append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", workingDir))
                .append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", driverLogFile))
                .append(String.format(Locale.ROOT, " -Dlog4j.debug=%s ", "true"))
                .append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.address=%s ", restAddress))
                .append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()))
                .append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", kylinConfig.getKylinLogDir() + "/spark"));

        map.put(javaOptionsKey, javaOptions.toString());
    }

    /**
     * Assembles the shell command that exports {@code HADOOP_CONF_DIR} and launches spark-submit
     * for this step.
     *
     * <p>The command is built by plain concatenation. Configuration values are never used as part
     * of a {@code String.format} pattern, so a {@code %} inside a Spark conf value, a path or the
     * application arguments is emitted literally instead of breaking the formatting.
     *
     * @param kylinConfig configuration supplying Spark overrides, upload files and Kerberos settings
     * @param str         value exported as {@code HADOOP_CONF_DIR}
     * @param str2        comma-separated jar list for {@code --jars}; in yarn cluster mode the job
     *                    jar inside it is suffixed with {@code #<file name>}
     * @param str3        path of the job jar, used as the application jar and on the class paths
     * @param str4        arguments appended after the application jar
     * @return the complete command line
     */
    protected String generateSparkCmd(KylinConfig kylinConfig, String str, String str2, String str3, String str4) {
        StringBuilder sb = new StringBuilder();
        String sparkSubmit = KylinConfig.getSparkHome() + "/bin/spark-submit";
        sb.append("export HADOOP_CONF_DIR=").append(str)
                .append(" && ").append(sparkSubmit)
                .append(" --class org.apache.kylin.engine.spark.application.SparkEntry ");

        // getSparkConfigOverride also decides whether this is a yarn cluster submission.
        Map<String, String> sparkConfigOverride = getSparkConfigOverride(kylinConfig);
        for (Map.Entry<String, String> entry : sparkConfigOverride.entrySet()) {
            appendSparkConf(sb, entry.getKey(), entry.getValue());
        }
        boolean localMaster = isLocalMaster(sparkConfigOverride);
        boolean yarnCluster = this.isYarnCluster;

        if (!localMaster) {
            appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(str3).getFileName().toString());
        }

        // In yarn cluster mode the driver only sees the uploaded file names, not the local path.
        String configuredDriverClassPath = sparkConfigOverride.getOrDefault("spark.driver.extraClassPath", "");
        String jobJarClassPath = yarnCluster ? APP_JAR_NAME + ":" + Paths.get(str3).getFileName().toString() : str3;
        String driverClassPath = configuredDriverClassPath.isEmpty() ? jobJarClassPath : jobJarClassPath + ":" + configuredDriverClassPath;
        appendSparkConf(sb, "spark.driver.extraClassPath", driverClassPath);

        String sparkUploadFiles = kylinConfig.sparkUploadFiles(localMaster, yarnCluster);
        if (StringUtils.isNotBlank(sparkUploadFiles)) {
            sb.append("--files ").append(sparkUploadFiles).append(" ");
        }
        if (kylinConfig.isKerberosEnabled().booleanValue()) {
            sb.append("--principal ").append(kylinConfig.getKerberosPrincipal()).append(" ");
            sb.append("--keytab ").append(kylinConfig.getKerberosKeytabPath()).append(" ");
        }

        String jars = str2;
        if (yarnCluster) {
            // "<path>#<name>" is the alias form: the job jar is addressed by its bare file name.
            String aliasedJobJar = str3 + "#" + Paths.get(str3).getFileName().toString();
            if (StringUtils.isBlank(jars) || jars.equals(str3)) {
                jars = aliasedJobJar;
            } else if (jars.contains(str3)) {
                jars = jars.replace(str3, aliasedJobJar);
            } else {
                jars = jars + "," + aliasedJobJar;
            }
        }

        sb.append("--name job_step_").append(getId()).append(" ");
        sb.append("--jars ").append(jars).append(" ").append(str3).append(" ").append(str4);

        String sparkCmd = sb.toString();
        logger.info("spark submit cmd: {}", sparkCmd);
        return sparkCmd;
    }

    /**
     * Appends the {@code -Dlog4j.configuration} option for the Spark driver to the given builder.
     *
     * <p>In yarn cluster mode only the file name of the driver log4j properties file is used;
     * otherwise the configured path is used with a {@code file:} prefix. An informational message
     * is logged when the driver or executor log4j properties are the defaults.
     *
     * @param sb          builder receiving the option
     * @param kylinConfig configuration supplying the log4j properties settings
     */
    private void wrapLog4jConf(StringBuilder sb, KylinConfig kylinConfig) {
        if (kylinConfig.isDefaultLogSparkDriverProperties()) {
            logger.info("Current using default log4j properties for spark driver in using `ConsoleAppender`.Please modify `kylin.spark.driver.log4j.properties` to be `spark-driver-log4j.properties`for uploading log file to hdfs.");
        }
        if (kylinConfig.isDefaultLogSparkExecutorProperties()) {
            logger.info("Current using default log4j properties for spark executor in using `ConsoleAppender`.Please modify `kylin.spark.executor.log4j.properties` to be `spark-executor-log4j.properties`for uploading log file to hdfs.");
        }

        final String driverPropertiesFile = kylinConfig.getLogSparkDriverPropertiesFile();
        // Resolved unconditionally, so an unusable path fails in every deploy mode.
        final String driverPropertiesFileName = Paths.get(driverPropertiesFile).getFileName().toString();

        final String log4jOption = this.isYarnCluster
                ? String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", driverPropertiesFileName)
                : String.format(Locale.ROOT, " -Dlog4j.configuration=file:%s ", driverPropertiesFile);
        sb.append(log4jOption);
    }

    /**
     * Appends one {@code --conf 'key=value'} argument, surrounded by spaces, to the builder.
     * The value is trimmed; neither key nor value is escaped.
     *
     * @param sb   builder receiving the argument
     * @param str  configuration key
     * @param str2 configuration value; must not be {@code null}
     */
    protected void appendSparkConf(StringBuilder sb, String str, String str2) {
        sb.append(" --conf '");
        sb.append(str).append('=');
        sb.append(str2.trim());
        sb.append("' ");
    }

    /**
     * Runs the Spark application in the current JVM by reflectively invoking the static
     * {@code main(String[])} of the class named by {@link #getSparkSubmitClassName()}.
     *
     * <p>On success the job metadata hook is invoked and job metrics are recorded. Any exception,
     * including one thrown by the invoked {@code main}, is turned into an error result.
     *
     * @param str         the single argument handed to {@code main}
     * @param kylinConfig configuration passed to the post-run hooks
     * @return a succeeded result, or an error result wrapping the failure
     */
    private ExecuteResult runLocalMode(String str, KylinConfig kylinConfig) {
        try {
            // The (Object) cast makes the String[] ONE reflective argument. Without it the array is
            // taken as the varargs list itself and main(String[]) is handed a String, which fails
            // with IllegalArgumentException.
            ClassUtil.forName(getSparkSubmitClassName(), Object.class).getMethod("main", String[].class).invoke(null, (Object) new String[]{str});
            updateMetaAfterOperation(kylinConfig);
            getManager().addJobInfo(getId(), getJobMetricsInfo(kylinConfig));
            return ExecuteResult.createSucceed();
        } catch (Exception e) {
            return ExecuteResult.createError(e);
        }
    }

    /**
     * Lists the metadata resources to dump and upload before the Spark application starts.
     * The base implementation selects nothing.
     *
     * @param kylinConfig configuration for this run (unused here)
     * @return an immutable empty set
     */
    protected Set<String> getMetadataDumpList(KylinConfig kylinConfig) {
        final Set<String> nothingToDump = Collections.emptySet();
        return nothingToDump;
    }

    /**
     * Best-effort removal of the job tmp directory before the step starts.
     *
     * <p>The directory is derived from the {@code ClientCookie.PATH_ATTR} parameter of the dist
     * meta URL by cutting off its last segment and the separator in front of it. When the
     * parameter is missing or has no parent part, the cleanup is skipped with a warning rather
     * than failing the step, consistent with deletion errors, which are only logged.
     */
    private void deleteJobTmpDirectoryOnExists() {
        String metaPath = StorageURL.valueOf(getDistMetaUrl()).getParameter(ClientCookie.PATH_ATTR);
        if (metaPath == null) {
            logger.warn("dist meta url {} has no path parameter, skip deleting job tmp directory.", getDistMetaUrl());
            return;
        }

        // split() drops trailing empty segments, so a separator-only path yields an empty array.
        String[] segments = metaPath.split(AntPathMatcher.DEFAULT_PATH_SEPARATOR);
        int parentEnd = segments.length == 0 ? -1 : (metaPath.length() - 1) - segments[segments.length - 1].length();
        if (parentEnd < 0) {
            logger.warn("cannot derive job tmp directory from meta path {}, skip deleting.", metaPath);
            return;
        }

        String jobTmpDir = metaPath.substring(0, parentEnd);
        try {
            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(jobTmpDir));
        } catch (Exception e) {
            logger.error("delete job tmp in path {} failed.", jobTmpDir, e);
        }
    }

    /**
     * Tells whether the Spark master in the given overrides is a local one.
     *
     * <p>A missing {@code spark.master} entry is treated as {@code "yarn"}. The master counts as
     * local when it equals {@code "local"} ignoring case or starts with {@code "local["} after
     * lower-casing.
     *
     * @param map Spark configuration overrides
     * @return {@code true} for a local master, {@code false} otherwise
     */
    protected boolean isLocalMaster(Map<String, String> map) {
        final String master = map.getOrDefault(SPARK_MASTER, "yarn");
        if (master.equalsIgnoreCase("local")) {
            return true;
        }
        final String lowerCasedMaster = master.toLowerCase(Locale.ROOT);
        return lowerCasedMaster.startsWith("local[");
    }

    /**
     * Reports whether metadata has to be merged for this step.
     *
     * @return always {@code false} in this base implementation
     */
    public boolean needMergeMetadata() {
        return false;
    }
}
