package co.cask.cdap.datastreams;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkClientContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.spark.SparkConf;

/* loaded from: input_file:co/cask/cdap/datastreams/DataStreamsSparkLauncher.class */
public class DataStreamsSparkLauncher extends AbstractSpark {
    public static final String NAME = "DataStreamsSparkStreaming";
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private final DataStreamsPipelineSpec pipelineSpec;
    private final boolean isUnitTest;

    public DataStreamsSparkLauncher(DataStreamsPipelineSpec dataStreamsPipelineSpec, boolean z) {
        this.pipelineSpec = dataStreamsPipelineSpec;
        this.isUnitTest = z;
    }

    @Override // co.cask.cdap.api.spark.AbstractSpark
    protected void configure() {
        setName(NAME);
        setMainClass(SparkStreamingPipelineDriver.class);
        setExecutorResources(this.pipelineSpec.getResources());
        setDriverResources(this.pipelineSpec.getDriverResources());
        int i = 0;
        Iterator<StageSpec> it = this.pipelineSpec.getStages().iterator();
        while (it.hasNext()) {
            if (StreamingSource.PLUGIN_TYPE.equals(it.next().getPlugin().getType())) {
                i++;
            }
        }
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINEID, GSON.toJson(this.pipelineSpec));
        hashMap.put("cask.hydrator.is.unit.test", String.valueOf(this.isUnitTest));
        hashMap.put("cask.hydrator.num.sources", String.valueOf(i));
        hashMap.put("cask.hydrator.extra.opts", this.pipelineSpec.getExtraJavaOpts());
        setProperties(hashMap);
    }

    @Override // co.cask.cdap.api.spark.AbstractSpark
    public void initialize() throws Exception {
        SparkClientContext context = getContext();
        SparkConf sparkConf = new SparkConf();
        Map<String, String> properties = context.getSpecification().getProperties();
        String str = properties.get("cask.hydrator.extra.opts");
        if (str != null && !str.isEmpty()) {
            sparkConf.set("spark.driver.extraJavaOptions", str);
            sparkConf.set("spark.executor.extraJavaOptions", str);
        }
        Integer valueOf = Integer.valueOf(properties.get("cask.hydrator.num.sources"));
        sparkConf.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(valueOf.intValue() + 2));
        if (Boolean.valueOf(properties.get("cask.hydrator.is.unit.test")).booleanValue()) {
            sparkConf.setMaster(String.format("local[%d]", Integer.valueOf(valueOf.intValue() + 1)));
        }
        context.setSparkConf(sparkConf);
    }
}
