package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.ServiceDiscoverer;
import co.cask.cdap.api.data.batch.BatchReadable;
import co.cask.cdap.api.data.batch.Split;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.spark.SparkContext;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.data.stream.StreamInputFormat;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.internal.app.runtime.spark.dataset.SparkDatasetInputFormat;
import co.cask.cdap.internal.app.runtime.spark.dataset.SparkDatasetOutputFormat;
import com.google.gson.Gson;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.spark.SparkConf;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/AbstractSparkContext.class */
/**
 * Base {@link SparkContext} implementation that delegates program metadata lookups to a
 * {@link BasicSparkContext} and builds the Hadoop {@link Configuration} objects used to read
 * from datasets and streams and to write to datasets.
 *
 * <p>The base Hadoop configuration is loaded from the {@code spark_hconf.xml} resource found
 * through the thread context class loader; every {@code set*Dataset} method returns a fresh copy
 * of that base configuration so callers never mutate the shared instance.
 */
public abstract class AbstractSparkContext implements SparkContext {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkContext.class);
    /** Delimiter used to break a runtime argument value into individual tokens. */
    private static final Pattern SPACES = Pattern.compile("\\s+");
    private static final String[] NO_ARGS = new String[0];
    private static final String SPARK_METRICS_CONF_KEY = "spark.metrics.conf";
    /** Name of the classpath resource holding the Hadoop configuration. */
    private static final String HCONF_FILE_NAME = "spark_hconf.xml";
    private static final String INPUT_FORMAT_CLASS_KEY = "mapreduce.job.inputformat.class";
    private static final String OUTPUT_FORMAT_CLASS_KEY = "mapreduce.job.outputformat.class";

    private final long logicalStartTime;
    private final SparkSpecification spec;
    private final Map<String, String> runtimeArguments;
    private final BasicSparkContext basicSparkContext;
    private final Configuration hConf;
    private final SparkConf sparkConf;

    /**
     * Creates a context backed by the given {@link BasicSparkContext}.
     *
     * @param basicSparkContext delegate supplying the specification, runtime arguments, datasets,
     *                          stream admin and metrics
     * @throws RuntimeException if the {@code spark_hconf.xml} resource cannot be found
     */
    public AbstractSparkContext(BasicSparkContext basicSparkContext) {
        this.basicSparkContext = basicSparkContext;
        this.logicalStartTime = basicSparkContext.getLogicalStartTime();
        this.spec = basicSparkContext.getSpecification();
        this.runtimeArguments = basicSparkContext.getRuntimeArguments();
        // These must be initialised here rather than as field initializers: field initializers
        // run before the constructor body, i.e. while basicSparkContext is still null, and
        // initializeSparkConf() dereferences it.
        this.hConf = loadHConf();
        this.sparkConf = initializeSparkConf();
    }

    /**
     * Builds the {@link SparkConf} for this program: the application name is the program name and
     * {@code spark.metrics.conf} points at the metrics property file of the delegate.
     */
    private SparkConf initializeSparkConf() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName(this.basicSparkContext.getProgramName());
        sparkConf.set(SPARK_METRICS_CONF_KEY, this.basicSparkContext.getMetricsPropertyFile().getAbsolutePath());
        return sparkConf;
    }

    /**
     * @return the {@link SparkConf} created for this context
     */
    public SparkConf getSparkConf() {
        return this.sparkConf;
    }

    /**
     * @return the base Hadoop {@link Configuration} loaded from {@code spark_hconf.xml}
     */
    Configuration getHConf() {
        return this.hConf;
    }

    /**
     * Loads the Hadoop configuration from the {@code spark_hconf.xml} classpath resource.
     * The configuration is cleared first, so only the properties of that resource are kept.
     *
     * @throws RuntimeException if the resource is not visible to the thread context class loader
     */
    private Configuration loadHConf() {
        Configuration configuration = new Configuration();
        configuration.clear();
        URL resource = Thread.currentThread().getContextClassLoader().getResource(HCONF_FILE_NAME);
        if (resource == null) {
            LOG.error("Unable to find Hadoop Configuration file {} in the submitted jar.", HCONF_FILE_NAME);
            throw new RuntimeException("Hadoop Configuration file not found in the supplied jar. Please include Hadoop Configuration file with name " + HCONF_FILE_NAME);
        }
        configuration.addResource(resource);
        return configuration;
    }

    /**
     * Creates a copy of the base configuration set up to read from the named dataset through
     * {@link SparkDatasetInputFormat}. The dataset name, the class of its splits and the splits
     * themselves (serialised as JSON) are stored in the returned configuration.
     *
     * @param str name of the dataset to read from
     * @return a new {@link Configuration} describing the dataset input
     * @throws IllegalArgumentException if the dataset does not implement {@link BatchReadable}
     *                                  or exposes no input splits
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Configuration setInputDataset(String str) {
        // Hold the dataset as Object so the instanceof check below is meaningful; assigning it
        // straight to a BatchReadable variable would fail with ClassCastException first.
        Object dataset = this.basicSparkContext.getDataset(str);
        if (!(dataset instanceof BatchReadable)) {
            throw new IllegalArgumentException("Failed to read dataset " + str + ". The dataset does not implement BatchReadable");
        }
        List splits = ((BatchReadable) dataset).getSplits();
        // The split class is taken from the first split, so at least one split is required.
        if (splits == null || splits.isEmpty()) {
            throw new IllegalArgumentException("Failed to read dataset " + str + ". The dataset returned no input splits");
        }
        Configuration configuration = new Configuration(getHConf());
        configuration.setClass(INPUT_FORMAT_CLASS_KEY, SparkDatasetInputFormat.class, InputFormat.class);
        configuration.set("input.dataset.name", str);
        configuration.set(SparkContextConfig.HCONF_ATTR_INPUT_SPLIT_CLASS, ((Split) splits.get(0)).getClass().getName());
        configuration.set(SparkContextConfig.HCONF_ATTR_INPUT_SPLITS, new Gson().toJson(splits));
        return configuration;
    }

    /**
     * Reads the whole stream up to the current time, without an explicit decoder type.
     * Delegates to the five-argument {@code readFromStream} with a time range of
     * {@code [0, System.currentTimeMillis()]}.
     *
     * @param str name of the stream
     * @param cls value class of the data to read
     */
    public <T> T readFromStream(String str, Class<?> cls) {
        return (T) readFromStream(str, cls, 0L, System.currentTimeMillis(), null);
    }

    /**
     * Reads the stream within the given time range, without an explicit decoder type.
     * Delegates to the five-argument {@code readFromStream}.
     *
     * @param str name of the stream
     * @param cls value class of the data to read
     * @param j   start of the time range
     * @param j2  end of the time range
     */
    public <T> T readFromStream(String str, Class<?> cls, long j, long j2) {
        return (T) readFromStream(str, cls, j, j2, null);
    }

    /**
     * Creates a copy of the base configuration set up to read from the given stream through
     * {@link StreamInputFormat}.
     *
     * @param streamBatchReadable description of the stream, its time range and decoder type
     * @param cls                 value class used to infer the decoder when none is specified
     * @return a new {@link Configuration} describing the stream input
     * @throws IOException if configuring the stream input fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Configuration setStreamInputDataset(StreamBatchReadable streamBatchReadable, Class<?> cls) throws IOException {
        Configuration configuration = new Configuration(getHConf());
        configureStreamInput(configuration, streamBatchReadable, cls);
        return configuration;
    }

    /**
     * Populates {@code configuration} with the TTL, path, time range and decoder of the stream
     * and selects {@link StreamInputFormat} as the input format.
     */
    private void configureStreamInput(Configuration configuration, StreamBatchReadable streamBatchReadable, Class<?> cls) throws IOException {
        StreamConfig config = this.basicSparkContext.getStreamAdmin().getConfig(streamBatchReadable.getStreamName());
        Location createGenerationLocation = StreamUtils.createGenerationLocation(config.getLocation(), StreamUtils.getGeneration(config));
        StreamInputFormat.setTTL(configuration, config.getTTL());
        StreamInputFormat.setStreamPath(configuration, createGenerationLocation.toURI());
        StreamInputFormat.setTimeRange(configuration, streamBatchReadable.getStartTime(), streamBatchReadable.getEndTime());
        String decoderType = streamBatchReadable.getDecoderType();
        if (decoderType == null) {
            // No decoder requested: let the input format pick one from the value class.
            StreamInputFormat.inferDecoderClass(configuration, cls);
        } else {
            StreamInputFormat.setDecoderClassName(configuration, decoderType);
        }
        configuration.setClass(INPUT_FORMAT_CLASS_KEY, StreamInputFormat.class, InputFormat.class);
        LOG.info("Using Stream as input from {}", streamBatchReadable.toURI());
    }

    /**
     * Creates a copy of the base configuration set up to write to the named dataset through
     * {@link SparkDatasetOutputFormat}.
     *
     * @param str name of the dataset to write to
     * @return a new {@link Configuration} describing the dataset output
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Configuration setOutputDataset(String str) {
        Configuration configuration = new Configuration(getHConf());
        configuration.set("output.dataset.name", str);
        configuration.setClass(OUTPUT_FORMAT_CLASS_KEY, SparkDatasetOutputFormat.class, OutputFormat.class);
        return configuration;
    }

    /**
     * Returns the whitespace-separated tokens of the runtime argument stored under the given key.
     *
     * @param str runtime argument key
     * @return the tokens of the trimmed value; an empty array when the key is absent (a warning
     *         is logged), mapped to {@code null}, or mapped to a blank value
     */
    public String[] getRuntimeArguments(String str) {
        String value = this.runtimeArguments.get(str);
        if (value == null) {
            LOG.warn("Argument with key {} not found in Runtime Arguments", str);
            return NO_ARGS;
        }
        String trimmed = value.trim();
        // Pattern.split("") yields a single empty token, which is not a real argument.
        if (trimmed.isEmpty()) {
            return NO_ARGS;
        }
        return SPACES.split(trimmed);
    }

    /**
     * @return the specification obtained from the delegate at construction time
     */
    public SparkSpecification getSpecification() {
        return this.spec;
    }

    /**
     * @return the logical start time obtained from the delegate at construction time
     */
    public long getLogicalStartTime() {
        return this.logicalStartTime;
    }

    /**
     * @return the runtime arguments map obtained from the delegate at construction time
     *         (returned as-is, not copied)
     */
    public Map<String, String> getRuntimeArguments() {
        return this.runtimeArguments;
    }

    /**
     * @return the serializable service discoverer of the delegate
     */
    public ServiceDiscoverer getServiceDiscoverer() {
        return this.basicSparkContext.getSerializableServiceDiscoverer();
    }

    /**
     * @return the metrics collector of the delegate
     */
    public Metrics getMetrics() {
        return this.basicSparkContext.getMetrics();
    }
}
