package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.ServiceDiscoverer;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsCollector;
import co.cask.cdap.api.spark.SparkContext;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.services.SerializableServiceDiscoverer;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.runtime.AbstractContext;
import co.cask.cdap.internal.app.runtime.spark.metrics.SparkUserMetrics;
import co.cask.cdap.logging.context.SparkLoggingContext;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.TransactionAware;
import com.google.common.collect.Maps;
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/BasicSparkContext.class */
/**
 * {@link SparkContext} implementation built on top of {@link AbstractContext}.
 *
 * <p>This context carries the Spark specification, the logical start time, the stream admin,
 * the logging context and the user metrics of a run. The data-access style operations
 * (dataset/stream reads and writes, service discovery, access to the original Spark context)
 * are not available here and always throw {@link IllegalStateException}.
 */
public class BasicSparkContext extends AbstractContext implements SparkContext {
    private static final Logger LOG = LoggerFactory.getLogger(BasicSparkContext.class);

    /** Separator used to split a runtime-argument value into individual arguments. */
    private static final Pattern SPACES = Pattern.compile("\\s+");

    /** Shared empty result for {@link #getRuntimeArguments(String)}. */
    private static final String[] NO_ARGS = new String[0];

    /** Set after construction through {@link #setMetricsPropertyFile(File)}; {@code null} until then. */
    private File metricsPropertyFile;
    private final SparkSpecification sparkSpec;
    private final long logicalStartTime;
    // Stored by the constructor but not read anywhere within this class.
    private final String workflowBatch;
    private final StreamAdmin streamAdmin;
    private final SparkLoggingContext loggingContext;
    private final SerializableServiceDiscoverer serializableServiceDiscoverer;
    private final SparkUserMetrics userMetrics;

    /**
     * Stores the metrics property file associated with this context.
     *
     * @param file the file to remember; returned as-is by {@link #getMetricsPropertyFile()}
     */
    public void setMetricsPropertyFile(File file) {
        this.metricsPropertyFile = file;
    }

    /**
     * @return the file previously given to {@link #setMetricsPropertyFile(File)}, or {@code null}
     *     if none has been set
     */
    public File getMetricsPropertyFile() {
        return this.metricsPropertyFile;
    }

    /**
     * Creates the context.
     *
     * <p>Besides initializing its own fields, the constructor pushes the discovery client and the
     * program metrics collector into the class-level (static) setters of
     * {@link SerializableServiceDiscoverer} and {@link SparkUserMetrics} before instantiating them.
     * NOTE(review): because those setters are static, the values are presumably shared by every
     * instance in the JVM - confirm before creating several contexts side by side.
     *
     * @param program the program this context belongs to
     * @param runId the run id; its string id is used for the metrics and logging contexts
     * @param arguments forwarded to {@link AbstractContext}
     * @param set forwarded to {@link AbstractContext} (presumably the dataset names - confirm)
     * @param sparkSpecification the specification returned by {@link #getSpecification()}
     * @param j the logical start time returned by {@link #getLogicalStartTime()}
     * @param str the workflow batch value kept by this context
     * @param metricsCollectionService source of the metrics collector; may be {@code null}, in
     *     which case a {@code null} collector is handed to {@link AbstractContext}
     * @param datasetFramework forwarded to {@link AbstractContext}
     * @param discoveryServiceClient forwarded to {@link AbstractContext}
     * @param streamAdmin the admin returned by {@link #getStreamAdmin()}
     */
    public BasicSparkContext(Program program, RunId runId, Arguments arguments, Set<String> set, SparkSpecification sparkSpecification, long j, String str, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient, StreamAdmin streamAdmin) {
        super(program, runId, arguments, set, getMetricCollector(metricsCollectionService, program, runId.getId()), datasetFramework, discoveryServiceClient);
        this.logicalStartTime = j;
        this.workflowBatch = str;
        this.streamAdmin = streamAdmin;
        // The static setters are invoked before the corresponding instances are created;
        // keep this order, the instances may depend on the static state.
        SerializableServiceDiscoverer.setDiscoveryServiceClient(getDiscoveryServiceClient());
        this.serializableServiceDiscoverer = new SerializableServiceDiscoverer(getProgram());
        SparkUserMetrics.setMetricsCollector(getProgramMetrics());
        this.userMetrics = new SparkUserMetrics();
        this.loggingContext = new SparkLoggingContext(getNamespaceId(), getApplicationId(), getProgramName(), getRunId().getId());
        this.sparkSpec = sparkSpecification;
    }

