package org.apache.kylin.engine.spark.job;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigBase;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.StringHelper;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.merger.MetadataMerger;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.JobStoppedException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ChainedStageExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.execution.StageBase;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.view.LogicalView;
import org.apache.kylin.metadata.view.LogicalViewManager;
import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkExecutable.class */
public class NSparkExecutable extends AbstractExecutable implements ChainedStageExecutable {
    /** SLF4J logger for this class. */
    private static final Logger logger = LoggerFactory.getLogger(NSparkExecutable.class);
    // Spark configuration keys whose JVM-option values are rewritten by the rewrite* methods of this class.
    private static final String AM_EXTRA_JAVA_OPTIONS = "spark.yarn.am.extraJavaOptions";
    private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
    private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
    // Relative directory prepended to the krb5 conf file name in rewriteKerberosConf.
    private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";
    // Jar name placed first in the classpath built for cluster deploy mode (see rewriteExtraClasspath).
    private static final String APP_JAR_NAME = "__app__.jar";
    // Spark keys for extra jars/files; presumably read by getSparkJars/getSparkFiles, which are outside this view.
    private static final String SPARK_JARS_1 = "spark.jars";
    private static final String SPARK_JARS_2 = "spark.yarn.dist.jars";
    private static final String SPARK_FILES_1 = "spark.files";
    private static final String SPARK_FILES_2 = "spark.yarn.dist.files";
    // Small string constants used as separators when splitting and joining configuration values.
    private static final String COMMA = ",";
    private static final String COLON = ":";
    private static final String EMPTY = "";
    private static final String SPACE = " ";
    // Spark configuration keys for the driver/executor classpath written by rewriteExtraClasspath.
    private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
    private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
    // Spark keys and values visible to subclasses.
    protected static final String SPARK_MASTER = "spark.master";
    protected static final String DEPLOY_MODE = "spark.submit.deployMode";
    protected static final String CLUSTER_MODE = "cluster";
    protected static final String SPARK_PLUGINS = "spark.plugins";
    /** Handler that performs the actual Spark submission work; created reflectively in initHandler(). */
    protected ISparkJobHandler sparkJobHandler;
    /** Stages registered through addStage(); declared transient. */
    private final transient List<StageBase> stages;
    /** Stage lists keyed by segment id, or by this step's id when no segment ids are set (see setStageMap). */
    private final Map<String, List<StageBase>> stagesMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkExecutable$ConfMap.class */
    /**
     * Minimal string-to-string configuration accessor.
     *
     * <p>Within this class it is implemented by an anonymous adapter over a {@code Map<String, String>}
     * in rewriteKerberosConf and consumed by rewriteSpecifiedKrb5Conf.
     */
    public interface ConfMap {
        /** Returns the value stored under the given key; the in-class adapter returns null when absent. */
        String get(String str);

        /** Stores {@code str2} under the key {@code str}. */
        void set(String str, String str2);
    }

    /**
     * Creates an executable with empty stage bookkeeping and instantiates the
     * configured Spark job handler.
     */
    public NSparkExecutable() {
        super();
        this.stagesMap = Maps.newConcurrentMap();
        this.stages = Lists.newCopyOnWriteArrayList();
        initHandler();
    }

    /**
     * Creates an executable, forwarding {@code obj} to the superclass constructor,
     * with empty stage bookkeeping and a freshly instantiated Spark job handler.
     *
     * @param obj argument handed unchanged to the superclass constructor
     */
    public NSparkExecutable(Object obj) {
        super(obj);
        this.stagesMap = Maps.newConcurrentMap();
        this.stages = Lists.newCopyOnWriteArrayList();
        initHandler();
    }

    /**
     * Returns the value of the {@code "dataflowId"} job parameter.
     */
    public String getDataflowId() {
        return getParam("dataflowId");
    }

    /**
     * Instantiates the Spark job handler whose class name is configured in the
     * environment {@link KylinConfig} and stores it in {@link #sparkJobHandler}.
     */
    protected void initHandler() {
        String handlerClassName = KylinConfig.getInstanceFromEnv().getSparkBuildJobHandlerClassName();
        Object handler = ClassUtil.newInstance(handlerClassName);
        this.sparkJobHandler = (ISparkJobHandler) handler;
    }

    /**
     * Interrupts the local thread running this job (when the scheduler context knows one),
     * removes it from the context's running jobs, and then asks the handler to kill any
     * orphan application registered under this step's id.
     */
    public void killApplicationIfExistsOrUpdateStepStatus() {
        ExecutableContext schedulerContext = NDefaultScheduler.getInstance(getProject()).getContext();
        if (schedulerContext != null) {
            Thread runner = schedulerContext.getRunningJobThread(this);
            if (runner != null) {
                runner.interrupt();
                schedulerContext.removeRunningJob(this);
            }
        }
        killOrphanApplicationIfExists(getId());
    }

    /**
     * Splits the comma-separated {@code "segmentIds"} job parameter into a set.
     *
     * @return the distinct segment ids as a new hash set
     */
    public Set<String> getSegmentIds() {
        String[] segmentIds = StringUtils.split(getParam("segmentIds"), COMMA);
        return Sets.newHashSet(segmentIds);
    }

