package co.cask.cdap.datastreams;

import co.cask.cdap.api.preview.DataTracer;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.JoinElement;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.RecordInfo;
import co.cask.cdap.etl.common.StageStatisticsCollector;
import co.cask.cdap.etl.spark.SparkCollection;
import co.cask.cdap.etl.spark.SparkPairCollection;
import co.cask.cdap.etl.spark.SparkPipelineRunner;
import co.cask.cdap.etl.spark.function.PluginFunctionContext;
import co.cask.cdap.etl.spark.plugin.SparkPipelinePluginContext;
import co.cask.cdap.etl.spark.streaming.DStreamCollection;
import co.cask.cdap.etl.spark.streaming.DefaultStreamingContext;
import co.cask.cdap.etl.spark.streaming.DynamicDriverContext;
import co.cask.cdap.etl.spark.streaming.PairDStreamCollection;
import co.cask.cdap.etl.spark.streaming.function.CountingTransformFunction;
import co.cask.cdap.etl.spark.streaming.function.DynamicJoinMerge;
import co.cask.cdap.etl.spark.streaming.function.DynamicJoinOn;
import co.cask.cdap.etl.spark.streaming.function.WrapOutputTransformFunction;
import co.cask.cdap.etl.spark.streaming.function.preview.LimitingFunction;
import co.cask.cdap.etl.spec.StageSpec;
import java.util.List;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/* loaded from: input_file:co/cask/cdap/datastreams/SparkStreamingPipelineRunner.class */
public class SparkStreamingPipelineRunner extends SparkPipelineRunner {
    private final JavaSparkExecutionContext sec;
    private final JavaStreamingContext streamingContext;
    private final DataStreamsPipelineSpec spec;
    private final boolean checkpointsDisabled;

    public SparkStreamingPipelineRunner(JavaSparkExecutionContext javaSparkExecutionContext, JavaStreamingContext javaStreamingContext, DataStreamsPipelineSpec dataStreamsPipelineSpec, boolean z) {
        this.sec = javaSparkExecutionContext;
        this.streamingContext = javaStreamingContext;
        this.checkpointsDisabled = z;
        this.spec = dataStreamsPipelineSpec;
    }

    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkCollection<RecordInfo<Object>> getSource(StageSpec stageSpec, StageStatisticsCollector stageStatisticsCollector) throws Exception {
        StreamingSource streamingSource;
        if (this.checkpointsDisabled) {
            streamingSource = (StreamingSource) new PluginFunctionContext(stageSpec, this.sec, stageStatisticsCollector).createPlugin();
        } else {
            streamingSource = (StreamingSource) new SparkPipelinePluginContext(this.sec.getPluginContext(), this.sec.getMetrics(), this.spec.isStageLoggingEnabled(), this.spec.isProcessTimingEnabled()).newPluginInstance(stageSpec.getName(), new ErrorMacroEvaluator("Due to spark limitations, macro evaluation is not allowed in streaming sources when checkpointing is enabled."));
        }
        DataTracer dataTracer = this.sec.getDataTracer(stageSpec.getName());
        JavaDStream stream = streamingSource.getStream(new DefaultStreamingContext(stageSpec, this.sec, this.streamingContext));
        if (dataTracer.isEnabled()) {
            stream = stream.transform(new LimitingFunction(this.spec.getNumOfRecordsPreview()));
        }
        return new DStreamCollection(this.sec, stream.transform(new CountingTransformFunction(stageSpec.getName(), this.sec.getMetrics(), Constants.Metrics.RECORDS_OUT, dataTracer)).map(new WrapOutputTransformFunction(stageSpec.getName())));
    }

    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkPairCollection<Object, Object> addJoinKey(StageSpec stageSpec, String str, SparkCollection<Object> sparkCollection, StageStatisticsCollector stageStatisticsCollector) throws Exception {
        return new PairDStreamCollection(this.sec, ((JavaDStream) sparkCollection.getUnderlying()).transformToPair(new DynamicJoinOn(new DynamicDriverContext(stageSpec, this.sec, stageStatisticsCollector), str)));
    }

    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkCollection<Object> mergeJoinResults(StageSpec stageSpec, SparkPairCollection<Object, List<JoinElement<Object>>> sparkPairCollection, StageStatisticsCollector stageStatisticsCollector) throws Exception {
        return new DStreamCollection(this.sec, ((JavaPairDStream) sparkPairCollection.getUnderlying()).transform(new DynamicJoinMerge(new DynamicDriverContext(stageSpec, this.sec, stageStatisticsCollector))));
    }
}
