package org.apache.kylin.engine.spark;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.tool.shaded.org.apache.commons.io.FileUtils;
import org.apache.kylin.tool.shaded.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/spark/SparkExecutable.class */
public class SparkExecutable extends AbstractExecutable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SparkExecutable.class);
    private static final String CLASS_NAME = "className";
    private static final String JARS = "jars";
    private static final String JOB_ID = "jobId";

    public void setClassName(String str) {
        setParam(CLASS_NAME, str);
    }

    public void setJobId(String str) {
        setParam(JOB_ID, str);
    }

    public void setJars(String str) {
        setParam(JARS, str);
    }

    private String formatArgs() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : getParams().entrySet()) {
            StringBuilder sb2 = new StringBuilder();
            sb2.append("-").append(entry.getKey()).append(StringUtils.SPACE).append(entry.getValue()).append(StringUtils.SPACE);
            if (entry.getKey().equals(CLASS_NAME)) {
                sb.insert(0, (CharSequence) sb2);
            } else if (!entry.getKey().equals(JARS) && !entry.getKey().equals(JOB_ID)) {
                sb.append((CharSequence) sb2);
            }
        }
        return sb.length() > 0 ? sb.substring(0, sb.length() - 1).toString() : "";
    }

    @Override // org.apache.kylin.job.execution.AbstractExecutable
    protected ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
        CubeInstance cube = CubeManager.getInstance(executableContext.getConfig()).getCube(getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt()));
        KylinConfig config = cube.getConfig();
        setAlgorithmLayer();
        if (KylinConfig.getSparkHome() == null) {
            throw new NullPointerException();
        }
        if (config.getKylinJobJarPath() == null) {
            throw new NullPointerException();
        }
        String param = getParam(JARS);
        String property = System.getProperty("kylin.hadoop.conf.dir");
        if (org.apache.kylin.tool.shaded.org.apache.commons.lang.StringUtils.isEmpty(property)) {
            throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
        }
        logger.info("Using " + property + " as HADOOP_CONF_DIR");
        String kylinJobJarPath = config.getKylinJobJarPath();
        if (org.apache.kylin.tool.shaded.org.apache.commons.lang.StringUtils.isEmpty(param)) {
            param = kylinJobJarPath;
        }
        try {
            attachSegmentMetadataWithDict(cube.getSegmentById(getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt())));
            StringBuilder sb = new StringBuilder();
            sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
            for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
                sb.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(StringUtils.SPACE);
            }
            sb.append("--jars %s %s %s");
            try {
                String format = String.format(sb.toString(), property, KylinConfig.getSparkHome(), param, kylinJobJarPath, formatArgs());
                logger.info("cmd: " + format);
                CliCommandExecutor cliCommandExecutor = new CliCommandExecutor();
                PatternedLogger patternedLogger = new PatternedLogger(logger);
                cliCommandExecutor.execute(format, patternedLogger);
                getManager().addJobInfo(getId(), patternedLogger.getInfo());
                return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
            } catch (Exception e) {
                logger.error("error run spark job:", (Throwable) e);
                return ExecuteResult.createError(e);
            }
        } catch (IOException e2) {
            throw new ExecuteException("meta dump fialed");
        }
    }

    private void setAlgorithmLayer() {
        ((CubingJob) ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).getJob(getParam(JOB_ID))).setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
    }

    private void attachSegmentMetadataWithDict(CubeSegment cubeSegment) throws IOException {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        linkedHashSet.addAll(JobRelatedMetaUtil.collectCubeMetadata(cubeSegment.getCubeInstance()));
        linkedHashSet.addAll(cubeSegment.getDictionaryPaths());
        linkedHashSet.add(cubeSegment.getStatisticsResourcePath());
        dumpAndUploadKylinPropsAndMetadata(linkedHashSet, (KylinConfigExt) cubeSegment.getConfig());
    }

    private void dumpAndUploadKylinPropsAndMetadata(Set<String> set, KylinConfigExt kylinConfigExt) throws IOException {
        File createTempFile = File.createTempFile("kylin_job_meta", "");
        FileUtils.forceDelete(createTempFile);
        File file = new File(createTempFile, "meta");
        file.mkdirs();
        JobRelatedMetaUtil.dumpResources(kylinConfigExt, file, set);
        Properties exportToProperties = kylinConfigExt.exportToProperties();
        exportToProperties.setProperty("kylin.metadata.url", getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
        File file2 = new File(file, KylinConfig.KYLIN_CONF_PROPERTIES_FILE);
        FileOutputStream fileOutputStream = new FileOutputStream(file2);
        Throwable th = null;
        try {
            try {
                exportToProperties.store(fileOutputStream, file2.getAbsolutePath());
                if (fileOutputStream != null) {
                    if (0 != 0) {
                        try {
                            fileOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileOutputStream.close();
                    }
                }
                ResourceTool.copy(KylinConfig.createInstanceFromUri(file.getAbsolutePath()), KylinConfig.createKylinConfig(exportToProperties));
            } finally {
            }
        } catch (Throwable th3) {
            if (fileOutputStream != null) {
                if (th != null) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            throw th3;
        }
    }
}
