package org.apache.kylin.engine.spark.application;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.kylin.cluster.IClusterManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
import org.apache.kylin.common.util.Application;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.TimeZoneUtils;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.engine.spark.job.BuildJobInfos;
import org.apache.kylin.engine.spark.job.EnviromentAdaptor;
import org.apache.kylin.engine.spark.job.IJobProgressReport;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.engine.spark.job.ResourceDetect;
import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
import org.apache.kylin.engine.spark.job.SegmentBuildJob;
import org.apache.kylin.engine.spark.job.StageType;
import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.job.stage.StageExec;
import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
import org.apache.kylin.engine.spark.utils.HDFSUtils;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.view.LogicalView;
import org.apache.kylin.metadata.view.LogicalViewManager;
import org.apache.kylin.query.pushdown.SparkSubmitter;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.application.NoRetryException;
import org.apache.spark.sql.KylinSession;
import org.apache.spark.sql.KylinSession$;
import org.apache.spark.sql.LogicalViewLoader;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSessionExtensions;
import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.rules.Rule;
import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/kylin/engine/spark/application/SparkApplication.class */
public abstract class SparkApplication implements Application {
    private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
    public static final String JOB_NAME_PREFIX = "job_step_";
    private IJobProgressReport report;
    protected volatile KylinConfig config;
    protected volatile String jobId;
    protected String project;
    protected BuildJobInfos infos;
    protected String path;
    private ClusterMonitor clusterMonitor;
    protected SparkSession ss;
    private Map<String, String> params = Maps.newHashMap();
    protected int layoutSize = -1;
    protected ConcurrentHashMap<String, Boolean> skipFollowingStagesMap = new ConcurrentHashMap<>();
    private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0);
    private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
    private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null);
    private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null);
    private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null);

    public void execute(String[] strArr) {
        try {
            this.path = strArr[0];
            String readArgsFromHDFS = readArgsFromHDFS();
            this.params = JsonUtil.readValueAsMap(readArgsFromHDFS);
            logger.info("Execute {} with args : {}", getClass().getName(), readArgsFromHDFS);
            execute();
        } catch (Exception e) {
            throw new RuntimeException("Error execute " + getClass().getName(), e);
        }
    }

    public AtomicBoolean getAtomicUnreachableSparkMaster() {
        return this.atomicUnreachableSparkMaster;
    }

    public final Map<String, String> getParams() {
        return this.params;
    }

    public final String getParam(String str) {
        return this.params.get(str);
    }

    public final void setParam(String str, String str2) {
        this.params.put(str, str2);
    }

    public final boolean contains(String str) {
        return this.params.containsKey(str);
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getProject() {
        return this.project;
    }

    public KylinConfig getConfig() {
        return this.config;
    }

    public IJobProgressReport getReport() {
        return this.report == null ? new RestfulJobProgressReport() : this.report;
    }

    public SparkSession getSparkSession() throws NoRetryException {
        SparkSession sparkSession = this.atomicSparkSession.get();
        if (Objects.isNull(sparkSession)) {
            throw new NoRetryException("spark session shouldn't be null");
        }
        return sparkSession;
    }

    public String readArgsFromHDFS() {
        FSDataInputStream open;
        Throwable th;
        String str = null;
        try {
            open = HadoopUtil.getFileSystem(this.path).open(new Path(this.path));
            th = null;
        } catch (IOException e) {
            logger.error("Error occurred when reading args file: {}", this.path, e);
        }
        try {
            try {
                str = new BufferedReader(new InputStreamReader((InputStream) open, Charset.defaultCharset())).readLine();
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                return str;
            } finally {
            }
        } finally {
        }
    }

    public String getTrackingUrl(IClusterManager iClusterManager, SparkSession sparkSession) {
        return iClusterManager.getBuildTrackingUrl(sparkSession);
    }

    public void setSkipFollowingStages(String str) {
        this.skipFollowingStagesMap.put(str, true);
    }

    public boolean isSkipFollowingStages(String str) {
        if (str == null) {
            return false;
        }
        return ((Boolean) Optional.ofNullable(this.skipFollowingStagesMap.get(str)).orElse(false)).booleanValue();
    }

    private String tryReplaceHostAddress(String str) {
        String str2 = null;
        try {
            str2 = URI.create(str).getHost();
            return str.replace(str2, InetAddress.getByName(str2).getHostAddress());
        } catch (UnknownHostException e) {
            logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", str2, e);
            return str;
        }
    }

    private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean z) {
        String trackingUrl;
        IClusterManager clusterManager = this.atomicBuildEnv.get().clusterManager();
        String applicationId = sparkSession.sparkContext().applicationId();
        HashMap hashMap = new HashMap();
        hashMap.put("yarn_app_id", applicationId);
        try {
            trackingUrl = getTrackingUrl(clusterManager, sparkSession);
        } catch (Exception e) {
            logger.error("get tracking url failed!", e);
        }
        if (StringUtils.isBlank(trackingUrl)) {
            logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
            return hashMap;
        }
        if (z) {
            trackingUrl = tryReplaceHostAddress(trackingUrl);
        }
        hashMap.put("yarn_app_url", trackingUrl);
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void exchangeSparkSession() {
        exchangeSparkSession(this.atomicSparkConf.get());
    }

    protected final void execute() throws Exception {
        String param = getParam("distMetaUrl");
        this.jobId = getParam("jobId");
        this.project = getParam("project");
        if (getParam("layoutIds") != null) {
            this.layoutSize = StringUtils.split(getParam("layoutIds"), ",").length;
        }
        try {
            try {
                KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(param));
                Throwable th = null;
                try {
                    try {
                        this.config = andUnsetThreadLocalConfig.get();
                        this.report = (IJobProgressReport) ClassUtil.newInstance(this.config.getBuildJobProgressReporter());
                        this.report.initArgsParams(getParams());
                        KylinBuildEnv orCreate = KylinBuildEnv.getOrCreate(this.config);
                        this.atomicBuildEnv.set(orCreate);
                        this.infos = orCreate.buildJobInfos();
                        this.infos.recordJobId(this.jobId);
                        this.infos.recordProject(this.project);
                        this.infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", this.jobId));
                        monitorSparkMaster();
                        HadoopUtil.setCurrentConfiguration(new Configuration());
                        exchangeSparkConf(orCreate.sparkConf());
                        TimeZoneUtils.setDefaultTimeZone(this.config);
                        waiteForResource(this.atomicSparkConf.get(), orCreate);
                        logger.info("Prepare job environment");
                        prepareSparkSession();
                        this.ss = getSparkSession();
                        if (!this.ss.conf().get("spark.master", "").equals("local")) {
                            ((EnviromentAdaptor) ClassUtil.newInstance(this.config.getBuildJobEnviromentAdaptor())).prepareEnviroment(this.ss, this.params);
                        }
                        if (this.config.useDynamicS3RoleCredentialInTable()) {
                            NTableMetadataManager nTableMetadataManager = NTableMetadataManager.getInstance(this.config, this.project);
                            nTableMetadataManager.listAllTables().forEach(tableDesc -> {
                                SparderEnv.addS3Credential(nTableMetadataManager.getOrCreateTableExt(tableDesc).getS3RoleCredentialInfo(), this.ss);
                            });
                        }
                        if (!this.config.isUTEnv()) {
                            Unsafe.setProperty("kylin.env", this.config.getDeployEnv());
                        }
                        logger.info("Start job");
                        this.infos.startJob();
                        extraInit();
                        waiteForResourceSuccess();
                        doExecute();
                        ResourceStore kylinMetaStore = ResourceStore.getKylinMetaStore(this.config);
                        KylinConfig createKylinConfig = KylinConfig.createKylinConfig(this.config);
                        createKylinConfig.setMetadataUrl(getParam("outputMetaUrl"));
                        MetadataStore.createMetadataStore(createKylinConfig).dump(kylinMetaStore);
                        if (andUnsetThreadLocalConfig != null) {
                            if (0 != 0) {
                                try {
                                    andUnsetThreadLocalConfig.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                andUnsetThreadLocalConfig.close();
                            }
                        }
                        if (this.infos != null) {
                            this.infos.jobEnd();
                        }
                        destroySparkSession();
                        extraDestroy();
                        executeFinish();
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (andUnsetThreadLocalConfig != null) {
                        if (th != null) {
                            try {
                                andUnsetThreadLocalConfig.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            andUnsetThreadLocalConfig.close();
                        }
                    }
                    throw th4;
                }
            } catch (Exception e) {
                handleException(e);
                if (this.infos != null) {
                    this.infos.jobEnd();
                }
                destroySparkSession();
                extraDestroy();
                executeFinish();
            }
        } catch (Throwable th6) {
            if (this.infos != null) {
                this.infos.jobEnd();
            }
            destroySparkSession();
            extraDestroy();
            executeFinish();
            throw th6;
        }
    }

    protected void handleException(Exception exc) throws Exception {
        if (exc instanceof AccessControlException) {
            interceptAccessControlException(exc);
        }
        if ((exc instanceof RuntimeException) && (exc.getCause() instanceof AccessControlException)) {
            interceptAccessControlException(exc.getCause());
        } else if ((exc instanceof RuntimeException) && (exc.getCause() instanceof SparkException) && (extractRealRootCauseFromSparkException(exc) instanceof AccessControlException)) {
            interceptAccessControlException(exc);
        }
        throw exc;
    }

    protected Throwable extractRealRootCauseFromSparkException(Exception exc) {
        Throwable cause = exc.getCause();
        while (true) {
            Throwable th = cause;
            if (!(th instanceof SparkException)) {
                return th;
            }
            cause = th.getCause();
        }
    }

    protected void interceptAccessControlException(Throwable th) throws NoRetryException {
        logger.error("Permission denied.", th);
        throw new NoRetryException("Permission denied.");
    }

    private SparkSession createSpark(SparkConf sparkConf) {
        SparkSession.Builder config = SparkSession.builder().withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() { // from class: org.apache.kylin.engine.spark.application.SparkApplication.1
            public BoxedUnit apply(SparkSessionExtensions sparkSessionExtensions) {
                sparkSessionExtensions.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() { // from class: org.apache.kylin.engine.spark.application.SparkApplication.1.1
                    public Rule<LogicalPlan> apply(SparkSession sparkSession) {
                        return new AlignmentTableStats(sparkSession);
                    }
                });
                return BoxedUnit.UNIT;
            }
        }).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
        boolean z = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable();
        if (z) {
            z = !(SparderEnv.getSparkSession() instanceof KylinSession);
        }
        return z ? config.getOrCreate() : KylinSession$.MODULE$.KylinBuilder(config).buildCluster().getOrCreateKylinSession();
    }

    public boolean isJobOnCluster(SparkConf sparkConf) {
        return (Utils.isLocalMaster(sparkConf) || this.config.isUTEnv()) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void extraInit() {
        loadLogicalView();
    }

    public void extraDestroy() {
        if (this.config != null && StringUtils.isNotEmpty(this.config.getKubernetesUploadPath())) {
            logger.info("uploadPath={}", this.config.getKubernetesUploadPath());
            try {
                HDFSUtils.deleteMarkFile(this.config.getKubernetesUploadPath());
            } catch (Exception e) {
                logger.warn("Failed to delete " + this.config.getKubernetesUploadPath(), e);
            }
        }
        if (this.clusterMonitor != null) {
            this.clusterMonitor.shutdown();
        }
    }

    protected abstract void doExecute() throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    public void onLayoutFinished(long j) {
    }

    protected void onExecuteFinished() {
    }

    protected String calculateRequiredCores() throws Exception {
        return "1";
    }

    private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
        SparkConfHelper sparkConfHelper = new SparkConfHelper();
        if (sparkConf.getAll() != null) {
            Arrays.stream(sparkConf.getAll()).forEach(tuple2 -> {
                sparkConfHelper.setConf((String) tuple2._1, (String) tuple2._2);
            });
        }
        sparkConfHelper.setClusterManager(KylinBuildEnv.get().clusterManager());
        chooseContentSize(sparkConfHelper);
        sparkConfHelper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(this.layoutSize));
        sparkConfHelper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores());
        sparkConfHelper.setConf(SparkConfHelper.COUNT_DISTICT, hasCountDistinct().toString());
        sparkConfHelper.generateSparkConf();
        sparkConfHelper.applySparkConf(sparkConf);
    }

    private void waiteForResource(SparkConf sparkConf, KylinBuildEnv kylinBuildEnv) throws Exception {
        StageExec create = StageType.WAITE_FOR_RESOURCE.create(this, null, null);
        this.infos.recordStageId(create.getId());
        create.execute();
    }

    protected void waiteForResourceSuccess() throws Exception {
        StageType.WAITE_FOR_RESOURCE.create(this, null, null).onStageFinished(ExecutableState.SUCCEED);
        this.infos.recordStageId("");
    }

    protected void executeFinish() {
        try {
            getReport().executeFinish(getReportParams(), this.project, getJobId());
        } catch (Exception e) {
            logger.error("executeFinish failed", e);
        }
    }

    protected void chooseContentSize(SparkConfHelper sparkConfHelper) {
        sparkConfHelper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(this.config.getJobTmpShareDir(this.project, this.jobId)));
    }

    protected boolean checkRangePartitionTableIsExist(NDataModel nDataModel) {
        return nDataModel.getAllTableRefs().stream().anyMatch(tableRef -> {
            return tableRef.getTableDesc().isRangePartition();
        });
    }

    protected String chooseContentSize(Path path) {
        return ResourceDetectUtils.getMaxResourceSize(path) + "b";
    }

    protected Boolean hasCountDistinct() throws IOException {
        Boolean bool;
        Path path = new Path(this.config.getJobTmpShareDir(this.project, this.jobId), ResourceDetectUtils.countDistinctSuffix());
        if (HadoopUtil.getWorkingFileSystem().exists(path)) {
            bool = (Boolean) ResourceDetectUtils.readResourcePathsAs(path);
        } else {
            bool = false;
            logger.info("File count_distinct.json doesn't exist, set hasCountDistinct to false.");
        }
        logger.info("Exist count distinct measure: {}", bool);
        return bool;
    }

    public void logJobInfo() {
        try {
            logger.info(generateInfo());
            if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) {
                logger.info("skip record job wait and run time");
                return;
            }
            HashMap hashMap = new HashMap();
            hashMap.put("yarn_job_wait_time", Long.valueOf(KylinBuildEnv.get().buildJobInfos().waitTime()).toString());
            hashMap.put("yarn_job_run_time", Long.valueOf(KylinBuildEnv.get().buildJobInfos().buildTime()).toString());
            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", this.project, this.jobId, hashMap);
        } catch (Exception e) {
            logger.warn("Error occurred when generate job info.", e);
        }
    }

    private Map<String, String> getReportParams() {
        HashMap hashMap = new HashMap();
        hashMap.put("time_out", String.valueOf(this.config.getUpdateJobInfoTimeout()));
        hashMap.put("job_tmp_dir", this.config.getJobTmpDir(this.project, true));
        return hashMap;
    }

    protected String generateInfo() {
        return LogJobInfoUtils.sparkApplicationInfo();
    }

    public Set<String> getIgnoredSnapshotTables() {
        return NSparkCubingUtil.toIgnoredTableSet(getParam("ignoredSnapshotTables"));
    }

    protected Map<String, String> getSparkConfigOverride(KylinConfig kylinConfig) {
        return kylinConfig.getSparkConfigOverride();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkDateFormatIfExist(String str, String str2) throws Exception {
        if (this.config.isUTEnv()) {
            return;
        }
        NDataModel dataModelDesc = NDataModelManager.getInstance(this.config, str).getDataModelDesc(str2);
        if (checkRangePartitionTableIsExist(dataModelDesc)) {
            logger.info("Range partitioned tables do not support pushdown, so do not need to perform subsequent logic");
            return;
        }
        PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
        if (PartitionDesc.isEmptyPartitionDesc(partitionDesc) || StringUtils.isEmpty(partitionDesc.getPartitionDateFormat()) || CatalogTableType.VIEW().name().equals(dataModelDesc.getRootFactTable().getTableDesc().getTableType())) {
            return;
        }
        String backTickExp = dataModelDesc.getPartitionDesc().getPartitionDateColumnRef().getBackTickExp();
        try {
            SparkSubmitter.OverriddenSparkSession overrideSparkSession = SparkSubmitter.getInstance().overrideSparkSession(this.atomicSparkSession.get());
            Throwable th = null;
            try {
                try {
                    String probeColFormat = PushDownUtil.probeColFormat(dataModelDesc.getRootFactTableName(), backTickExp, str);
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dataModelDesc.getPartitionDesc().getPartitionDateFormat(), Locale.getDefault(Locale.Category.FORMAT));
                    Date parse = simpleDateFormat.parse(probeColFormat);
                    if (parse == null || !probeColFormat.equals(simpleDateFormat.format(parse))) {
                        throw new NoRetryException("date format not match");
                    }
                    if (overrideSparkSession != null) {
                        if (0 != 0) {
                            try {
                                overrideSparkSession.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            overrideSparkSession.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (overrideSparkSession != null) {
                    if (th != null) {
                        try {
                            overrideSparkSession.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        overrideSparkSession.close();
                    }
                }
                throw th4;
            }
        } catch (ParseException | NoRetryException e) {
            throw new NoRetryException("date format not match");
        } catch (KylinException e2) {
        }
    }

    @VisibleForTesting
    void exchangeSparkConf(SparkConf sparkConf) throws Exception {
        if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
            Map<String, String> sparkConfigOverride = getSparkConfigOverride(this.config);
            if (!sparkConfigOverride.isEmpty()) {
                sparkConf.getClass();
                sparkConfigOverride.forEach(sparkConf::set);
                logger.info("Override user-defined spark conf: {}", JsonUtil.writeValueAsString(sparkConfigOverride));
            }
            if (this.config.isAutoSetSparkConf()) {
                logger.info("Set spark conf automatically.");
                try {
                    autoSetSparkConf(sparkConf);
                } catch (Exception e) {
                    logger.warn("Auto set spark conf failed. Load spark conf from system properties", e);
                }
            }
        }
        boolean z = sparkConf.getBoolean("spark.eventLog.enabled", false);
        String str = sparkConf.get("spark.eventLog.dir", "");
        if (z && !str.isEmpty()) {
            Path path = new Path(new URI(str).getPath());
            FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
            if (!workingFileSystem.exists(path)) {
                workingFileSystem.mkdirs(path);
            }
        }
        this.atomicSparkConf.set(sparkConf);
    }

    private void exchangeSparkSession(SparkConf sparkConf) {
        if (Objects.nonNull(this.atomicSparkSession.get())) {
            destroySparkSession();
        }
        SparkSession createSpark = createSpark(sparkConf);
        if (!this.config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) {
            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", this.project, this.jobId, getTrackingInfo(createSpark, this.config.isTrackingUrlIpAddressEnabled()));
        }
        JobMetricsUtils.registerListener(createSpark);
        SparderEnv.registerListener(createSpark.sparkContext());
        SparderEnv.setSparkSession(createSpark);
        UdfManager.create(createSpark);
        this.atomicSparkSession.set(createSpark);
    }

    private void prepareSparkSession() throws NoRetryException {
        SparkConf sparkConf = this.atomicSparkConf.get();
        if (Objects.isNull(sparkConf)) {
            throw new NoRetryException("spark conf shouldn't be null");
        }
        if (!this.config.isSnapshotSpecifiedSparkConf() || !(this instanceof SegmentBuildJob)) {
            exchangeSparkSession(sparkConf);
            return;
        }
        SparkConf clone = sparkConf.clone();
        Map snapshotBuildingConfigOverride = this.config.getSnapshotBuildingConfigOverride();
        clone.getClass();
        snapshotBuildingConfigOverride.forEach(clone::set);
        logger.info("exchange sparkSession using snapshot specified sparkConf");
        exchangeSparkSession(clone);
    }

    private void destroySparkSession() {
        SparkSession sparkSession = this.atomicSparkSession.get();
        if (Objects.isNull(sparkSession)) {
            logger.info("no initialized sparkSession instance");
        } else {
            if (sparkSession.conf().get("spark.master").startsWith("local")) {
                return;
            }
            JobMetricsUtils.unRegisterListener(sparkSession);
            sparkSession.stop();
        }
    }

    private void monitorSparkMaster() {
        this.clusterMonitor = new ClusterMonitor();
        this.clusterMonitor.monitorSparkMaster(this.atomicBuildEnv, this.atomicSparkSession, this.atomicDisconnectSparkMasterTimes, this.atomicUnreachableSparkMaster);
    }

    @VisibleForTesting
    public void loadLogicalView() {
        LogicalView findLogicalViewInProject;
        if (this.config.isDDLLogicalViewEnabled()) {
            String param = getParam("dataflowId");
            String param2 = getParam("table");
            LogicalViewManager logicalViewManager = LogicalViewManager.getInstance(this.config);
            if (StringUtils.isNotBlank(param)) {
                logicalViewManager.findLogicalViewsInModel(this.project, param).forEach(logicalView -> {
                    LogicalViewLoader.loadView(logicalView.getTableName(), true, this.ss);
                });
            }
            if (!StringUtils.isNotBlank(param2) || (findLogicalViewInProject = logicalViewManager.findLogicalViewInProject(getProject(), param2)) == null) {
                return;
            }
            LogicalViewLoader.loadView(findLogicalViewInProject.getTableName(), true, this.ss);
        }
    }
}
