package co.cask.cdap.etl.spark.streaming;

import co.cask.cdap.api.Admin;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.InstanceConflictException;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.common.AbstractStageContext;
import co.cask.cdap.etl.planner.StageInfo;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.tephra.TransactionFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/hydrator-spark-core-4.1.0.jar:co/cask/cdap/etl/spark/streaming/DefaultStreamingContext.class */
/**
 * {@link StreamingContext} implementation backed by a {@link JavaSparkExecutionContext} and a
 * {@link JavaStreamingContext}.
 *
 * <p>Plugin context and metrics for the stage are taken from the Spark execution context and
 * handed to {@link AbstractStageContext}; transactional calls are forwarded to the same
 * execution context.
 */
public class DefaultStreamingContext extends AbstractStageContext implements StreamingContext {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamingContext.class);

    /** Dataset type name passed to {@link Admin#createDataset} when a lineage dataset is missing. */
    private static final String EXTERNAL_DATASET_TYPE = "externalDataset";

    private final JavaSparkExecutionContext executionContext;
    private final JavaStreamingContext streamingContext;
    private final Admin datasetAdmin;

    /**
     * Creates a streaming context for a single pipeline stage.
     *
     * @param stageInfo                 information about the stage, passed to the superclass
     * @param javaSparkExecutionContext source of the plugin context, metrics, admin and transactions
     * @param javaStreamingContext      the Spark streaming context exposed to callers
     */
    public DefaultStreamingContext(StageInfo stageInfo, JavaSparkExecutionContext javaSparkExecutionContext, JavaStreamingContext javaStreamingContext) {
        super(javaSparkExecutionContext.getPluginContext(), javaSparkExecutionContext.getMetrics(), stageInfo);
        this.executionContext = javaSparkExecutionContext;
        this.streamingContext = javaStreamingContext;
        this.datasetAdmin = javaSparkExecutionContext.getAdmin();
    }

    /**
     * @return the Spark streaming context supplied at construction time
     */
    @Override
    public JavaStreamingContext getSparkStreamingContext() {
        return streamingContext;
    }

    /**
     * @return the Spark execution context supplied at construction time
     */
    @Override
    public JavaSparkExecutionContext getSparkExecutionContext() {
        return executionContext;
    }

    /**
     * Makes sure a dataset named {@code str} exists (creating it with type
     * {@value #EXTERNAL_DATASET_TYPE} and empty properties if it does not) and then fetches it
     * inside a transaction.
     *
     * @param str name of the dataset to register
     * @throws DatasetManagementException   if the existence check or the creation fails for a
     *                                      reason other than the dataset already existing
     * @throws TransactionFailureException if the transaction that fetches the dataset fails
     */
    @Override
    public void registerLineage(final String str) throws DatasetManagementException, TransactionFailureException {
        createExternalDatasetIfAbsent(str);

        // NOTE(review): the dataset is fetched only for its side effect; presumably the access
        // itself is what records lineage -- confirm against the platform documentation.
        TxRunnable datasetAccess = new TxRunnable() {
            @Override
            public void run(DatasetContext context) throws Exception {
                context.getDataset(str);
            }
        };
        executionContext.execute(datasetAccess);
    }

    /**
     * Runs the given runnable through the underlying Spark execution context.
     *
     * @param txRunnable the work to execute
     * @throws TransactionFailureException if the underlying execution fails
     */
    @Override
    public void execute(TxRunnable txRunnable) throws TransactionFailureException {
        executionContext.execute(txRunnable);
    }

    /**
     * Runs the given runnable through the underlying Spark execution context, forwarding the
     * integer argument unchanged.
     *
     * @param i          forwarded as-is to the execution context (presumably a transaction
     *                   timeout -- confirm against the {@code Transactional} contract)
     * @param txRunnable the work to execute
     * @throws TransactionFailureException if the underlying execution fails
     */
    @Override
    public void execute(int i, TxRunnable txRunnable) throws TransactionFailureException {
        executionContext.execute(i, txRunnable);
    }

    /**
     * Creates the named dataset unless the admin reports that it already exists. An
     * {@link InstanceConflictException} raised along the way is logged at debug level and
     * otherwise ignored, since it means the dataset is already present.
     */
    private void createExternalDatasetIfAbsent(String datasetName) throws DatasetManagementException {
        try {
            if (datasetAdmin.datasetExists(datasetName)) {
                return;
            }
            datasetAdmin.createDataset(datasetName, EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY);
        } catch (InstanceConflictException conflict) {
            LOG.debug("Dataset with name {} already created. Hence not creating the external dataset.", datasetName);
        }
    }
}
