package org.apache.kylin.query.engine;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
import org.apache.kylin.common.util.BufferedLogger;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.spark.job.DefaultSparkBuildJobHandler;
import org.apache.kylin.engine.spark.job.NSparkExecutable;
import org.apache.kylin.engine.spark.job.SparkAppDescription;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.query.util.QueryParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/query/engine/AsyncQueryJob.class */
/**
 * Spark executable that runs an asynchronous query as a separate Spark application.
 *
 * <p>{@link #submit(QueryParams)} records the query parameters on this job, dumps the kylin
 * properties and a selected subset of metadata into the job's temporary metadata store, and
 * then launches {@code org.apache.kylin.query.engine.AsyncQueryApplication} through a command
 * generated by the Spark job handler and executed by a {@link CliCommandExecutor}.
 */
public class AsyncQueryJob extends NSparkExecutable {
    private static final Logger logger = LoggerFactory.getLogger(AsyncQueryJob.class);

    /** Metadata folder that is dumped as-is (it is not prefixed with the project name). */
    private static final String GLOBAL = "/_global";
    private static final String DATAFLOW = "/dataflow";
    private static final String DATAFLOW_DETAIL = "/dataflow_details";
    private static final String INDEX_PLAN = "/index_plan";
    private static final String MODEL = "/model_desc";
    private static final String TABLE = "/table";
    private static final String TABLE_EXD = "/table_exd";
    private static final String ACL = "/acl";

    /** Per-project metadata folders; each is resolved as {@code "/" + project + folder}. */
    private static final String[] META_DUMP_LIST = {DATAFLOW, DATAFLOW_DETAIL, INDEX_PLAN, MODEL, TABLE, TABLE_EXD, ACL};

    /**
     * Mapper used to serialize {@link QueryParams}. It is configured once here and never
     * reconfigured afterwards, so a single shared instance replaces the per-call construction.
     */
    private static final ObjectMapper QUERY_PARAMS_MAPPER = new ObjectMapper()
            .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

    public AsyncQueryJob() {
    }

    public AsyncQueryJob(Object obj) {
        super(obj);
    }

    /** Installs a {@link DefaultSparkBuildJobHandler} as this job's Spark job handler. */
    @Override // org.apache.kylin.engine.spark.job.NSparkExecutable
    protected void initHandler() {
        this.sparkJobHandler = new DefaultSparkBuildJobHandler();
    }

    /**
     * Builds the Spark command from this job's app description and executes it.
     *
     * @param hadoopConfDir value set as the app description's hadoop conf dir
     * @param kylinJobJar value set as the app description's kylin job jar
     * @param appArgs value set as the app description's application arguments
     * @return a succeeded result carrying the executed command, or an error result wrapping
     *         whatever exception was thrown while preparing or executing the command
     */
    @Override // org.apache.kylin.engine.spark.job.NSparkExecutable
    protected ExecuteResult runSparkSubmit(String hadoopConfDir, String kylinJobJar, String appArgs) {
        BufferedLogger patternedLogger = new BufferedLogger(logger);
        try {
            killOrphanApplicationIfExists(getId());
            SparkAppDescription appDesc = getSparkAppDesc();
            appDesc.setHadoopConfDir(hadoopConfDir);
            appDesc.setKylinJobJar(kylinJobJar);
            appDesc.setAppArgs(appArgs);
            String sparkCmd = (String) this.sparkJobHandler.generateSparkCmd(KylinConfig.getInstanceFromEnv(), appDesc);
            return ExecuteResult.createSucceed(getCliCommandExecutor().execute(sparkCmd, patternedLogger, getId()).getCmd());
        } catch (Exception e) {
            // Failures are reported through the result instead of being propagated.
            return ExecuteResult.createError(e);
        }
    }

    /** Creates the executor used to run the Spark command; exposed so tests can replace it. */
    @VisibleForTesting
    public CliCommandExecutor getCliCommandExecutor() {
        return new CliCommandExecutor();
    }

    /**
     * Returns the async-query Spark configuration overrides of {@code kylinConfig}, adjusted
     * for this job.
     *
     * <ul>
     *   <li>{@code spark.yarn.queue} is set from the job's query-queue parameter when that
     *       parameter is not empty;</li>
     *   <li>{@code spark.driver.memory} defaults to {@code 1024m} when not configured;</li>
     *   <li>{@code spark.hadoop.hive.metastore.sasl.enabled} is forced to {@code true} when
     *       Hadoop security is enabled.</li>
     * </ul>
     *
     * @param kylinConfig configuration providing the base overrides
     * @return the map obtained from {@code kylinConfig}, with the adjustments above applied to it
     */
    @Override // org.apache.kylin.engine.spark.job.NSparkExecutable
    public Map<String, String> getSparkConfigOverride(KylinConfig kylinConfig) {
        Map<String, String> overrides = kylinConfig.getAsyncQuerySparkConfigOverride();
        String queryQueue = getParam(NBatchConstants.P_QUERY_QUEUE);
        if (StringUtils.isNotEmpty(queryQueue)) {
            overrides.put("spark.yarn.queue", queryQueue);
        }
        if (!overrides.containsKey("spark.driver.memory")) {
            overrides.put("spark.driver.memory", "1024m");
        }
        if (UserGroupInformation.isSecurityEnabled()) {
            overrides.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
        }
        return overrides;
    }

    /** Async query jobs use an empty job name prefix. */
    @Override // org.apache.kylin.engine.spark.job.NSparkExecutable
    protected String getJobNamePrefix() {
        return "";
    }