    /**
     * @return the {@link SerializableServiceDiscoverer} created for this context's program
     */
    public SerializableServiceDiscoverer getSerializableServiceDiscoverer() {
        return this.serializableServiceDiscoverer;
    }

    /**
     * @return a description of the form {@code Job=spark: <spec name>, <AbstractContext.toString()>}
     */
    @Override
    public String toString() {
        // Locale.ROOT keeps the lower-cased program type independent of the JVM default locale.
        return String.format("Job=%s: %s, %s", ProgramType.SPARK.name().toLowerCase(Locale.ROOT), this.sparkSpec.getName(), super.toString());
    }

    /**
     * @return the Spark specification given at construction time
     */
    public SparkSpecification getSpecification() {
        return this.sparkSpec;
    }

    /**
     * @return the logical start time given at construction time
     */
    public long getLogicalStartTime() {
        return this.logicalStartTime;
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public <T> T readFromDataset(String str, Class<?> cls, Class<?> cls2) {
        throw new IllegalStateException("Reading dataset is not supported here");
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public <T> void writeToDataset(T t, String str, Class<?> cls, Class<?> cls2) {
        throw new IllegalStateException("Writing  dataset is not supported here");
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public <T> T readFromStream(String str, Class<?> cls) {
        throw new IllegalStateException("Reading stream is not supported here");
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public <T> T readFromStream(String str, Class<?> cls, long j, long j2) {
        throw new IllegalStateException("Reading stream is not supported here");
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public <T> T readFromStream(String str, Class<?> cls, long j, long j2, Class<? extends StreamEventDecoder> cls2) {
        throw new IllegalStateException("Reading stream is not supported here");
    }

    /**
     * Builds the metrics collector handed to {@link AbstractContext}.
     *
     * @param metricsCollectionService service to obtain the collector from; may be {@code null}
     * @param program program whose metrics context is used as the base set of tags
     * @param str the run id string
     * @return the collector for the program's metrics context extended with {@code "ins" -> "0"},
     *     or {@code null} when no metrics collection service is available
     */
    private static MetricsCollector getMetricCollector(MetricsCollectionService metricsCollectionService, Program program, String str) {
        if (metricsCollectionService == null) {
            return null;
        }
        // Copy the context so the extra tag does not leak into the map returned by getMetricsContext.
        Map<String, String> tags = Maps.newHashMap(getMetricsContext(program, str));
        // NOTE(review): "ins" looks like an instance-id tag with a fixed value - confirm.
        tags.put("ins", "0");
        return metricsCollectionService.getCollector(tags);
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public <T> T getOriginalSparkContext() {
        throw new IllegalStateException("Getting base Spark Context is not supported here");
    }

    /**
     * Looks up a runtime argument and splits its value on whitespace.
     *
     * @param str the runtime-argument key
     * @return the whitespace-separated tokens of the trimmed value; an empty array when the key
     *     is absent (a warning is logged), is mapped to {@code null}, or has a blank value
     */
    public String[] getRuntimeArguments(String str) {
        // Single lookup; a key mapped to null is treated like a missing key instead of
        // failing with a NullPointerException on trim().
        String rawValue = getRuntimeArguments().get(str);
        if (rawValue == null) {
            LOG.warn("Argument with key {} not found in Runtime Arguments", str);
            return NO_ARGS;
        }
        String trimmed = rawValue.trim();
        // Pattern.split("") yields a single empty string; a blank value means "no arguments".
        if (trimmed.isEmpty()) {
            return NO_ARGS;
        }
        return SPACES.split(trimmed);
    }

    /**
     * Not supported by this context.
     *
     * @throws IllegalStateException always
     */
    public ServiceDiscoverer getServiceDiscoverer() {
        throw new IllegalStateException("Service Discovery is not supported in this Context");
    }

    /**
     * @return the stream admin given at construction time
     */
    public StreamAdmin getStreamAdmin() {
        return this.streamAdmin;
    }

    /**
     * @return the {@link SparkUserMetrics} instance created by the constructor
     */
    @Override
    public Metrics getMetrics() {
        return this.userMetrics;
    }

    /**
     * @return the {@link SparkLoggingContext} built from this context's namespace, application,
     *     program name and run id
     */
    public LoggingContext getLoggingContext() {
        return this.loggingContext;
    }

    /**
     * Calls {@code commitTx()} on every {@link TransactionAware} exposed by the dataset
     * instantiator of this context, in iteration order.
     *
     * @throws Exception if any {@code commitTx()} call fails; remaining elements are not processed
     */
    public void flushOperations() throws Exception {
        for (TransactionAware txAware : getDatasetInstantiator().getTransactionAware()) {
            txAware.commitTx();
        }
    }
}
