package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.common.Scope;
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.api.data.batch.BatchReadable;
import co.cask.cdap.api.data.batch.BatchWritable;
import co.cask.cdap.api.data.batch.DatasetOutputCommitter;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.batch.Split;
import co.cask.cdap.api.data.batch.SplitReader;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.data.dataset.DatasetInstantiator;
import co.cask.cdap.data.stream.StreamInputFormat;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.internal.app.runtime.spark.dataset.CloseableBatchWritable;
import co.cask.cdap.internal.app.runtime.spark.dataset.SparkDatasetInputFormat;
import co.cask.cdap.internal.app.runtime.spark.dataset.SparkDatasetOutputFormat;
import co.cask.cdap.proto.Id;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionAware;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/ExecutionSparkContext.class */
public class ExecutionSparkContext extends AbstractSparkContext {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionSparkContext.class);
    private final Map<String, Dataset> datasets;
    private final Configuration hConf;
    private final Transaction transaction;
    private final StreamAdmin streamAdmin;
    private final DatasetInstantiator datasetInstantiator;
    private boolean stopped;
    private SparkFacade sparkFacade;

    public ExecutionSparkContext(ApplicationSpecification applicationSpecification, SparkSpecification sparkSpecification, Id.Program program, RunId runId, ClassLoader classLoader, long j, Map<String, String> map, Transaction transaction, DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient, MetricsCollectionService metricsCollectionService, Configuration configuration, StreamAdmin streamAdmin, @Nullable WorkflowToken workflowToken) {
        this(applicationSpecification, sparkSpecification, program, runId, classLoader, j, map, transaction, datasetFramework, discoveryServiceClient, createMetricsContext(metricsCollectionService, program, runId), createLoggingContext(program, runId), configuration, streamAdmin, workflowToken);
    }

    public ExecutionSparkContext(ApplicationSpecification applicationSpecification, SparkSpecification sparkSpecification, Id.Program program, RunId runId, ClassLoader classLoader, long j, Map<String, String> map, Transaction transaction, DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient, MetricsContext metricsContext, LoggingContext loggingContext, Configuration configuration, StreamAdmin streamAdmin, WorkflowToken workflowToken) {
        super(applicationSpecification, sparkSpecification, program, runId, classLoader, j, map, discoveryServiceClient, metricsContext, loggingContext, workflowToken);
        this.datasets = new HashMap();
        this.hConf = configuration;
        this.transaction = transaction;
        this.streamAdmin = streamAdmin;
        this.datasetInstantiator = new DatasetInstantiator(program.getNamespace(), datasetFramework, classLoader, getOwners(), getMetricsContext());
    }

    public <T> T readFromDataset(String str, Class<?> cls, Class<?> cls2, Map<String, String> map) {
        Configuration configuration = new Configuration(this.hConf);
        Map<String, String> extractScope = RuntimeArguments.extractScope(Scope.DATASET, str, getRuntimeArguments());
        extractScope.putAll(map);
        InputFormatProvider instantiateDataset = instantiateDataset(str, extractScope);
        try {
            if (!(instantiateDataset instanceof InputFormatProvider)) {
                commitAndClose(str, instantiateDataset);
                SparkDatasetInputFormat.setDataset(configuration, str, extractScope);
                return (T) getSparkFacade().createRDD(SparkDatasetInputFormat.class, cls, cls2, configuration);
            }
            String inputFormatClassName = instantiateDataset.getInputFormatClassName();
            if (inputFormatClassName == null) {
                throw new DatasetInstantiationException(String.format("Dataset '%s' provided null as the input format class name", str));
            }
            try {
                Class loadClass = SparkClassLoader.findFromContext().loadClass(inputFormatClassName);
                Map inputFormatConfiguration = instantiateDataset.getInputFormatConfiguration();
                if (inputFormatConfiguration != null) {
                    for (Map.Entry entry : inputFormatConfiguration.entrySet()) {
                        configuration.set((String) entry.getKey(), (String) entry.getValue());
                    }
                }
                T t = (T) getSparkFacade().createRDD(loadClass, cls, cls2, configuration);
                commitAndClose(str, instantiateDataset);
                return t;
            } catch (ClassCastException e) {
                throw new DatasetInstantiationException(String.format("Input format class %s provided by dataset '%s' is not an input format", inputFormatClassName, str), e);
            } catch (ClassNotFoundException e2) {
                throw new DatasetInstantiationException(String.format("Cannot load input format class %s provided by dataset '%s'", inputFormatClassName, str), e2);
            }
        } catch (Throwable th) {
            commitAndClose(str, instantiateDataset);
            throw th;
        }
    }

    public <T> void writeToDataset(T t, String str, Class<?> cls, Class<?> cls2, Map<String, String> map) {
        Configuration configuration = new Configuration(this.hConf);
        Map<String, String> extractScope = RuntimeArguments.extractScope(Scope.DATASET, str, getRuntimeArguments());
        extractScope.putAll(map);
        DatasetOutputCommitter instantiateDataset = instantiateDataset(str, extractScope);
        try {
            if (!(instantiateDataset instanceof OutputFormatProvider)) {
                commitAndClose(str, instantiateDataset);
                SparkDatasetOutputFormat.setDataset(this.hConf, str, extractScope);
                getSparkFacade().saveAsDataset(t, SparkDatasetOutputFormat.class, cls, cls2, new Configuration(this.hConf));
                return;
            }
            String outputFormatClassName = ((OutputFormatProvider) instantiateDataset).getOutputFormatClassName();
            if (outputFormatClassName == null) {
                throw new DatasetInstantiationException(String.format("Dataset '%s' provided null as the output format class name", str));
            }
            try {
                try {
                    Class loadClass = SparkClassLoader.findFromContext().loadClass(outputFormatClassName);
                    Map outputFormatConfiguration = ((OutputFormatProvider) instantiateDataset).getOutputFormatConfiguration();
                    if (outputFormatConfiguration != null) {
                        for (Map.Entry entry : outputFormatConfiguration.entrySet()) {
                            configuration.set((String) entry.getKey(), (String) entry.getValue());
                        }
                    }
                    try {
                        getSparkFacade().saveAsDataset(t, loadClass, cls, cls2, configuration);
                        if (instantiateDataset instanceof DatasetOutputCommitter) {
                            instantiateDataset.onSuccess();
                        }
                    } catch (Throwable th) {
                        if (instantiateDataset instanceof DatasetOutputCommitter) {
                            instantiateDataset.onFailure();
                        }
                        throw th;
                    }
                } catch (ClassCastException e) {
                    throw new DatasetInstantiationException(String.format("Input format class %s provided by dataset '%s' is not an input format", outputFormatClassName, str), e);
                }
            } catch (ClassNotFoundException e2) {
                throw new DatasetInstantiationException(String.format("Cannot load input format class %s provided by dataset '%s'", outputFormatClassName, str), e2);
            }
        } finally {
            commitAndClose(str, instantiateDataset);
        }
    }

    public <T> T readFromStream(String str, Class<?> cls, long j, long j2, Class<? extends StreamEventDecoder> cls2) {
        try {
            T t = (T) getSparkFacade().createRDD(StreamInputFormat.class, LongWritable.class, cls, configureStreamInput(new Configuration(this.hConf), cls2 == null ? new StreamBatchReadable(str, j, j2) : new StreamBatchReadable(str, j, j2, cls2), cls));
            Id.Stream from = Id.Stream.from(getProgramId().getNamespace(), str);
            try {
                this.streamAdmin.register(getOwners(), from);
                this.streamAdmin.addAccess(new Id.Run(getProgramId(), getRunId().getId()), from, AccessType.READ);
            } catch (Exception e) {
                LOG.warn("Failed to registry usage of {} -> {}", new Object[]{from, getOwners(), e});
            }
            return t;
        } catch (IOException e2) {
            throw Throwables.propagate(e2);
        }
    }

    public <T> T getOriginalSparkContext() {
        return (T) getSparkFacade().getContext();
    }

    public synchronized <T extends Dataset> T getDataset(String str, Map<String, String> map) {
        Map<String, String> extractScope = RuntimeArguments.extractScope(Scope.DATASET, str, getRuntimeArguments());
        extractScope.putAll(map);
        String str2 = str + ImmutableSortedMap.copyOf(extractScope).toString();
        Dataset dataset = this.datasets.get(str2);
        if (dataset == null) {
            dataset = instantiateDataset(str, extractScope);
            this.datasets.put(str2, dataset);
        }
        return (T) dataset;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        if (this.sparkFacade != null) {
            this.sparkFacade.stop();
        }
        Iterator<Dataset> it = this.datasets.values().iterator();
        while (it.hasNext()) {
            Closeables.closeQuietly(it.next());
        }
    }

    public void flushDatasets() throws Exception {
        Iterator<Dataset> it = this.datasets.values().iterator();
        while (it.hasNext()) {
            TransactionAware transactionAware = (Dataset) it.next();
            if (transactionAware instanceof TransactionAware) {
                transactionAware.commitTx();
            }
        }
    }

    public synchronized boolean isStopped() {
        return this.stopped;
    }

    public <K, V> BatchReadable<K, V> getBatchReadable(final String str, Map<String, String> map) {
        final BatchReadable instantiateDataset = instantiateDataset(str, map);
        Preconditions.checkArgument(instantiateDataset instanceof BatchReadable, "Dataset %s of type %s does not implements %s", new Object[]{str, instantiateDataset.getClass().getName(), BatchReadable.class.getName()});
        final BatchReadable batchReadable = instantiateDataset;
        return new BatchReadable<K, V>() { // from class: co.cask.cdap.internal.app.runtime.spark.ExecutionSparkContext.1
            public List<Split> getSplits() {
                try {
                    List<Split> splits = batchReadable.getSplits();
                    ExecutionSparkContext.this.commitAndClose(str, instantiateDataset);
                    return splits;
                } catch (Throwable th) {
                    ExecutionSparkContext.this.commitAndClose(str, instantiateDataset);
                    throw th;
                }
            }

            public SplitReader<K, V> createSplitReader(Split split) {
                return new ForwardingSplitReader<K, V>(batchReadable.createSplitReader(split)) { // from class: co.cask.cdap.internal.app.runtime.spark.ExecutionSparkContext.1.1
                    @Override // co.cask.cdap.internal.app.runtime.spark.ForwardingSplitReader
                    public void close() {
                        try {
                            super.close();
                            ExecutionSparkContext.this.commitAndClose(str, instantiateDataset);
                        } catch (Throwable th) {
                            ExecutionSparkContext.this.commitAndClose(str, instantiateDataset);
                            throw th;
                        }
                    }
                };
            }
        };
    }

    public <K, V> CloseableBatchWritable<K, V> getBatchWritable(final String str, Map<String, String> map) {
        final BatchWritable instantiateDataset = instantiateDataset(str, map);
        Preconditions.checkArgument(instantiateDataset instanceof BatchWritable, "Dataset %s of type %s does not implements %s", new Object[]{str, instantiateDataset.getClass().getName(), BatchWritable.class.getName()});
        final BatchWritable batchWritable = instantiateDataset;
        return new CloseableBatchWritable<K, V>() { // from class: co.cask.cdap.internal.app.runtime.spark.ExecutionSparkContext.2
            public void write(K k, V v) {
                batchWritable.write(k, v);
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                ExecutionSparkContext.this.commitAndClose(str, instantiateDataset);
            }
        };
    }

    @Override // co.cask.cdap.internal.app.runtime.spark.AbstractSparkContext
    public SparkConf getSparkConf() {
        throw new UnsupportedOperationException("getSparkConf shouldn't be called in execution context");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setSparkFacade(SparkFacade sparkFacade) {
        this.sparkFacade = sparkFacade;
        if (this.stopped) {
            sparkFacade.stop();
        }
    }

    public Transaction getTransaction() {
        return this.transaction;
    }

    private synchronized SparkFacade getSparkFacade() {
        Preconditions.checkState(this.sparkFacade != null, "Spark Context not available in the current execution context");
        return this.sparkFacade;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitAndClose(String str, Dataset dataset) {
        try {
            if (dataset instanceof TransactionAware) {
                try {
                    ((TransactionAware) dataset).commitTx();
                } catch (Exception e) {
                    LOG.error("Failed to commit dataset changes for dataset {} of type {}", new Object[]{str, dataset.getClass(), e});
                    throw Throwables.propagate(e);
                }
            }
        } finally {
            Closeables.closeQuietly(dataset);
        }
    }

    private Configuration configureStreamInput(Configuration configuration, StreamBatchReadable streamBatchReadable, Class<?> cls) throws IOException {
        StreamConfig config = this.streamAdmin.getConfig(Id.Stream.from(getProgramId().getNamespace(), streamBatchReadable.getStreamName()));
        Location createGenerationLocation = StreamUtils.createGenerationLocation(config.getLocation(), StreamUtils.getGeneration(config));
        StreamInputFormat.setTTL(configuration, config.getTTL());
        StreamInputFormat.setStreamPath(configuration, Locations.toURI(createGenerationLocation));
        StreamInputFormat.setTimeRange(configuration, streamBatchReadable.getStartTime(), streamBatchReadable.getEndTime());
        String decoderType = streamBatchReadable.getDecoderType();
        if (decoderType == null) {
            StreamInputFormat.inferDecoderClass(configuration, cls);
        } else {
            StreamInputFormat.setDecoderClassName(configuration, decoderType);
        }
        return configuration;
    }

    private <T extends Dataset> T instantiateDataset(String str, Map<String, String> map) {
        TransactionAware dataset = this.datasetInstantiator.getDataset(str, map);
        if (dataset instanceof TransactionAware) {
            dataset.startTx(this.transaction);
            this.datasetInstantiator.removeTransactionAware(dataset);
        }
        return dataset;
    }
}