    /** Returns the ext jars path taken from this job's configuration. */
    @Override // org.apache.kylin.engine.spark.job.NSparkExecutable
    protected String getExtJar() {
        return getConfig().getKylinExtJarsPath();
    }

    /** Returns the parent's id prefixed with {@link AsyncQueryUtil#ASYNC_QUERY_JOB_ID_PRE}. */
    @Override // org.apache.kylin.job.execution.AbstractExecutable, org.apache.kylin.job.execution.Executable
    public String getId() {
        return AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE + super.getId();
    }

    /**
     * Prepares and launches the Spark application for an asynchronous query.
     *
     * <p>The query parameters, the current {@link QueryContext}, the project, the query id,
     * the job id, the job type and the Spark queue are stored as job parameters. Then the
     * {@code /_global} resources and the per-project folders in {@code META_DUMP_LIST} are
     * dumped into the job's temporary metadata store before the Spark command is run.
     *
     * @param queryParams parameters of the query; a non-empty Spark queue also overrides
     *        {@code kylin.query.async-query.spark-conf.spark.yarn.queue} for this job
     * @return the result of {@link #runSparkSubmit(String, String, String)}
     * @throws KylinRuntimeException if no kylin job jar is configured outside of a UT environment
     * @throws JsonProcessingException if the query parameters or query context cannot be serialized
     * @throws ExecuteException if anything fails while dumping properties/metadata or while
     *         preparing the arguments for the Spark command
     */
    public ExecuteResult submit(QueryParams queryParams) throws ExecuteException, JsonProcessingException {
        setLogPath(getSparkDriverLogHdfsPath(getConfig()));
        KylinConfig originConfig = getConfig();
        // Work on a copy of the overrides so the origin configuration is left untouched.
        Map<String, String> overrides = Maps.newHashMap(((KylinConfigExt) originConfig).getExtendedOverrides());
        if (StringUtils.isNotEmpty(queryParams.getSparkQueue())) {
            overrides.put("kylin.query.async-query.spark-conf.spark.yarn.queue", queryParams.getSparkQueue());
        }
        KylinConfigExt jobConfig = KylinConfigExt.createInstance(originConfig, overrides);
        String kylinJobJar = jobConfig.getKylinJobJarPath();
        if (StringUtils.isEmpty(kylinJobJar) && !jobConfig.isUTEnv()) {
            throw new KylinRuntimeException("Missing kylin job jar");
        }

        setParam(NBatchConstants.P_QUERY_PARAMS, QUERY_PARAMS_MAPPER.writeValueAsString(queryParams));
        setParam(NBatchConstants.P_QUERY_CONTEXT, JsonUtil.writeValueAsString(QueryContext.current()));
        setParam("project", getProject());
        setParam(NBatchConstants.P_QUERY_ID, QueryContext.current().getQueryId());
        setParam(NBatchConstants.P_JOB_ID, getId());
        setParam(NBatchConstants.P_JOB_TYPE, JobTypeEnum.ASYNC_QUERY.toString());
        setParam(NBatchConstants.P_QUERY_QUEUE, queryParams.getSparkQueue());
        setDistMetaUrl(jobConfig.getJobTmpMetaStoreUrl(getProject(), getId()));
        try {
            jobConfig.setQueryHistoryUrl(jobConfig.getQueryHistoryUrl().toString());
            attachMetadataAndKylinProps(jobConfig, true);

            // Collect the resource paths to copy into the job's temporary metadata store.
            ResourceStore resourceStore = ResourceStore.getKylinMetaStore(jobConfig);
            List<String> resourcesToDump = new ArrayList<>();
            addResources(resourceStore, GLOBAL, resourcesToDump);
            String projectRoot = "/" + getProject();
            for (String folder : META_DUMP_LIST) {
                addResources(resourceStore, projectRoot + folder, resourcesToDump);
            }

            KylinConfig dumpConfig = KylinConfig.createKylinConfig(jobConfig);
            dumpConfig.setMetadataUrl(jobConfig.getJobTmpMetaStoreUrl(getProject(), getId()).toString());
            MetadataStore.createMetadataStore(dumpConfig).dump(resourceStore, resourcesToDump);
            return runSparkSubmit(getHadoopConfDir(), kylinJobJar, "-className org.apache.kylin.query.engine.AsyncQueryApplication " + createArgsFileOnHDFS(jobConfig, getId()));
        } catch (Exception e) {
            throw new ExecuteException("kylin properties or meta dump failed", e);
        }
    }

    /**
     * Appends the resources listed recursively under {@code folder} to {@code sink}.
     *
     * <p>The store is queried exactly once per folder, and a {@code null} listing is treated
     * as "nothing to add".
     */
    private static void addResources(ResourceStore store, String folder, List<String> sink) {
        Collection<String> found = store.listResourcesRecursively(folder);
        if (found != null) {
            sink.addAll(found);
        }
    }

    /**
     * Returns the async-query specific hadoop conf dir when one is configured, otherwise the
     * default from {@link HadoopUtil#getHadoopConfDir()}.
     */
    private String getHadoopConfDir() {
        String asyncQueryConfDir = KylinConfig.getInstanceFromEnv().getAsyncQueryHadoopConfDir();
        return StringUtils.isNotEmpty(asyncQueryConfDir) ? asyncQueryConfDir : HadoopUtil.getHadoopConfDir();
    }
}
