package org.apache.kylin.engine.spark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
import org.apache.kylin.engine.spark.exception.SparkException;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.project.ProjectManager;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Job step that runs a Kylin Spark application through {@code spark-submit}.
 *
 * <p>On a fresh run the step assembles a {@code spark-submit} command from its parameters and the
 * effective {@link KylinConfig}, runs it on a background thread and polls for pause/discard
 * requests. YARN application info reported in the command output is recorded into the job output.
 * When a Spark job id is already recorded for this step, {@link #doWork} does not resubmit; it
 * polls YARN for that application instead (see {@link #onResumed}).
 */
public class SparkExecutable extends AbstractExecutable {
    private static final Logger logger = LoggerFactory.getLogger(SparkExecutable.class);

    private static final String CLASS_NAME = "className";
    private static final String JARS = "jars";
    private static final String JOB_ID = "jobId";
    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
    private static final String CONFIG_NAME = "configName";

    // Application states as extracted from `yarn application -status` output (see getAppState).
    private static final String YARN_STATE_SUCCEEDED = "SUCCEEDED";
    private static final String YARN_STATE_FAILED = "FAILED";
    private static final String YARN_STATE_KILLED = "KILLED";

    private static final long POLL_INTERVAL_MS = 5000L;
    private static final long KILL_RETRY_INTERVAL_MS = 1000L;
    private static final int MAX_KILL_RETRIES = 5;

    public void setClassName(String str) {
        setParam(CLASS_NAME, str);
    }

    public void setJobId(String str) {
        setParam(JOB_ID, str);
    }

    public void setJars(String str) {
        setParam(JARS, str);
    }

    public void setCounterSaveAs(String str) {
        setParam(COUNTER_SAVE_AS, str);
    }

    /**
     * Sets the counter aliases and the path from which counters are read after the job finishes.
     *
     * @param str comma-separated aliases for source record count, source size and HDFS bytes written
     * @param str2 path stored under {@link BatchConstants#ARG_COUNTER_OUTPUT}
     */
    public void setCounterSaveAs(String str, String str2) {
        setParam(COUNTER_SAVE_AS, str);
        setParam(BatchConstants.ARG_COUNTER_OUTPUT, str2);
    }

    public String getCounterSaveAs() {
        return getParam(COUNTER_SAVE_AS);
    }

    public void setSparkConfigName(String str) {
        setParam(CONFIG_NAME, str);
    }

    public String getSparkConfigName() {
        return getParam(CONFIG_NAME);
    }

    /** Returns true for parameters consumed by this executable rather than passed to SparkEntry. */
    private static boolean isInternalParam(String key) {
        return JARS.equals(key) || JOB_ID.equals(key) || COUNTER_SAVE_AS.equals(key) || CONFIG_NAME.equals(key);
    }

    /** Returns true when the YARN state denotes an unsuccessful end (FAILED or KILLED). */
    private static boolean isFailedState(String state) {
        return YARN_STATE_FAILED.equals(state) || YARN_STATE_KILLED.equals(state);
    }

    /** Returns true when the YARN state is terminal (SUCCEEDED, FAILED or KILLED). */
    private static boolean isFinalState(String state) {
        return YARN_STATE_SUCCEEDED.equals(state) || isFailedState(state);
    }

    /**
     * Builds the argument string passed to {@code SparkEntry}.
     *
     * <p>The {@code className} option is placed first. It is followed by every other parameter
     * except those consumed by this executable itself. Each option renders as {@code -key value};
     * options are separated by single spaces.
     */
    private String formatArgs() {
        StringBuilder args = new StringBuilder();
        for (Map.Entry<String, String> entry : getParams().entrySet()) {
            String key = entry.getKey();
            String option = "-" + key + " " + entry.getValue() + " ";
            if (CLASS_NAME.equals(key)) {
                args.insert(0, option);
            } else if (!isInternalParam(key)) {
                args.append(option);
            }
        }
        // Drop the trailing separator.
        return args.length() > 0 ? args.substring(0, args.length() - 1) : "";
    }

    /**
     * Marks the step as started.
     *
     * <p>If a start time is already recorded and a Spark job id is known whose YARN application is
     * not in a failed state, the step is just flipped back to RUNNING. Otherwise the default start
     * handling of {@link AbstractExecutable} is used.
     */
    @Override
    public void onExecuteStart(ExecutableContext executableContext) {
        Output output = getOutput();
        if (!output.getExtra().containsKey("startTime")) {
            // No start time recorded yet (presumably the first run of this step).
            super.onExecuteStart(executableContext);
            return;
        }
        String appId = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID);
        if (StringUtils.isEmpty(appId)) {
            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
            return;
        }
        try {
            String state = getAppState(appId);
            if (state == null || isFailedState(state)) {
                super.onExecuteStart(executableContext);
            } else {
                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
            }
        } catch (IOException e) {
            logger.warn("error get hadoop status", e);
            super.onExecuteStart(executableContext);
        }
    }

    /**
     * Tracks an already-submitted YARN application until it finishes or the job is paused or
     * discarded. On pause/discard the application is killed.
     *
     * @param str YARN application id of the previously submitted Spark job
     * @param executableManager manager used to record job output
     * @return SUCCEED/FAILED by application outcome, DISCARDED/STOPPED on user action, or ERROR
     *     if the state cannot be determined or an exception occurs
     */
    protected ExecuteResult onResumed(String str, ExecutableManager executableManager) throws ExecuteException {
        Map<String, String> info = new HashMap<>();
        try {
            logger.info("spark_job_id:" + str + " resumed");
            info.put(ExecutableConstants.SPARK_JOB_ID, str);
            while (!isPaused() && !isDiscarded()) {
                String state = getAppState(str);
                if (state == null) {
                    // Previously an NPE; report it explicitly instead.
                    String msg = "cannot get YARN state of " + str;
                    logger.error(msg);
                    return new ExecuteResult(ExecuteResult.State.ERROR, msg);
                }
                if (isFailedState(state)) {
                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, str + " has failed");
                    return new ExecuteResult(ExecuteResult.State.FAILED, str + " has failed");
                }
                if (YARN_STATE_SUCCEEDED.equals(state)) {
                    executableManager.addJobInfo(getId(), info);
                    return new ExecuteResult(ExecuteResult.State.SUCCEED, str + " has finished");
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }
            killAppRetry(str);
            return isDiscarded() ? new ExecuteResult(ExecuteResult.State.DISCARDED, str + " is discarded")
                    : new ExecuteResult(ExecuteResult.State.STOPPED, str + " is stopped");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("error run spark job:", e);
            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
        } catch (Exception e) {
            logger.error("error run spark job:", e);
            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
        }
    }

    /**
     * Runs the Spark job, or resumes tracking it if a Spark job id is already recorded.
     *
     * <p>The submit command runs on a single background thread. The calling thread polls every
     * {@link #POLL_INTERVAL_MS} ms for completion or for pause/discard. On pause/discard the YARN
     * application (if its id was captured) is killed.
     */
    @Override
    protected ExecuteResult doWork(ExecutableContext executableContext, IJobRunner iJobRunner) throws ExecuteException {
        ExecutableManager manager = getManager();
        String recordedAppId = manager.getOutput(getId()).getExtra().get(ExecutableConstants.SPARK_JOB_ID);
        if (!StringUtils.isEmpty(recordedAppId)) {
            return onResumed(recordedAppId, manager);
        }

        String cubeName = getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
        CubeInstance cube = cubeName != null ? CubeManager.getInstance(executableContext.getConfig()).getCube(cubeName)
                : null;
        KylinConfig config = cube != null ? cube.getConfig()
                : ProjectManager.getInstance(executableContext.getConfig())
                        .getProject(getParam(SparkColumnCardinality.OPTION_PRJ.getOpt())).getConfig();
        if (KylinConfig.getSparkHome() == null) {
            throw new NullPointerException("spark home is null");
        }
        if (config.getKylinJobJarPath() == null) {
            throw new NullPointerException("kylin job jar path is null");
        }

        String hadoopConfDir = System.getProperty("kylin.hadoop.conf.dir");
        if (StringUtils.isEmpty(hadoopConfDir)) {
            throw new RuntimeException(
                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
        }
        logger.info("Using " + hadoopConfDir + " as HADOOP_CONF_DIR");

        String jobJar = config.getKylinJobJarPath();
        String jars = getParam(JARS);
        if (StringUtils.isEmpty(jars)) {
            jars = jobJar;
        }

        if (cube != null && !isCreateFlatTable()) {
            setAlgorithmLayer();
            CubeSegment segment = cube.getSegmentById(getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt()));
            dumpMetadata(segment, cube.getMergingSegments(segment));
        }

        final String cmd = buildSparkSubmitCommand(config, hadoopConfDir, jars, jobJar);
        logger.info("cmd: " + cmd);

        final CliCommandExecutor cliExecutor = new CliCommandExecutor();
        final PatternedLogger patternedLogger = new PatternedLogger(logger, 4, new PatternedLogger.ILogListener() {
            @Override
            public void onLogEvent(String infoKey, Map<String, String> info) {
                // Persist app identifiers as soon as they appear, so a later pause/resume can find them.
                if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey) || ExecutableConstants.YARN_APP_ID.equals(infoKey)
                        || "yarn_application_tracking_url".equals(infoKey)) {
                    SparkExecutable.this.getManager().addJobInfo(SparkExecutable.this.getId(), info);
                }
            }
        });

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Pair<Integer, String>> future = executor.submit(new Callable<Pair<Integer, String>>() {
                @Override
                public Pair<Integer, String> call() {
                    try {
                        return cliExecutor.execute(cmd, patternedLogger);
                    } catch (Exception e) {
                        logger.error("error run spark job:", e);
                        return new Pair<>(-1, e.getMessage());
                    }
                }
            });

            Pair<Integer, String> result = null;
            while (!isDiscarded() && !isPaused()) {
                if (future.isDone()) {
                    result = future.get();
                    break;
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }

            if (!future.isDone()) {
                // Paused or discarded while spark-submit is still running: interrupt it and kill the app.
                executor.shutdownNow();
                Map<String, String> extra = manager.getOutput(getId()).getExtra();
                if (extra != null) {
                    String appId = extra.get(ExecutableConstants.SPARK_JOB_ID);
                    if (StringUtils.isNotEmpty(appId)) {
                        killAppRetry(appId);
                    }
                }
                if (isDiscarded()) {
                    return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
                }
                if (isPaused()) {
                    return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
                }
                throw new IllegalStateException("spark job is still running but the step is neither paused nor discarded");
            }

            if (result == null) {
                result = future.get();
            }
            Integer exitCode = result == null ? null : result.getFirst();
            if (exitCode == null || exitCode != 0) {
                // Clear the recorded app id so the next run submits again instead of taking the
                // resume path at the top of this method.
                Map<String, String> extra = manager.getOutput(getId()).getExtra();
                extra.put(ExecutableConstants.SPARK_JOB_ID, "");
                manager.addJobInfo(getId(), extra);
                String output = result != null && result.getSecond() != null ? result.getSecond() : "";
                int maxSize = config.getSparkOutputMaxSize();
                if (output.length() > maxSize) {
                    output = output.substring(0, maxSize);
                }
                return ExecuteResult.createFailed(new SparkException(output));
            }

            Map<String, String> info = patternedLogger.getInfo();
            String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUTPUT);
            if (counterOutput != null) {
                if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
                    info.putAll(HadoopUtil.readFromSequenceFile(counterOutput));
                } else {
                    logger.warn("Spark counter output path not exists: " + counterOutput);
                }
            }
            readCounters(info);
            manager.addJobInfo(getId(), info);

            if (info.containsKey(ExecutableConstants.SPARK_DIMENSION_DIC_SEGMENT_ID)) {
                updateSparkDimensionDicMetadata(config, cube, info.get(ExecutableConstants.SPARK_DIMENSION_DIC_SEGMENT_ID));
                logger.info("Finished update dictionaries and snapshot info from {} to {}.",
                        getParam(SparkBuildDictionary.OPTION_META_URL.getOpt()), config.getMetadataUrl());
            }
            return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Error run spark job:", e);
            return ExecuteResult.createError(e);
        } catch (Exception e) {
            logger.error("Error run spark job:", e);
            return ExecuteResult.createError(e);
        } finally {
            // Without this the idle worker thread of the single-thread executor is leaked.
            executor.shutdown();
        }
    }

    /**
     * Assembles the shell command that sets HADOOP_CONF_DIR and invokes spark-submit with
     * {@code SparkEntry}.
     *
     * <p>The command is concatenated rather than passed through {@code String.format}, so a
     * {@code %} in the step name or in a Spark config value (e.g. {@code kill %p}) cannot break it.
     */
    private String buildSparkSubmitCommand(KylinConfig config, String hadoopConfDir, String jars, String jobJar) {
        StringBuilder cmd = new StringBuilder();
        cmd.append(Shell.osType == Shell.OSType.OS_TYPE_WIN ? "set" : "export")
                .append(" HADOOP_CONF_DIR=").append(hadoopConfDir)
                .append(" && ").append(KylinConfig.getSparkHome())
                .append("/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
        if (getName() != null) {
            // Trailing space keeps the name apart from "--jars" when no --conf entries follow.
            cmd.append("--name \"").append(getName()).append("\" ");
        }
        for (Map.Entry<String, String> entry : collectSparkConfs(config).entrySet()) {
            cmd.append(" --conf ").append(entry.getKey()).append('=').append(entry.getValue()).append(' ');
        }
        cmd.append("--jars ").append(jars).append(' ').append(jobJar).append(' ').append(formatArgs());
        return cmd.toString();
    }

    /**
     * Collects the Spark {@code --conf} overrides.
     *
     * <p>Sources are the global overrides, then the ones for this step's named config (if any).
     * When an HBase cluster FS is configured, it is appended to
     * {@code spark.yarn.access.hadoopFileSystems}.
     */
    private Map<String, String> collectSparkConfs(KylinConfig config) {
        Map<String, String> sparkConfs = config.getSparkConfigOverride();
        String sparkConfigName = getSparkConfigName();
        if (sparkConfigName != null) {
            sparkConfs.putAll(config.getSparkConfigOverrideWithSpecificName(sparkConfigName));
        }
        String hbaseClusterFs = config.getHBaseClusterFs();
        if (StringUtils.isNotEmpty(hbaseClusterFs)) {
            String fileSystems = sparkConfs.get("spark.yarn.access.hadoopFileSystems");
            sparkConfs.put("spark.yarn.access.hadoopFileSystems",
                    StringUtils.isNotEmpty(fileSystems) ? fileSystems + "," + hbaseClusterFs : hbaseClusterFs);
        }
        return sparkConfs;
    }

    /**
     * Dumps the cube metadata and dictionaries needed by the Spark job to the job's meta URL.
     *
     * <p>With no merging segments only {@code cubeSegment} is dumped. Otherwise the segment and all
     * merging segments are dumped together in one pass.
     *
     * @throws ExecuteException if the dump fails; the underlying {@link IOException} is the cause
     */
    public void dumpMetadata(CubeSegment cubeSegment, List<CubeSegment> list) throws ExecuteException {
        try {
            if (list == null || list.isEmpty()) {
                attachSegmentMetadataWithDict(cubeSegment);
            } else {
                List<CubeSegment> allSegments = new ArrayList<>(list.size() + 1);
                allSegments.add(cubeSegment);
                allSegments.addAll(list);
                attachSegmentsMetadataWithDict(allSegments);
            }
        } catch (IOException e) {
            throw new ExecuteException("meta dump failed", e);
        }
    }

    /**
     * Copies dictionaries, snapshots and rowkey stats built by the Spark job back into the local
     * cube segment. The copy reads from the job's HDFS meta store. The cube is then saved and the
     * new dictionary/snapshot resources are uploaded to the local metadata URL.
     */
    private void updateSparkDimensionDicMetadata(KylinConfig kylinConfig, CubeInstance cubeInstance, String str)
            throws IOException {
        KylinConfig hdfsConfig = AbstractHadoopJob
                .loadKylinConfigFromHdfs(getParam(SparkBuildDictionary.OPTION_META_URL.getOpt()));
        CubeSegment builtSegment = CubeManager.getInstance(hdfsConfig).reloadCube(cubeInstance.getName())
                .getSegmentById(str);
        CubeSegment localSegment = cubeInstance.getSegmentById(str);
        localSegment.setDictionaries((ConcurrentHashMap) builtSegment.getDictionaries());
        localSegment.setSnapshots((ConcurrentHashMap) builtSegment.getSnapshots());
        localSegment.getRowkeyStats().addAll(builtSegment.getRowkeyStats());

        CubeUpdate update = new CubeUpdate(cubeInstance.latestCopyForWrite());
        update.setToUpdateSegs(localSegment);
        CubeManager.getInstance(kylinConfig).updateCube(update);

        LinkedHashSet<String> resourcePaths = new LinkedHashSet<>();
        resourcePaths.addAll(builtSegment.getDictionaryPaths());
        resourcePaths.addAll(builtSegment.getSnapshotPaths());
        JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(resourcePaths, (KylinConfigExt) builtSegment.getConfig(),
                kylinConfig.getMetadataUrl().toString());
    }

    /** Records on the parent cubing job that the layered cubing algorithm is used. */
    public void setAlgorithmLayer() {
        CubingJob cubingJob = (CubingJob) ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv())
                .getJob(getParam(JOB_ID));
        cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
    }

    /**
     * Queries YARN for the application's state via {@code yarn application -status}.
     *
     * @return the state extracted by {@link PatternedLogger}, or {@code null} if none was parsed
     * @throws IOException if {@code str} is empty or the command fails
     */
    protected String getAppState(String str) throws IOException {
        if (StringUtils.isEmpty(str)) {
            throw new IOException("The app id is null or empty");
        }
        CliCommandExecutor cliCommandExecutor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
        PatternedLogger patternedLogger = new PatternedLogger(logger);
        cliCommandExecutor.execute(String.format(Locale.ROOT, "yarn application -status %s", str), patternedLogger);
        return patternedLogger.getInfo().get(ExecutableConstants.YARN_APP_STATE);
    }

    /** Issues {@code yarn application -kill} for the given application id. */
    protected void killApp(String str) throws IOException, InterruptedException {
        KylinConfig.getInstanceFromEnv().getCliCommandExecutor()
                .execute(String.format(Locale.ROOT, "yarn application -kill %s", str));
    }

    /**
     * Kills a YARN application, retrying up to {@link #MAX_KILL_RETRIES} times until it reports
     * KILLED.
     *
     * @return 0 if the id is empty, the app was already final, or it was killed; 1 otherwise
     */
    protected int killAppRetry(String str) throws IOException, InterruptedException {
        if (StringUtils.isEmpty(str)) {
            logger.warn("The app id is null or empty");
            return 0;
        }
        if (isFinalState(getAppState(str))) {
            logger.warn(str + " is final state, no need to kill");
            return 0;
        }
        killApp(str);
        String state = getAppState(str);
        int retry = 0;
        // Bounded: an unknown (null) state no longer makes this loop forever.
        while (!YARN_STATE_KILLED.equals(state) && retry < MAX_KILL_RETRIES) {
            killApp(str);
            Thread.sleep(KILL_RETRY_INTERVAL_MS);
            state = getAppState(str);
            retry++;
        }
        if (YARN_STATE_KILLED.equals(state)) {
            logger.info(str + " killed successfully");
            return 0;
        }
        logger.info(str + " killed failed");
        return 1;
    }

    /** Dumps cube metadata, dictionaries and (if present) statistics for a single segment. */
    private void attachSegmentMetadataWithDict(CubeSegment cubeSegment) throws IOException {
        List<CubeSegment> single = new ArrayList<>(1);
        single.add(cubeSegment);
        attachSegmentsMetadataWithDict(single);
    }

    /**
     * Dumps cube metadata plus each segment's dictionaries, statistics (if present) and global-domain
     * dictionaries to the job's meta URL. The first segment supplies the cube and config.
     */
    private void attachSegmentsMetadataWithDict(List<CubeSegment> list) throws IOException {
        CubeSegment first = list.get(0);
        LinkedHashSet<String> resourcePaths = new LinkedHashSet<>(
                JobRelatedMetaUtil.collectCubeMetadata(first.getCubeInstance()));
        ResourceStore store = ResourceStore.getStore(first.getConfig());
        for (CubeSegment segment : list) {
            resourcePaths.addAll(segment.getDictionaryPaths());
            if (store.exists(segment.getStatisticsResourcePath())) {
                resourcePaths.add(segment.getStatisticsResourcePath());
            }
            CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), resourcePaths);
        }
        JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(resourcePaths, (KylinConfigExt) first.getConfig(),
                getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
    }

    /**
     * Copies the source-record count, source size and HDFS-bytes-written counters under the aliases
     * configured via {@link #setCounterSaveAs(String)}. Positions 0, 1 and 2 of the comma-separated
     * list map to those counters in that order.
     */
    public void readCounters(Map<String, String> map) {
        String counterSaveAs = getCounterSaveAs();
        if (counterSaveAs != null) {
            String[] aliases = counterSaveAs.split(",");
            saveCounterAs(map.get(ExecutableConstants.SOURCE_RECORDS_COUNT), aliases, 0, map);
            saveCounterAs(map.get(ExecutableConstants.SOURCE_RECORDS_SIZE), aliases, 1, map);
            saveCounterAs(map.get(ExecutableConstants.HDFS_BYTES_WRITTEN), aliases, 2, map);
        }
    }

    /** Stores {@code str} under the trimmed alias at {@code strArr[i]}, if that alias is non-blank. */
    private void saveCounterAs(String str, String[] strArr, int i, Map<String, String> map) {
        if (strArr.length <= i || StringUtils.isBlank(strArr[i])) {
            return;
        }
        map.put(strArr[i].trim(), str);
    }

    private boolean isCreateFlatTable() {
        return ExecutableConstants.STEP_NAME_CREATE_FLAT_TABLE_WITH_SPARK.equals(getName());
    }
}
