package co.cask.cdap.app.runtime.spark.submit;

import co.cask.cdap.app.runtime.spark.SparkMainWrapper;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContextConfig;
import co.cask.cdap.app.runtime.spark.SparkRuntimeEnv;
import co.cask.cdap.app.runtime.spark.distributed.SparkExecutionService;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramInfo;
import co.cask.cdap.proto.id.ProgramRunId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.filesystem.LocationFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/submit/DistributedSparkSubmitter.class */
public class DistributedSparkSubmitter extends AbstractSparkSubmitter {
    private final Configuration hConf;
    private final String schedulerQueueName;
    private final SparkExecutionService sparkExecutionService;

    public DistributedSparkSubmitter(Configuration configuration, LocationFactory locationFactory, String str, SparkRuntimeContext sparkRuntimeContext, @Nullable String str2) {
        this.hConf = configuration;
        this.schedulerQueueName = str2;
        ProgramRunId run = sparkRuntimeContext.getProgram().getId().run(sparkRuntimeContext.getRunId().getId());
        WorkflowProgramInfo workflowInfo = sparkRuntimeContext.getWorkflowInfo();
        this.sparkExecutionService = new SparkExecutionService(locationFactory, str, run, workflowInfo == null ? null : workflowInfo.getWorkflowToken());
    }

    @Override // co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter
    protected Map<String, String> getSubmitConf() {
        HashMap hashMap = new HashMap();
        if (this.schedulerQueueName != null && !this.schedulerQueueName.isEmpty()) {
            hashMap.put("spark.yarn.queue", this.schedulerQueueName);
        }
        long j = this.hConf.getLong(SparkRuntimeContextConfig.HCONF_ATTR_CREDENTIALS_UPDATE_INTERVAL_MS, -1L);
        if (j > 0) {
            hashMap.put("spark.yarn.token.renewal.interval", Long.toString(j));
        }
        return hashMap;
    }

    @Override // co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter
    protected String getMaster(Map<String, String> map) {
        return "yarn-cluster";
    }

    @Override // co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter
    protected List<String> beforeSubmit() {
        Iterator it = this.hConf.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            SparkRuntimeEnv.setProperty("spark.hadoop." + ((String) entry.getKey()), this.hConf.get((String) entry.getKey()));
        }
        this.sparkExecutionService.startAndWait();
        return Collections.singletonList("--" + SparkMainWrapper.ARG_EXECUTION_SERVICE_URI() + "=" + this.sparkExecutionService.getBaseURI());
    }

    @Override // co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter
    protected void triggerShutdown() {
        this.sparkExecutionService.stopAndWait();
    }

    @Override // co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter
    protected void onCompleted(boolean z) {
        if (z) {
            this.sparkExecutionService.stopAndWait();
        } else {
            this.sparkExecutionService.shutdownNow();
        }
    }
}
