package org.apache.kylin.engine.spark;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.livy.LivyRestBuilder;
import org.apache.kylin.common.livy.LivyRestExecutor;
import org.apache.kylin.common.livy.LivyStateEnum;
import org.apache.kylin.common.livy.LivyTypeEnum;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-3.1.3.jar:org/apache/kylin/engine/spark/SparkExecutableLivy.class */
public class SparkExecutableLivy extends SparkExecutable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SparkExecutableLivy.class);
    private static final String CLASS_NAME = "className";
    private static final String JARS = "jars";
    private static final String JOB_ID = "jobId";
    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
    private static final String CONFIG_NAME = "configName";

    public void formatArgs(List<String> list) {
        Iterator<Map.Entry<String, String>> it = getParams().entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<String, String> next = it.next();
            if (next.getKey().equals(CLASS_NAME)) {
                list.add("-" + next.getKey());
                list.add(next.getValue());
                break;
            }
        }
        for (Map.Entry<String, String> entry : getParams().entrySet()) {
            if (!entry.getKey().equals(CLASS_NAME) && !entry.getKey().equals(JARS) && !entry.getKey().equals(JOB_ID) && !entry.getKey().equals(COUNTER_SAVE_AS) && !entry.getKey().equals(CONFIG_NAME)) {
                list.add("-" + entry.getKey());
                list.add(entry.getValue());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.kylin.engine.spark.SparkExecutable, org.apache.kylin.job.execution.AbstractExecutable
    public void onExecuteStart(ExecutableContext executableContext) {
        Output output = getOutput();
        if (!output.getExtra().containsKey("startTime")) {
            super.onExecuteStart(executableContext);
            return;
        }
        String str = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID);
        if (str == null) {
            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
            return;
        }
        try {
            String appState = getAppState(str);
            if (Strings.isNullOrEmpty(appState) || LivyStateEnum.dead.name().equalsIgnoreCase(appState) || LivyStateEnum.error.name().equalsIgnoreCase(appState) || LivyStateEnum.shutting_down.name().equalsIgnoreCase(appState)) {
                super.onExecuteStart(executableContext);
            } else {
                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
            }
        } catch (IOException e) {
            logger.warn("error get hadoop status");
            super.onExecuteStart(executableContext);
        }
    }

    @Override // org.apache.kylin.engine.spark.SparkExecutable
    protected ExecuteResult onResumed(String str, ExecutableManager executableManager) throws ExecuteException {
        HashMap hashMap = new HashMap();
        try {
            logger.info("livy spark_job_id:" + str + " resumed");
            hashMap.put(ExecutableConstants.SPARK_JOB_ID, str);
            while (!isPaused() && !isDiscarded()) {
                String appState = getAppState(str);
                if (Strings.isNullOrEmpty(appState) || LivyStateEnum.dead.name().equalsIgnoreCase(appState) || LivyStateEnum.error.name().equalsIgnoreCase(appState) || LivyStateEnum.shutting_down.name().equalsIgnoreCase(appState)) {
                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, str + " has failed");
                    return new ExecuteResult(ExecuteResult.State.FAILED, str + " has failed");
                }
                if (LivyStateEnum.success.name().equalsIgnoreCase(appState)) {
                    executableManager.addJobInfo(getId(), hashMap);
                    return new ExecuteResult(ExecuteResult.State.SUCCEED, str + " has finished");
                }
                Thread.sleep(5000L);
            }
            killAppRetry(str);
            return isDiscarded() ? new ExecuteResult(ExecuteResult.State.DISCARDED, str + " is discarded") : new ExecuteResult(ExecuteResult.State.STOPPED, str + " is stopped");
        } catch (Exception e) {
            logger.error("error run spark job:", (Throwable) e);
            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
        }
    }

    @Override // org.apache.kylin.engine.spark.SparkExecutable, org.apache.kylin.job.execution.AbstractExecutable
    protected ExecuteResult doWork(ExecutableContext executableContext, IJobRunner iJobRunner) throws ExecuteException {
        ExecutableManager manager = getManager();
        String str = manager.getOutput(getId()).getExtra().get(ExecutableConstants.SPARK_JOB_ID);
        if (!StringUtils.isEmpty(str)) {
            return onResumed(str, manager);
        }
        CubeInstance cube = CubeManager.getInstance(executableContext.getConfig()).getCube(getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt()));
        KylinConfig config = cube.getConfig();
        setAlgorithmLayer();
        LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
        CubeSegment segmentById = cube.getSegmentById(getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt()));
        dumpMetadata(segmentById, cube.getMergingSegments(segmentById));
        Map<String, String> sparkConfigOverride = config.getSparkConfigOverride();
        String sparkConfigName = getSparkConfigName();
        if (sparkConfigName != null) {
            sparkConfigOverride.putAll(config.getSparkConfigOverrideWithSpecificName(sparkConfigName));
        }
        for (Map.Entry<String, String> entry : sparkConfigOverride.entrySet()) {
            if (!entry.getKey().equals("spark.submit.deployMode") && !entry.getKey().equals("spark.master") && !entry.getKey().equals("spark.yarn.archive")) {
                livyRestBuilder.addConf(entry.getKey(), entry.getValue());
            }
        }
        formatArgs(livyRestBuilder.getArgs());
        LivyRestExecutor livyRestExecutor = new LivyRestExecutor();
        PatternedLogger patternedLogger = new PatternedLogger(logger, 4, (str2, map) -> {
            if (ExecutableConstants.SPARK_JOB_ID.equals(str2) || ExecutableConstants.YARN_APP_ID.equals(str2) || "yarn_application_tracking_url".equals(str2)) {
                getManager().addJobInfo(getId(), map);
            }
        });
        try {
            livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);
            livyRestExecutor.execute(livyRestBuilder, patternedLogger);
            if (isDiscarded()) {
                return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
            }
            if (isPaused()) {
                return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
            }
            Map<String, String> info = patternedLogger.getInfo();
            String param = getParam(BatchConstants.ARG_COUNTER_OUTPUT);
            if (param != null) {
                if (HadoopUtil.getWorkingFileSystem().exists(new Path(param))) {
                    info.putAll(HadoopUtil.readFromSequenceFile(param));
                } else {
                    logger.warn("Spark counter output path not exists: " + param);
                }
            }
            readCounters(info);
            getManager().addJobInfo(getId(), info);
            return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
        } catch (Exception e) {
            logger.error("error run spark job:", (Throwable) e);
            Map<String, String> extra = manager.getOutput(getId()).getExtra();
            extra.put(ExecutableConstants.SPARK_JOB_ID, "");
            getManager().addJobInfo(getId(), extra);
            return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
        }
    }

    @Override // org.apache.kylin.engine.spark.SparkExecutable
    protected String getAppState(String str) throws IOException {
        return new LivyRestExecutor().state(str);
    }

    @Override // org.apache.kylin.engine.spark.SparkExecutable
    protected void killApp(String str) throws IOException, InterruptedException {
        new LivyRestExecutor().kill(str);
    }

    @Override // org.apache.kylin.engine.spark.SparkExecutable
    protected int killAppRetry(String str) throws IOException, InterruptedException {
        String appState = getAppState(str);
        if (Strings.isNullOrEmpty(appState) || LivyStateEnum.dead.name().equalsIgnoreCase(appState) || LivyStateEnum.error.name().equalsIgnoreCase(appState) || LivyStateEnum.shutting_down.name().equalsIgnoreCase(appState)) {
            logger.warn(str + "is final state, no need to kill");
            return 0;
        }
        killApp(str);
        String appState2 = getAppState(str);
        int i = 0;
        while (true) {
            if (Strings.isNullOrEmpty(appState2) || LivyStateEnum.dead.name().equalsIgnoreCase(appState2) || LivyStateEnum.error.name().equalsIgnoreCase(appState2) || (LivyStateEnum.shutting_down.name().equalsIgnoreCase(appState2) && i < 5)) {
                killApp(str);
                Thread.sleep(1000L);
                appState2 = getAppState(str);
                i++;
            }
        }
        if (Strings.isNullOrEmpty(appState2)) {
            logger.info(str + " killed successfully");
            return 0;
        }
        logger.info(str + " killed failed");
        return 1;
    }
}
