package co.cask.cdap.datastreams;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkClientContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.LocationAwareMDCWrapperLogger;
import co.cask.cdap.etl.proto.v2.DataStreamsConfig;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Joiner;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/datastreams/DataStreamsSparkLauncher.class */
public class DataStreamsSparkLauncher extends AbstractSpark {
    static final String IS_UNIT_TEST = "hydrator.is.unit.test";
    static final String NUM_SOURCES = "hydrator.num.sources";
    static final String EXTRA_OPTS = "hydrator.extra.opts";
    static final String CHECKPOINT_DIR = "hydrator.checkpoint.dir";
    static final String CHECKPOINTS_DISABLED = "hydrator.checkpoints.disabled";
    public static final String NAME = "DataStreamsSparkStreaming";
    private final DataStreamsPipelineSpec pipelineSpec;
    private final DataStreamsConfig config;
    private static final Logger LOG = LoggerFactory.getLogger(DataStreamsSparkLauncher.class);
    private static final Logger WRAPPERLOGGER = new LocationAwareMDCWrapperLogger(LOG, Constants.EVENT_TYPE_TAG, Constants.PIPELINE_LIFECYCLE_TAG_VALUE);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();

    public DataStreamsSparkLauncher(DataStreamsPipelineSpec dataStreamsPipelineSpec, DataStreamsConfig dataStreamsConfig) {
        this.pipelineSpec = dataStreamsPipelineSpec;
        this.config = dataStreamsConfig;
    }

    @Override // co.cask.cdap.api.spark.AbstractSpark
    protected void configure() {
        setName(NAME);
        setMainClass(SparkStreamingPipelineDriver.class);
        setExecutorResources(this.pipelineSpec.getResources());
        setDriverResources(this.pipelineSpec.getDriverResources());
        setClientResources(this.pipelineSpec.getClientResources());
        int i = 0;
        Iterator<StageSpec> it = this.pipelineSpec.getStages().iterator();
        while (it.hasNext()) {
            if (StreamingSource.PLUGIN_TYPE.equals(it.next().getPlugin().getType())) {
                i++;
            }
        }
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINEID, GSON.toJson(this.pipelineSpec));
        hashMap.put(IS_UNIT_TEST, String.valueOf(this.config.isUnitTest()));
        hashMap.put(NUM_SOURCES, String.valueOf(i));
        hashMap.put(EXTRA_OPTS, this.pipelineSpec.getExtraJavaOpts());
        hashMap.put(CHECKPOINT_DIR, this.config.getCheckpointDir() == null ? UUID.randomUUID().toString() : this.config.getCheckpointDir());
        hashMap.put(CHECKPOINTS_DISABLED, String.valueOf(this.config.checkpointsDisabled()));
        setProperties(hashMap);
    }

    @Override // co.cask.cdap.api.spark.AbstractSpark
    public void initialize() throws Exception {
        SparkClientContext context = getContext();
        WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}", context.getApplicationSpecification().getName(), UserGroupInformation.getCurrentUser().getShortUserName(), Joiner.on(", ").withKeyValueSeparator("=").join((Map<?, ?>) context.getRuntimeArguments()));
        SparkConf sparkConf = new SparkConf();
        Map<String, String> properties = context.getSpecification().getProperties();
        String str = properties.get(EXTRA_OPTS);
        if (str != null && !str.isEmpty()) {
            sparkConf.set("spark.driver.extraJavaOptions", str);
            sparkConf.set("spark.executor.extraJavaOptions", str);
        }
        Integer valueOf = Integer.valueOf(properties.get(NUM_SOURCES));
        sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(valueOf.intValue() + 2));
        if (Boolean.valueOf(properties.get(IS_UNIT_TEST)).booleanValue()) {
            sparkConf.setMaster(String.format("local[%d]", Integer.valueOf(valueOf.intValue() + 1)));
        }
        context.setSparkConf(sparkConf);
        if (!Boolean.valueOf(properties.get(CHECKPOINTS_DISABLED)).booleanValue()) {
            FileSet fileSet = (FileSet) context.getDataset(DataStreamsApp.CHECKPOINT_FILESET);
            String name = context.getApplicationSpecification().getName();
            String property = context.getSpecification().getProperty(CHECKPOINT_DIR);
            Location append = fileSet.getBaseLocation().append(name);
            Location append2 = append.append(property);
            if (!ensureDirExists(append)) {
                throw new IOException(String.format("Unable to create checkpoint base directory '%s' for the pipeline.", append));
            }
            try {
                for (Location location : append.list()) {
                    if (!location.equals(append2) && !location.delete(true)) {
                        LOG.warn("Unable to delete checkpoint directory {} from an old pipeline.", location);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Unable to clean up old checkpoint directories from old pipelines.", (Throwable) e);
            }
            if (!ensureDirExists(append2)) {
                throw new IOException(String.format("Unable to create checkpoint directory '%s' for the pipeline.", append2));
            }
        }
        WRAPPERLOGGER.info("Pipeline '{}' running", context.getApplicationSpecification().getName());
    }

    @Override // co.cask.cdap.api.spark.AbstractSpark, co.cask.cdap.api.ProgramLifecycle
    public void destroy() {
        super.destroy();
        ProgramStatus status = getContext().getState().getStatus();
        WRAPPERLOGGER.info("Pipeline '{}' {}", getContext().getApplicationSpecification().getName(), status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
    }

    private boolean ensureDirExists(Location location) throws IOException {
        return location.isDirectory() || location.mkdirs() || location.isDirectory();
    }
}