    /**
     * Returns the {@code "layoutIds"} job parameter converted to longs by
     * {@code NSparkCubingUtil.str2Longs}.
     */
    public Set<Long> getCuboidLayoutIds() {
        return NSparkCubingUtil.str2Longs(getParam("layoutIds"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the Spark application main class in the {@code "className"} parameter.
     *
     * <p>When this executable's own class name is listed in the configured build steps to
     * skip, {@code EmptyPlaceholderJob} is stored instead of the requested class.
     *
     * @param str fully qualified name of the Spark application class
     */
    public void setSparkSubmitClassName(String str) {
        boolean stepSkipped = KylinConfig.getInstanceFromEnv().getSparkEngineBuildStepsToSkip()
                .contains(getClass().getName());
        setParam("className", stepSkipped ? EmptyPlaceholderJob.class.getName() : str);
    }

    /**
     * Returns the value of the {@code "className"} job parameter, as stored by
     * setSparkSubmitClassName.
     */
    public String getSparkSubmitClassName() {
        return getParam("className");
    }

    /**
     * Returns the value of the {@code "jars"} job parameter.
     */
    public String getJars() {
        return getParam("jars");
    }

    /**
     * Tells whether the working file system URI starts with the {@code file:} scheme.
     */
    private boolean isLocalFs() {
        String workingFsUri = HadoopUtil.getWorkingFileSystem().getUri().toString();
        return workingFsUri.startsWith("file:");
    }

    /**
     * Resolves the file-system prefix used for the distributed metadata path.
     *
     * <p>The configured engine write FS wins when it is not blank; otherwise the working
     * file system URI is used. Any value starting with {@code maprfs://} collapses to
     * exactly {@code maprfs://}.
     */
    private String getDistMetaFs() {
        String workingFsUri = HadoopUtil.getWorkingFileSystem().getUri().toString();
        String fs = KylinConfig.getInstanceFromEnv().getEngineWriteFs();
        if (StringUtils.isBlank(fs)) {
            fs = workingFsUri;
        }
        if (fs.startsWith("maprfs://")) {
            return "maprfs://";
        }
        return fs;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Derives and stores the {@code "distMetaUrl"} and {@code "outputMetaUrl"} parameters.
     *
     * <p>Unless the working file system is local, the URL's {@code path} parameter is
     * prefixed with the value of getDistMetaFs(). The output URL is the same URL with
     * {@code _output} appended.
     *
     * @param storageURL base metadata URL to copy and adjust
     */
    public void setDistMetaUrl(StorageURL storageURL) {
        String fsPrefix = getDistMetaFs();
        Map<String, String> urlParams = Maps.newHashMap(storageURL.getAllParameters());
        boolean remoteFs = !isLocalFs();
        if (remoteFs) {
            urlParams.put("path", fsPrefix + storageURL.getParameter("path"));
        }
        StorageURL distUrl = storageURL.copy(urlParams);
        setParam("distMetaUrl", distUrl.toString());
        setParam("outputMetaUrl", distUrl + "_output");
    }

    /**
     * Returns the value of the {@code "distMetaUrl"} job parameter, as stored by setDistMetaUrl.
     */
    public String getDistMetaUrl() {
        return getParam("distMetaUrl");
    }

    /**
     * Marks the stage with id {@code <stepId>_00} as RUNNING and saves the updated job,
     * inside a checked, retried transaction (retry argument 3).
     *
     * @param executableContext supplies the epoch id passed to the transaction
     */
    public void waiteForResourceStart(ExecutableContext executableContext) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            String firstStageId = getId() + "_00";
            NExecutableManager manager = getExecutableManager(getProject());
            manager.updateStageStatus(firstStageId, (String) null, ExecutableState.RUNNING, (Map) null, (String) null);
            manager.saveUpdatedJob();
            return 0;
        }, this.project, 3, executableContext.getEpochId(), getTempLockName());
    }

    /**
     * Prepares and launches the Spark application for this step.
     *
     * <p>Sequence: mark the first stage running, set the driver log path, validate the
     * environment (Spark home, job jar, application jar, hive-site.xml), clean leftover
     * snapshot/job-tmp directories, attach metadata and Kylin properties, let the handler
     * prepare the environment, write the args file, and finally run either in-process
     * (UT environment) or through spark-submit.
     *
     * @param executableContext scheduler context supplying config and epoch id
     * @return the result of the local run or of the spark-submit run
     * @throws ExecuteException when an {@link IOException} is raised in the launch phase
     *         (reported as "meta dump failed"), or as declared by the called helpers
     * @throws RuntimeException when Spark home, the Kylin job jar or hive-site.xml is missing
     *         outside the UT environment
     */
    public ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
        waiteForResourceStart(executableContext);
        // The log path must be set before getConfig(), which copies it into the config overrides.
        setLogPath(getSparkDriverLogHdfsPath(executableContext.getConfig()));
        KylinConfig config = getConfig();
        String id = getId();
        if (!config.isDevOrUT()) {
            setDistMetaUrl(config.getJobTmpMetaStoreUrl(this.project, id));
        }
        // Environment validation; each check is skipped in the UT environment.
        if (StringUtils.isEmpty(KylinConfigBase.getSparkHome()) && !config.isUTEnv()) {
            throw new RuntimeException("Missing spark home");
        }
        String kylinJobJarPath = config.getKylinJobJarPath();
        if (StringUtils.isEmpty(kylinJobJarPath) && !config.isUTEnv()) {
            throw new RuntimeException("Missing kylin job jar");
        }
        if (!config.isDevOrUT()) {
            this.sparkJobHandler.checkApplicationJar(config);
        }
        String hadoopConfDir = HadoopUtil.getHadoopConfDir();
        if (!new File(hadoopConfDir, "hive-site.xml").exists() && !config.isUTEnv()) {
            throw new RuntimeException("Cannot find hive-site.xml in kylin_hadoop_conf_dir: " + hadoopConfDir + ". In order to enable spark cubing, you must set kylin.env.hadoop-conf-dir to a dir which contains at least core-site.xml, hdfs-site.xml, hive-site.xml, mapred-site.xml, yarn-site.xml");
        }
        // Both cleanups are no-ops when the job is resumable.
        deleteSnapshotDirectoryOnExists();
        deleteJobTmpDirectoryOnExists();
        onExecuteStart();
        try {
            // For a resumable job only the Kylin properties are dumped, not the metadata resources.
            attachMetadataAndKylinProps(config, isResumable());
            if (!isResumable()) {
                // Flag the job as resumable once the metadata has been attached.
                EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                    NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).setJobResumable(getId());
                    return 0;
                }, this.project, 3, executableContext.getEpochId(), getTempLockName());
            }
            this.sparkJobHandler.prepareEnviroment(this.project, id, getParams());
            String createArgsFileOnHDFS = createArgsFileOnHDFS(config, id);
            checkParentJobStatus();
            // UT environment runs the application class in-process; otherwise go through spark-submit.
            return config.isUTEnv() ? runLocalMode(createArgsFileOnHDFS) : runSparkSubmit(hadoopConfDir, kylinJobJarPath, "-className " + getSparkSubmitClassName() + SPACE + createArgsFileOnHDFS);
        } catch (IOException e) {
            throw new ExecuteException("meta dump failed", e);
        }
    }

    /**
     * Records the effective Spark configuration, serialized as JSON under the
     * {@code "job_params"} key, while moving this step's output to RUNNING.
     *
     * @throws JobStoppedException as declared; raised by the quit check wrapper
     */
    protected void onExecuteStart() throws JobStoppedException {
        wrapWithCheckQuit(() -> {
            String sparkConfJson = JsonUtil.writeValueAsString(getSparkConf());
            Map<String, String> jobInfo = Maps.newHashMap();
            jobInfo.put("job_params", sparkConfJson);
            updateJobOutput(this.project, getId(), ExecutableState.RUNNING, jobInfo, null, null);
        });
    }

    /**
     * Delegates to the job handler to create the application args file on the remote file
     * system from this job's parameters.
     *
     * @param kylinConfig config passed through to the handler
     * @param str         job id passed through to the handler
     * @return whatever the handler returns; doWork appends it to the spark-submit arguments
     * @throws ExecuteException as declared by the handler call
     */
    protected String createArgsFileOnHDFS(KylinConfig kylinConfig, String str) throws ExecuteException {
        return this.sparkJobHandler.createArgsFileOnRemoteFileSystem(kylinConfig, getProject(), str, getParams());
    }

    /**
     * Returns a copy of {@code map} whose {@code "segmentIds"} entry keeps only the ids
     * that still resolve to a segment of this job's dataflow.
     *
     * <p>The copy is returned untouched when the dataflow cannot be found or the entry is blank.
     *
     * @param map job parameters to filter; never modified
     * @return a new map with the filtered segment id list
     */
    @VisibleForTesting
    Map<String, String> filterEmptySegments(Map<String, String> map) {
        Map<String, String> filtered = Maps.newHashMap(map);
        String segmentIds = filtered.get("segmentIds");
        NDataflow dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(getDataflowId());
        if (dataflow == null || StringUtils.isBlank(segmentIds)) {
            return filtered;
        }
        List<String> existingIds = Lists.newArrayList();
        for (String segmentId : StringUtils.split(segmentIds, COMMA)) {
            if (dataflow.getSegment(segmentId) != null) {
                existingIds.add(segmentId);
            }
        }
        filtered.put("segmentIds", String.join(COMMA, existingIds));
        return filtered;
    }

    /**
     * Builds the driver log path as {@code <job tmp output store path>.<current millis>.log}.
     *
     * @param kylinConfig config used to resolve the job's tmp output store path
     * @return a path that differs per call because it embeds the current time
     */
    public String getSparkDriverLogHdfsPath(KylinConfig kylinConfig) {
        String outputStorePath = kylinConfig.getJobTmpOutputStorePath(getProject(), getId());
        long timestamp = System.currentTimeMillis();
        return outputStorePath + "." + timestamp + ".log";
    }

    /**
     * Compares the HDFS working directory of the environment config with the one computed
     * from freshly built site properties.
     *
     * @return {@code TRUE} when both directories are equal
     */
    private Boolean checkHadoopWorkingDir() {
        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
        String currentDir = envConfig.getHdfsWorkingDirectory();
        String siteDir = envConfig.getHdfsWorkingDirectoryFromProperties(KylinConfig.buildSiteProperties());
        return StringUtils.equals(currentDir, siteDir);
    }

    /**
     * Builds the job-scoped configuration: the dataflow (or project) config extended with
     * job id, project, step id, time zone, driver log file, an optional YARN queue taken
     * from the parent job, and an optional Kubernetes upload path.
     *
     * <p>Outside dev/UT, the environment config is first reloaded from site properties when
     * its HDFS working directory no longer matches them.
     *
     * @return a new {@link KylinConfigExt} carrying the overrides
     * @throws IllegalStateException when the project name is blank
     */
    protected KylinConfig getConfig() {
        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
        boolean production = !envConfig.isDevOrUT();
        if (production && !checkHadoopWorkingDir().booleanValue()) {
            KylinConfig.getInstanceFromEnv().reloadKylinConfigPropertiesFromSiteProperties();
        }

        String project = getProject();
        Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
        KylinConfigExt baseConfig = getKylinConfigExt(envConfig, project);
        String parentId = getParentId();

        Map<String, String> overrides = Maps.newHashMap();
        overrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
        overrides.put("job.project", project);
        String mountDir = envConfig.getMountSparkLogDir();
        if (StringUtils.isNotBlank(mountDir)) {
            overrides.put("job.mountDir", mountDir);
        }
        if (StringUtils.isNotBlank(parentId)) {
            overrides.put("job.stepId", getId());
        }
        overrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
        String logPath = getLogPath();
        overrides.put("spark.driver.log4j.appender.hdfs.File", logPath == null ? "null" : logPath);

        // Existing extended overrides are applied after the job keys, so they win on conflicts.
        overrides.putAll(baseConfig.getExtendedOverrides());

        if (getParent() != null) {
            String yarnQueue = getParent().getSparkYarnQueue();
            if (StringUtils.isNotEmpty(yarnQueue)) {
                overrides.put("kylin.engine.spark-conf.spark.yarn.queue", yarnQueue);
            }
        }

        String uploadPath = baseConfig.getKubernetesUploadPath();
        if (StringUtils.isNotEmpty(uploadPath)) {
            String jobDir = StringUtils.defaultIfBlank(parentId, getId());
            overrides.put(baseConfig.getKubernetesUploadPathKey(), uploadPath + "/" + jobDir);
        }
        return KylinConfigExt.createInstance(baseConfig, overrides);
    }

    /**
     * Picks the most specific extended config for this job: the dataflow's config when the
     * {@code "dataflowId"} parameter is set and resolves to a dataflow, otherwise the
     * project's config.
     *
     * @param kylinConfig config used to look up the managers
     * @param str         project name
     * @return the dataflow-level or project-level config
     */
    public KylinConfigExt getKylinConfigExt(KylinConfig kylinConfig, String str) {
        String dataflowId = getParam("dataflowId");
        if (StringUtils.isNotBlank(dataflowId)) {
            NDataflow dataflow = NDataflowManager.getInstance(kylinConfig, str).getDataflow(dataflowId);
            if (dataflow != null) {
                return dataflow.getConfig();
            }
        }
        return NProjectManager.getInstance(kylinConfig).getProject(str).getConfig();
    }

    /**
     * Assembles the description of the Spark application to submit: identity fields
     * (name prefix, project, job id, step id, main class) plus the resolved Spark
     * configuration, jars and files.
     *
     * @return a freshly populated {@link SparkAppDescription}
     */
    public SparkAppDescription getSparkAppDesc() {
        SparkAppDescription desc = new SparkAppDescription();
        KylinConfig jobConfig = getConfig();

        // Identity of the job.
        desc.setJobNamePrefix(getJobNamePrefix());
        desc.setProject(getProject());
        desc.setJobId(getId());
        desc.setStepId(getStepId());
        desc.setSparkSubmitClassName(getSparkSubmitClassName());

        // Spark configuration and the resources derived from it.
        Map<String, String> resolvedConf = getSparkConf(jobConfig);
        desc.setSparkConf(resolvedConf);
        desc.setComma(COMMA);
        desc.setSparkJars(getSparkJars(jobConfig, resolvedConf));
        desc.setSparkFiles(getSparkFiles(jobConfig, resolvedConf));
        return desc;
    }

    /**
     * Submits the Spark application through the job handler and reports the outcome.
     *
     * <p>Any orphan application for this step is killed first. After the submission, when
     * the handler reports a {@code process_id}, the handler's result map (minus the
     * {@code output} entry) is recorded on the parent job; a failure to record it is
     * logged and otherwise ignored so it cannot fail an already finished submission.
     *
     * @param str  Hadoop configuration directory
     * @param str2 path of the Kylin job jar
     * @param str3 application arguments passed to the Spark application
     * @return a success result carrying the handler's {@code output}, or an error result
     *         wrapping the exception that broke the submission
     * @throws JobStoppedException when the quit check detects that the job must stop
     */
    protected ExecuteResult runSparkSubmit(String str, String str2, String str3) throws JobStoppedException {
        this.sparkJobHandler.killOrphanApplicationIfExists(this.project, getId(), getConfig(), true, getSparkConf());
        try {
            SparkAppDescription sparkAppDesc = getSparkAppDesc();
            sparkAppDesc.setHadoopConfDir(str);
            sparkAppDesc.setKylinJobJar(str2);
            sparkAppDesc.setAppArgs(str3);
            Map runSparkSubmit = this.sparkJobHandler.runSparkSubmit(this.sparkJobHandler.generateSparkCmd(KylinConfig.getInstanceFromEnv(), sparkAppDesc), getParentId());
            String str4 = (String) runSparkSubmit.get("output");
            if (StringUtils.isNotEmpty((CharSequence) runSparkSubmit.get("process_id"))) {
                try {
                    // The raw output is returned to the caller, not persisted with the job info.
                    runSparkSubmit.remove("output");
                    EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).updateJobOutput(getParentId(), getStatus(), runSparkSubmit, (Set) null, (String) null);
                        return null;
                    }, this.project, 3, getEpochId(), getTempLockName());
                } catch (Exception e) {
                    // Best effort: keep going, but log the cause so the failure can be diagnosed.
                    logger.warn("failed to record process id.", e);
                }
            }
            return ExecuteResult.createSucceed(str4);
        } catch (Exception e2) {
            // May throw JobStoppedException, which takes precedence over reporting the error.
            checkNeedQuit(true);
            logger.warn("failed to execute spark submit command.", e2);
            wrapWithExecuteExceptionUpdateJobError(e2);
            return ExecuteResult.createError(e2);
        }
    }

    /**
     * Asks the job handler to kill any orphan application registered for the given job id,
     * using this step's config and Spark configuration. The boolean flag passed to the
     * handler is {@code false} here, whereas runSparkSubmit passes {@code true}.
     *
     * @param str job id whose orphan application should be killed
     */
    public void killOrphanApplicationIfExists(String str) {
        this.sparkJobHandler.killOrphanApplicationIfExists(this.project, str, getConfig(), false, getSparkConf());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the Spark configuration overrides of {@code kylinConfig}, completed with a
     * computed driver memory (in megabytes) when none is configured, and with Hive
     * metastore SASL enabled when Hadoop security is on.
     *
     * @param kylinConfig config providing the overrides
     * @return the map obtained from the config, with the additions applied to it
     */
    public Map<String, String> getSparkConfigOverride(KylinConfig kylinConfig) {
        Map<String, String> overrides = kylinConfig.getSparkConfigOverride();
        boolean driverMemoryConfigured = overrides.containsKey(SparkConfHelper.DRIVER_MEMORY);
        if (!driverMemoryConfigured) {
            overrides.put(SparkConfHelper.DRIVER_MEMORY, computeStepDriverMemory() + "m");
        }
        boolean securityEnabled = UserGroupInformation.isSecurityEnabled();
        if (securityEnabled) {
            overrides.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
        }
        return overrides;
    }

    /**
     * Runs the Spark application class in-process by reflectively invoking its
     * {@code main(String[])} with a single argument.
     *
     * @param str the one argument handed to the application's {@code main}
     * @return a success result, or an error result wrapping whatever the reflective call threw
     */
    private ExecuteResult runLocalMode(String str) {
        try {
            Class forName = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
            // The (Object) cast is required: without it the String[] is spread as the varargs
            // array of Method.invoke, so main(String[]) would receive a String and fail with
            // IllegalArgumentException.
            forName.getMethod("main", String[].class).invoke(forName.newInstance(), (Object) new String[]{str});
            return ExecuteResult.createSucceed();
        } catch (Exception e) {
            return ExecuteResult.createError(e);
        }
    }

    /**
     * Returns the metadata resource paths to dump for the Spark job. This base
     * implementation returns an empty set; being protected, it can be overridden.
     *
     * @param kylinConfig unused by this implementation
     */
    protected Set<String> getMetadataDumpList(KylinConfig kylinConfig) {
        return Collections.emptySet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Collects the resource paths of logical views relevant to this job: those found in
     * the job's model (when a dataflow id is set) and the one matching the {@code "table"}
     * parameter (when set and found).
     *
     * @param kylinConfig config used to check the feature flag and obtain the view manager
     * @return the collected paths; empty when DDL logical views are disabled
     */
    public Set<String> getLogicalViewMetaDumpList(KylinConfig kylinConfig) {
        Set<String> resourcePaths = new LinkedHashSet<>();
        if (!kylinConfig.isDDLLogicalViewEnabled()) {
            return resourcePaths;
        }
        String tableName = getParam("table");
        String dataflowId = getDataflowId();
        LogicalViewManager viewManager = LogicalViewManager.getInstance(kylinConfig);

        if (StringUtils.isNotBlank(dataflowId)) {
            Set<String> modelViewPaths = viewManager.findLogicalViewsInModel(getProject(), getDataflowId())
                    .stream()
                    .map(LogicalView::getResourcePath)
                    .collect(Collectors.toSet());
            resourcePaths.addAll(modelViewPaths);
        }

        if (StringUtils.isNotBlank(tableName)) {
            LogicalView tableView = viewManager.findLogicalViewInProject(getProject(), tableName);
            if (tableView != null) {
                resourcePaths.add(tableView.getResourcePath());
            }
        }
        return resourcePaths;
    }

    /**
     * Convenience overload that calls the two-argument variant with {@code false}, i.e.
     * the variant's branch that dumps the metadata resources rather than only the properties.
     *
     * @param kylinConfig config to export and to read metadata with
     * @throws IOException as declared by the two-argument variant
     */
    void attachMetadataAndKylinProps(KylinConfig kylinConfig) throws IOException {
        attachMetadataAndKylinProps(kylinConfig, false);
    }

    /**
     * Dumps the Kylin properties (and, unless {@code z} is set, the metadata resources
     * named by getMetadataDumpList) to a temporary local location and uploads them to the
     * metadata store addressed by the {@code distMetaUrl} parameter.
     *
     * <p>The temporary location is removed afterwards. It is also removed on a best-effort
     * basis when dumping or uploading fails, so failed runs do not accumulate local files.
     *
     * @param kylinConfig config to export and to read metadata with
     * @param z           when {@code true} only the properties are dumped; when {@code false}
     *                    the metadata resources are dumped too, and nothing is uploaded if
     *                    there are none
     * @throws IOException      when creating or deleting the temporary location fails, or as
     *                          declared by the dump and upload calls
     * @throws RuntimeException when the {@code distMetaUrl} parameter is empty
     */
    protected void attachMetadataAndKylinProps(KylinConfig kylinConfig, boolean z) throws IOException {
        String distMetaUrl = getDistMetaUrl();
        if (StringUtils.isEmpty(distMetaUrl)) {
            throw new RuntimeException("Missing metaUrl");
        }
        // createTempFile is only used to reserve a unique path; the file itself is removed
        // so the dump call can create whatever it needs at that path.
        File createTempFile = File.createTempFile("kylin_job_meta", EMPTY);
        FileUtils.forceDelete(createTempFile);
        Properties exportToProperties = kylinConfig.exportToProperties();
        exportToProperties.setProperty("kylin.metadata.url", distMetaUrl);
        modifyDump(exportToProperties);
        boolean uploaded = false;
        try {
            if (z) {
                ResourceStore.dumpKylinProps(createTempFile, exportToProperties);
            } else {
                Map map = (Map) EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(UnitOfWorkParams.builder().readonly(true).unitName(getProject()).maxRetry(1).processor(() -> {
                    HashMap newHashMap = Maps.newHashMap();
                    for (String str : getMetadataDumpList(kylinConfig)) {
                        newHashMap.put(str, ResourceStore.getKylinMetaStore(kylinConfig).getResource(str));
                    }
                    return newHashMap;
                }).build());
                if (Objects.isNull(map) || map.isEmpty()) {
                    // Nothing was dumped, so there is nothing to upload.
                    return;
                }
                ResourceStore.dumpResourceMaps(createTempFile, map, exportToProperties);
            }
            // The target store is built from an unmodified export: modifyDump only applies
            // to the properties that are written into the dump.
            Properties exportToProperties2 = kylinConfig.exportToProperties();
            exportToProperties2.setProperty("kylin.metadata.url", distMetaUrl);
            MetadataStore.createMetadataStore(KylinConfig.createKylinConfig(exportToProperties2)).uploadFromFile(createTempFile);
            uploaded = true;
        } finally {
            if (!uploaded) {
                // Failure or early return: clean up without masking the original exception.
                FileUtils.deleteQuietly(createTempFile);
            }
        }
        logger.debug("Copied metadata to the target metaUrl, delete the temp dir: {}", createTempFile);
        FileUtils.forceDelete(createTempFile);
    }

    /**
     * Adjusts the properties that will be dumped for the Spark job: first lets the job
     * handler modify them, then strips the keys listed in removeUnNecessaryDump.
     *
     * @param properties properties to modify in place
     */
    private void modifyDump(Properties properties) {
        this.sparkJobHandler.modifyDump(properties);
        removeUnNecessaryDump(properties);
    }

    /**
     * Removes Spark jar, file, extra-Java-option and extra-classpath settings from the
     * properties about to be dumped.
     *
     * @param properties properties to strip in place
     */
    private void removeUnNecessaryDump(Properties properties) {
        String[] unnecessaryKeys = {
            "kylin.engine.spark-conf.spark.jars",
            "kylin.engine.spark-conf.spark.yarn.dist.jars",
            "kylin.engine.spark-conf.spark.files",
            "kylin.engine.spark-conf.spark.yarn.dist.files",
            "kylin.engine.spark-conf.spark.driver.extraJavaOptions",
            "kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions",
            "kylin.engine.spark-conf.spark.executor.extraJavaOptions",
            "kylin.engine.spark-conf.spark.driver.extraClassPath",
            "kylin.engine.spark-conf.spark.executor.extraClassPath",
            "kylin.query.async-query.spark-conf.spark.yarn.am.extraJavaOptions",
            "kylin.query.async-query.spark-conf.spark.executor.extraJavaOptions",
            "kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions",
            "kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions",
        };
        for (String key : unnecessaryKeys) {
            properties.remove(key);
        }
    }

    /**
     * Deletes the snapshot checkpoint directory of this job unless the job is resumable.
     * The directory is looked up with the part of the step id before the first underscore.
     * Deletion failures are logged and otherwise ignored.
     */
    private void deleteSnapshotDirectoryOnExists() {
        if (!isResumable()) {
            KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
            String checkpointDir = envConfig.getSnapshotCheckPointDir(getProject(), getId().split("_")[0]);
            try {
                HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(checkpointDir));
            } catch (Exception e) {
                logger.error("delete snapshot checkpoint in path {} failed.", checkpointDir, e);
            }
        }
    }

    /**
     * Deletes the directory containing the distributed metadata path when the config asks
     * for job tmp cleanup on retry and the job is not resumable. Deletion failures are
     * logged and otherwise ignored.
     */
    private void deleteJobTmpDirectoryOnExists() {
        if (!getConfig().isDeleteJobTmpWhenRetry() || isResumable()) {
            return;
        }
        String metaPath = StorageURL.valueOf(getDistMetaUrl()).getParameter("path");
        String[] pathSegments = metaPath.split("/");
        int lastSegmentLength = pathSegments[pathSegments.length - 1].length();
        // Drop the last segment together with the separator in front of it.
        String jobTmpDir = metaPath.substring(0, (metaPath.length() - 1) - lastSegmentLength);
        try {
            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(jobTmpDir));
        } catch (Exception e) {
            logger.error("delete job tmp in path {} failed.", jobTmpDir, e);
        }
    }

    /**
     * Returns the job name prefix set on the Spark application description; this
     * implementation returns {@code SparkApplication.JOB_NAME_PREFIX}.
     */
    protected String getJobNamePrefix() {
        return SparkApplication.JOB_NAME_PREFIX;
    }

    /**
     * Returns an extra jar for the job; this implementation returns the empty string.
     * Presumably consumed by getSparkJars, which is outside this view.
     */
    protected String getExtJar() {
        return EMPTY;
    }

    /**
     * Tells whether this step needs its metadata merged; this implementation returns {@code false}.
     */
    public boolean needMergeMetadata() {
        return false;
    }

    /**
     * Metadata merge hook; not supported by this implementation.
     *
     * @param metadataMerger unused by this implementation
     * @throws UnsupportedOperationException always
     */
    public void mergerMetadata(MetadataMerger metadataMerger) {
        throw new UnsupportedOperationException();
    }

    /**
     * Registers a stage under this step. The stage receives the id
     * {@code <stepId>_<two-digit index>}, this step as parent, and its index as step id.
     *
     * @param abstractExecutable stage to register; must be a {@link StageBase}
     * @return the same stage instance, now configured
     * @throws ClassCastException when the argument is not a {@link StageBase}
     */
    public AbstractExecutable addStage(AbstractExecutable abstractExecutable) {
        int stageIndex = this.stages.size();
        String stageId = getId() + "_" + String.format(Locale.ROOT, "%02d", Integer.valueOf(stageIndex));
        abstractExecutable.setId(stageId);
        abstractExecutable.setParent(this);
        abstractExecutable.setStepId(stageIndex);
        StageBase stage = (StageBase) abstractExecutable;
        this.stages.add(stage);
        return abstractExecutable;
    }

    /**
     * Publishes the registered stages in the stage map.
     *
     * <p>Does nothing without stages. Without segment ids the stages are stored under this
     * step's id; otherwise the same stage list is stored under every segment id, and the
     * {@code "indexCount"} parameter is set to the number of layout ids when any are given.
     */
    public void setStageMap() {
        if (CollectionUtils.isEmpty(this.stages)) {
            return;
        }
        if (StringUtils.isBlank(getParam("segmentIds"))) {
            this.stagesMap.put(getId(), this.stages);
            return;
        }
        for (String segmentId : getSegmentIds()) {
            this.stagesMap.put(segmentId, this.stages);
        }
        String layoutIds = getParam("layoutIds");
        if (StringUtils.isNotBlank(layoutIds)) {
            int indexCount = StringHelper.splitAndTrim(getParam("layoutIds"), COMMA).length;
            setParam("indexCount", String.valueOf(indexCount));
        }
    }

    /**
     * Associates {@code list} with the given key in the stage map.
     *
     * <p>NOTE(review): the list previously stored under the key (if any) first receives all
     * elements of {@code list} and is then replaced in the map by {@code list} itself. The
     * append therefore only affects other holders of the previous list; confirm whether a
     * merge was intended.
     *
     * @param str  key, e.g. a segment id
     * @param list stages to store under the key
     */
    public void setStageMapWithSegment(String str, List<StageBase> list) {
        List<StageBase> previous = this.stagesMap.getOrDefault(str, Lists.newCopyOnWriteArrayList());
        previous.addAll(list);
        this.stagesMap.put(str, list);
    }

    /**
     * Returns the internal stage map itself (not a copy), so changes made by the caller
     * are visible to this executable.
     */
    public Map<String, List<StageBase>> getStagesMap() {
        return this.stagesMap;
    }

    /**
     * Tells whether the given Spark configuration sets {@code spark.submit.deployMode}
     * to {@code cluster}.
     *
     * @param map Spark configuration to inspect
     */
    private boolean isClusterMode(Map<String, String> map) {
        String deployMode = map.get(DEPLOY_MODE);
        return CLUSTER_MODE.equals(deployMode);
    }

    /**
     * Returns the Spark configuration resolved against this job's own config (see getConfig).
     */
    private Map<String, String> getSparkConf() {
        return getSparkConf(getConfig());
    }

    /**
     * Resolves the Spark configuration for submission: starts from the config overrides
     * and applies, in order, the Kerberos, driver option, executor option, plugin and
     * extra classpath rewrites.
     *
     * @param kylinConfig job config to resolve against
     * @return an unmodifiable view of the resolved configuration
     */
    private Map<String, String> getSparkConf(KylinConfig kylinConfig) {
        KapConfig kapConfig = KapConfig.wrap(kylinConfig);
        Map<String, String> conf = getSparkConfigOverride(kylinConfig);

        // The order matters: later rewrites read values written by earlier ones.
        rewriteKerberosConf(kapConfig, conf);
        rewriteDriverExtraJavaOptions(kylinConfig, kapConfig, conf);
        rewriteExecutorExtraJavaOptions(kylinConfig, conf);
        rewritePluginOptions(kylinConfig, conf);
        rewriteExtraClasspath(kylinConfig, conf);

        return Collections.unmodifiableMap(conf);
    }

    /**
     * Appends the Kylin-specific {@code -D} system properties to the driver's extra Java
     * options: working directory, driver log file, log4j configuration, REST server
     * address, task id, local log directory, and optionally the JAAS config and profiler
     * settings. The result is trimmed and written back to {@code map}.
     *
     * @param kylinConfig job config; its extended overrides supply the driver log file
     * @param kapConfig   wrapper consulted for cloud, ZooKeeper and Kerberos settings
     * @param map         Spark configuration to update in place
     */
    private void rewriteDriverExtraJavaOptions(KylinConfig kylinConfig, KapConfig kapConfig, Map<String, String> map) {
        StringBuilder options = new StringBuilder();
        if (map.containsKey(DRIVER_EXTRA_JAVA_OPTIONS)) {
            options.append(map.get(DRIVER_EXTRA_JAVA_OPTIONS));
        }

        String workingDir = kylinConfig.getHdfsWorkingDirectory();
        String driverLogFile = null;
        if (kylinConfig instanceof KylinConfigExt) {
            Map<String, String> extended = ((KylinConfigExt) kylinConfig).getExtendedOverrides();
            if (extended != null) {
                driverLogFile = extended.get("spark.driver.log4j.appender.hdfs.File");
            }
        }
        if (kapConfig.isCloud()) {
            // On cloud a configured local log working directory replaces the HDFS one.
            String localLogDir = kylinConfig.getLogLocalWorkingDirectory();
            if (StringUtils.isNotBlank(localLogDir)) {
                workingDir = localLogDir;
                driverLogFile = localLogDir + driverLogFile;
            }
        }

        options.append(SPACE).append("-Dkylin.hdfs.working.dir=").append(workingDir);
        options.append(SPACE).append("-Dspark.driver.log4j.appender.hdfs.File=").append(driverLogFile);
        rewriteDriverLog4jConf(options, kylinConfig, map);
        options.append(SPACE).append("-Dspark.driver.rest.server.address=").append(kylinConfig.getServerAddress());
        options.append(SPACE).append("-Dspark.driver.param.taskId=").append(getId());
        options.append(SPACE).append("-Dspark.driver.local.logDir=").append(KapConfig.getKylinLogDirAtBestEffort())
                .append("/spark");

        if (kapConfig.getPlatformZKEnable()) {
            options.append(SPACE).append("-Djava.security.auth.login.config=")
                    .append(kapConfig.getKerberosJaasConfPath());
        }
        if (kylinConfig.buildJobProfilingEnabled()) {
            options.append(SPACE).append("-Dspark.profiler.flagsDir=")
                    .append(kylinConfig.getJobTmpProfilerFlagsDir(this.project, getId()));
            options.append(SPACE).append("-Dspark.profiler.collection.timeout=")
                    .append(kylinConfig.buildJobProfilingResultTimeout());
            options.append(SPACE).append("-Dspark.profiler.profiling.timeout=")
                    .append(kylinConfig.buildJobProfilingProfileTimeout());
        }

        map.put(DRIVER_EXTRA_JAVA_OPTIONS, options.toString().trim());
    }

    /**
     * Computes the driver's extra Java options for the given config without applying the
     * other Spark configuration rewrites.
     *
     * @param kylinConfig config to resolve against
     * @return the rewritten {@code spark.driver.extraJavaOptions} value
     */
    @VisibleForTesting
    public String getDriverExtraJavaOptions(KylinConfig kylinConfig) {
        Map<String, String> conf = getSparkConfigOverride(kylinConfig);
        rewriteDriverExtraJavaOptions(kylinConfig, KapConfig.wrap(kylinConfig), conf);
        return conf.get(DRIVER_EXTRA_JAVA_OPTIONS);
    }

    /**
     * When Kerberos is enabled, sets the Spark Kerberos principal and keytab and makes sure
     * the driver, YARN AM and executor Java options carry a {@code -Djava.security.krb5.conf}
     * setting.
     *
     * <p>AM and executors always use the krb5 file under the relative Hadoop conf path;
     * the driver uses it only in cluster deploy mode and the configured krb5 path otherwise.
     *
     * @param kapConfig source of the Kerberos settings
     * @param map       Spark configuration to update in place
     */
    private void rewriteKerberosConf(KapConfig kapConfig, final Map<String, String> map) {
        if (!kapConfig.isKerberosEnabled()) {
            return;
        }
        map.put("spark.kerberos.principal", kapConfig.getKerberosPrincipal());
        map.put("spark.kerberos.keytab", kapConfig.getKerberosKeytabPath());
        String containerKrb5Conf = HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf();

        // Adapter exposing the Spark configuration map through the ConfMap interface.
        ConfMap sparkConfAdapter = new ConfMap() {
            @Override
            public String get(String key) {
                return map.get(key);
            }

            @Override
            public void set(String key, String value) {
                map.put(key, value);
            }
        };

        String driverKrb5Conf = isClusterMode(map) ? containerKrb5Conf : kapConfig.getKerberosKrb5ConfPath();
        rewriteSpecifiedKrb5Conf(DRIVER_EXTRA_JAVA_OPTIONS, driverKrb5Conf, sparkConfAdapter);
        rewriteSpecifiedKrb5Conf(AM_EXTRA_JAVA_OPTIONS, containerKrb5Conf, sparkConfAdapter);
        rewriteSpecifiedKrb5Conf(EXECUTOR_EXTRA_JAVA_OPTIONS, containerKrb5Conf, sparkConfAdapter);
    }

    /**
     * Appends the global dictionary store implementation as a {@code -D} system property
     * to the executor's extra Java options and writes the trimmed result back.
     *
     * @param kylinConfig source of the global dictionary store class name
     * @param map         Spark configuration to update in place
     */
    private void rewriteExecutorExtraJavaOptions(KylinConfig kylinConfig, Map<String, String> map) {
        String existingOptions = map.containsKey(EXECUTOR_EXTRA_JAVA_OPTIONS)
                ? map.get(EXECUTOR_EXTRA_JAVA_OPTIONS)
                : EMPTY;
        String rewritten = existingOptions + SPACE + "-Dkylin.dictionary.globalV2-store-class-name="
                + kylinConfig.getGlobalDictV2StoreImpl();
        map.put(EXECUTOR_EXTRA_JAVA_OPTIONS, rewritten.trim());
    }

    /**
     * Prepends {@code -Djava.security.krb5.conf=<path>} to the option string stored under
     * the given key, unless that string already mentions the property.
     *
     * @param str     configuration key holding a Java option string
     * @param str2    krb5 configuration path to set
     * @param confMap configuration accessor to read from and write to
     */
    private void rewriteSpecifiedKrb5Conf(String str, String str2, ConfMap confMap) {
        String stored = confMap.get(str);
        String currentOptions = stored == null ? EMPTY : stored;
        if (!currentOptions.contains("-Djava.security.krb5.conf")) {
            String withKrb5 = "-Djava.security.krb5.conf=" + str2 + SPACE + currentOptions;
            confMap.set(str, withKrb5.trim());
        }
    }

    /**
     * When build job profiling is enabled, adds the async profiler plugin to
     * {@code spark.plugins}: appended after a comma to an existing value, or set as the
     * only plugin otherwise.
     *
     * @param kylinConfig source of the profiling flag
     * @param map         Spark configuration to update in place
     */
    private void rewritePluginOptions(KylinConfig kylinConfig, Map<String, String> map) {
        if (!kylinConfig.buildJobProfilingEnabled()) {
            return;
        }
        String profilerPlugin = BuildAsyncProfilerSparkPlugin.class.getCanonicalName();
        map.merge(SPARK_PLUGINS, profilerPlugin, (configured, added) -> configured + COMMA + added);
    }

    /**
     * Sets {@code DRIVER_EXTRA_CLASSPATH} and {@code EXECUTOR_EXTRA_CLASSPATH} from the jars
     * returned by {@code getSparkJars}.
     *
     * <ul>
     *   <li>Cluster mode: both entries receive the bare file names of {@code APP_JAR_NAME}
     *       followed by the Spark jars.</li>
     *   <li>Otherwise: the driver entry receives the jar paths as returned by
     *       {@code getSparkJars}; the executor entry receives only their file names.</li>
     * </ul>
     *
     * <p>File names are de-duplicated while keeping insertion order, so the classpath order
     * follows the order in which the jars were gathered ({@code APP_JAR_NAME} first in
     * cluster mode) instead of the arbitrary iteration order of a hash set.
     *
     * @param kylinConfig configuration passed through to {@code getSparkJars}
     * @param map         Spark configuration entries; the two classpath entries are overwritten
     */
    private void rewriteExtraClasspath(KylinConfig kylinConfig, Map<String, String> map) {
        boolean clusterMode = isClusterMode(map);

        Set<String> jars = new LinkedHashSet<>();
        if (clusterMode) {
            jars.add(APP_JAR_NAME);
        }
        jars.addAll(getSparkJars(kylinConfig, map));

        // Collect into a LinkedHashSet: classpath order decides which duplicate class wins,
        // so the insertion order established above must survive de-duplication.
        Set<String> jarFileNames = jars.stream()
                .map(jar -> Paths.get(jar).getFileName().toString())
                .collect(Collectors.toCollection(LinkedHashSet::new));
        String joinedFileNames = String.join(COLON, jarFileNames);

        map.put(DRIVER_EXTRA_CLASSPATH, clusterMode ? joinedFileNames : String.join(COLON, jars));
        map.put(EXECUTOR_EXTRA_CLASSPATH, joinedFileNames);
    }

    /**
     * Appends a {@code -Dlog4j.configurationFile} option for the driver to {@code sb}.
     *
     * <p>In cluster mode, or when the configured Spark master starts with {@code k8s}, only the
     * file name of the driver log properties file is used; otherwise the configured path is
     * used with a {@code file:} prefix.
     *
     * @param sb          builder receiving the option, preceded by {@code SPACE}
     * @param kylinConfig supplies the driver log properties file and the Spark master
     * @param map         Spark configuration entries, used to detect cluster mode
     */
    private void rewriteDriverLog4jConf(StringBuilder sb, KylinConfig kylinConfig, Map<String, String> map) {
        String driverLogProperties = kylinConfig.getLogSparkDriverPropertiesFile();
        String driverLogFileName = Paths.get(driverLogProperties).getFileName().toString();

        boolean useBareFileName = isClusterMode(map) || kylinConfig.getSparkMaster().startsWith("k8s");
        if (useBareFileName) {
            sb.append(SPACE).append("-Dlog4j.configurationFile=").append(driverLogFileName);
            return;
        }
        sb.append(SPACE).append("-Dlog4j.configurationFile=file:").append(driverLogProperties);
    }

    /**
     * Gathers the jar entries from the Kylin job jar path, the extra jars path,
     * {@code getJars()}, {@code getExtJar()} and the {@code SPARK_JARS_1} / {@code SPARK_JARS_2}
     * map entries.
     *
     * <p>Null or empty sources are skipped, each remaining source is split on {@code COMMA},
     * and only entries ending in {@code .jar} are kept.
     *
     * @param kylinConfig supplies the job jar path and the extra jars path
     * @param map         Spark configuration entries read for additional jars
     * @return an unmodifiable set of jar entries in first-seen order
     */
    private Set<String> getSparkJars(KylinConfig kylinConfig, Map<String, String> map) {
        Set<String> jarEntries = Stream.of(
                        kylinConfig.getKylinJobJarPath(),
                        kylinConfig.getExtraJarsPath(),
                        getJars(),
                        getExtJar(),
                        map.get(SPARK_JARS_1),
                        map.get(SPARK_JARS_2))
                .filter(StringUtils::isNotEmpty)
                .flatMap(source -> Arrays.stream(StringUtils.split(source, COMMA)))
                .filter(entry -> entry.endsWith(".jar"))
                // The insertion-ordered set drops repeated entries and keeps the first position.
                .collect(Collectors.toCollection(LinkedHashSet::new));
        return Collections.unmodifiableSet(jarEntries);
    }

    /**
     * Gathers the file entries from the app-master, driver and executor log properties files,
     * the async profiler files (only when {@code buildJobProfilingEnabled()} is true) and the
     * {@code SPARK_FILES_1} / {@code SPARK_FILES_2} map entries.
     *
     * <p>Null or empty sources are skipped and each remaining source is split on {@code COMMA}.
     * An {@link IOException} while resolving the profiler files is logged and that source is
     * left out.
     *
     * @param kylinConfig supplies the log properties files and the profiler files
     * @param map         Spark configuration entries read for additional files
     * @return an unmodifiable set of file entries in first-seen order
     */
    private Set<String> getSparkFiles(KylinConfig kylinConfig, Map<String, String> map) {
        Set<String> sources = new LinkedHashSet<>();
        sources.add(kylinConfig.getLogSparkAppMasterPropertiesFile());
        sources.add(kylinConfig.getLogSparkDriverPropertiesFile());
        sources.add(kylinConfig.getLogSparkExecutorPropertiesFile());
        if (kylinConfig.buildJobProfilingEnabled()) {
            try {
                sources.add(kylinConfig.getAsyncProfilerFiles());
            } catch (IOException e) {
                logger.error("Add SparkPluginFile failed.", e);
            }
        }
        sources.add(map.get(SPARK_FILES_1));
        sources.add(map.get(SPARK_FILES_2));

        Set<String> files = new LinkedHashSet<>();
        for (String source : sources) {
            if (StringUtils.isEmpty(source)) {
                continue;
            }
            for (String file : StringUtils.split(source, COMMA)) {
                if (StringUtils.isNotEmpty(file)) {
                    files.add(file);
                }
            }
        }
        return Collections.unmodifiableSet(files);
    }

    /**
     * Cancels this job by passing this executable's id to
     * {@code killOrphanApplicationIfExists}.
     */
    public void cancelJob() {
        String jobId = getId();
        killOrphanApplicationIfExists(jobId);
    }
}
