package co.cask.cdap.etl.spark.streaming;

import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.streaming.Windower;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.NoopStageStatisticsCollector;
import co.cask.cdap.etl.common.RecordInfo;
import co.cask.cdap.etl.common.StageStatisticsCollector;
import co.cask.cdap.etl.spark.Compat;
import co.cask.cdap.etl.spark.SparkCollection;
import co.cask.cdap.etl.spark.SparkPairCollection;
import co.cask.cdap.etl.spark.SparkPipelineRuntime;
import co.cask.cdap.etl.spark.batch.BasicSparkExecutionPluginContext;
import co.cask.cdap.etl.spark.streaming.function.ComputeTransformFunction;
import co.cask.cdap.etl.spark.streaming.function.CountingTransformFunction;
import co.cask.cdap.etl.spark.streaming.function.DynamicAggregatorAggregate;
import co.cask.cdap.etl.spark.streaming.function.DynamicAggregatorGroupBy;
import co.cask.cdap.etl.spark.streaming.function.DynamicSparkCompute;
import co.cask.cdap.etl.spark.streaming.function.DynamicTransform;
import co.cask.cdap.etl.spark.streaming.function.StreamingAlertPublishFunction;
import co.cask.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction;
import co.cask.cdap.etl.spark.streaming.function.StreamingSparkSinkFunction;
import co.cask.cdap.etl.spec.StageSpec;
import javax.annotation.Nullable;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/cdap-etl-batch-4.3.1.jar:lib/hydrator-spark-core-4.3.1.jar:co/cask/cdap/etl/spark/streaming/DStreamCollection.class
 */
/* loaded from: input_file:lib/hydrator-spark-core-4.3.1.jar:co/cask/cdap/etl/spark/streaming/DStreamCollection.class */
/**
 * {@link SparkCollection} implementation that wraps a Spark Streaming {@link JavaDStream}.
 *
 * <p>Every operation that yields a collection returns a new {@code DStreamCollection} that shares
 * this instance's {@link JavaSparkExecutionContext}; the wrapped stream reference itself is never
 * reassigned.
 *
 * <p>NOTE(review): several helper types used below (for example {@code DynamicTransform},
 * {@code DynamicAggregatorGroupBy}, {@code CountingTransformFunction} and
 * {@code PairDStreamCollection}) are used as raw types because their generic signatures are not
 * visible from this file. Confirm their type parameters before tightening those call sites.
 *
 * @param <T> type of element carried by the wrapped stream
 */
public class DStreamCollection<T> implements SparkCollection<T> {
    private final JavaSparkExecutionContext sec;
    private final JavaDStream<T> stream;

    /**
     * Creates a collection over the given stream.
     *
     * @param javaSparkExecutionContext execution context handed to every function created by this collection
     * @param javaDStream the stream to wrap
     */
    public DStreamCollection(JavaSparkExecutionContext javaSparkExecutionContext, JavaDStream<T> javaDStream) {
        this.sec = javaSparkExecutionContext;
        this.stream = javaDStream;
    }

    /**
     * Returns the wrapped stream.
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public JavaDStream<T> getUnderlying() {
        return this.stream;
    }

    /**
     * Returns a new collection wrapping the result of {@code cache()} on the underlying stream.
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<T> cache() {
        return wrap(this.stream.cache());
    }

    /**
     * Returns a new collection wrapping the union of this stream with the other collection's stream.
     *
     * @param sparkCollection the collection to union with; its underlying object is cast to a
     *                        {@link JavaDStream}, so a collection backed by anything else fails
     *                        with a {@link ClassCastException}
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<T> union(SparkCollection<T> sparkCollection) {
        // The element type cannot be checked at runtime; it is taken from the declared
        // SparkCollection<T> parameter type.
        @SuppressWarnings("unchecked")
        JavaDStream<T> other = (JavaDStream<T>) sparkCollection.getUnderlying();
        return wrap(this.stream.union(other));
    }

    /**
     * Applies a {@code DynamicTransform} for the given stage to the stream, passing {@code false}
     * as the transform's second constructor argument.
     *
     * @param stageSpec spec of the stage, forwarded to the {@code DynamicDriverContext}
     * @param stageStatisticsCollector collector forwarded to the {@code DynamicDriverContext}
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<RecordInfo<Object>> transform(StageSpec stageSpec, StageStatisticsCollector stageStatisticsCollector) {
        DynamicDriverContext driverContext = new DynamicDriverContext(stageSpec, this.sec, stageStatisticsCollector);
        return wrap(this.stream.transform(new DynamicTransform(driverContext, false)));
    }

    /**
     * Same as {@link #transform(StageSpec, StageStatisticsCollector)} except that {@code true} is
     * passed as the {@code DynamicTransform}'s second constructor argument.
     *
     * @param stageSpec spec of the stage, forwarded to the {@code DynamicDriverContext}
     * @param stageStatisticsCollector collector forwarded to the {@code DynamicDriverContext}
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<RecordInfo<Object>> multiOutputTransform(StageSpec stageSpec, StageStatisticsCollector stageStatisticsCollector) {
        DynamicDriverContext driverContext = new DynamicDriverContext(stageSpec, this.sec, stageStatisticsCollector);
        return wrap(this.stream.transform(new DynamicTransform(driverContext, true)));
    }

    /**
     * Applies the given flat-map function to the stream.
     *
     * @param stageSpec not read by this implementation
     * @param flatMapFunction function applied to the stream
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public <U> SparkCollection<U> flatMap(StageSpec stageSpec, FlatMapFunction<T, U> flatMapFunction) {
        return wrap(this.stream.flatMap(flatMapFunction));
    }

    /**
     * Applies the given pair flat-map function and wraps the resulting pair stream in a
     * {@code PairDStreamCollection} that shares this collection's execution context.
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public <K, V> SparkPairCollection<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> pairFlatMapFunction) {
        return new PairDStreamCollection(this.sec, this.stream.flatMapToPair(pairFlatMapFunction));
    }

    /**
     * Runs the stage's aggregation: keys the stream with a {@code DynamicAggregatorGroupBy},
     * groups by key, then applies a {@code DynamicAggregatorAggregate}. Both functions share one
     * {@code DynamicDriverContext}.
     *
     * @param stageSpec spec of the aggregator stage
     * @param num argument passed to {@code groupByKey(int)}; when {@code null} the no-argument
     *            {@code groupByKey()} is used instead
     * @param stageStatisticsCollector collector forwarded to the {@code DynamicDriverContext}
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<RecordInfo<Object>> aggregate(StageSpec stageSpec, @Nullable Integer num, StageStatisticsCollector stageStatisticsCollector) {
        DynamicDriverContext dynamicDriverContext = new DynamicDriverContext(stageSpec, this.sec, stageStatisticsCollector);
        JavaPairDStream keyed = this.stream.transformToPair(new DynamicAggregatorGroupBy(dynamicDriverContext));
        JavaPairDStream grouped = num == null ? keyed.groupByKey() : keyed.groupByKey(num.intValue());
        return wrap(grouped.transform(new DynamicAggregatorAggregate(dynamicDriverContext)));
    }

    /**
     * Wraps the given compute plugin in a {@code DynamicSparkCompute}, initializes that wrapper
     * via {@link Transactionals#execute}, and then applies it to the stream through a
     * {@code ComputeTransformFunction}.
     *
     * @param stageSpec spec of the compute stage
     * @param sparkCompute the compute plugin to wrap
     * @throws Exception if initialization fails; {@code Exception.class} is the type passed to
     *                   {@code Transactionals.execute} for propagation
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public <U> SparkCollection<U> compute(final StageSpec stageSpec, SparkCompute<T, U> sparkCompute) throws Exception {
        // A no-op statistics collector is used for the driver context of compute stages.
        final DynamicSparkCompute dynamicSparkCompute = new DynamicSparkCompute(
            new DynamicDriverContext(stageSpec, this.sec, new NoopStageStatisticsCollector()), sparkCompute);
        Transactionals.execute(this.sec, new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.1
            @Override
            public void run(DatasetContext datasetContext) throws Exception {
                JavaSparkContext sparkContext =
                    JavaSparkContext.fromSparkContext(DStreamCollection.this.stream.context().sparkContext());
                dynamicSparkCompute.initialize(new BasicSparkExecutionPluginContext(
                    DStreamCollection.this.sec, sparkContext, datasetContext,
                    new SparkPipelineRuntime(DStreamCollection.this.sec), stageSpec));
            }
        }, Exception.class);
        return wrap(this.stream.transform(new ComputeTransformFunction(this.sec, stageSpec, dynamicSparkCompute)));
    }

    /**
     * Caches the stream and registers a {@code StreamingBatchSinkFunction} on it through
     * {@code Compat.foreachRDD}.
     *
     * @param stageSpec spec of the sink stage
     * @param pairFlatMapFunction function handed to the sink function
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public void store(StageSpec stageSpec, PairFlatMapFunction<T, Object, Object> pairFlatMapFunction) {
        Compat.foreachRDD(this.stream.cache(), new StreamingBatchSinkFunction(pairFlatMapFunction, this.sec, stageSpec));
    }

    /**
     * Caches the stream and registers a {@code StreamingSparkSinkFunction} on it through
     * {@code Compat.foreachRDD}.
     *
     * @param stageSpec spec of the sink stage
     * @param sparkSink not read by this implementation; only the execution context and
     *                  {@code stageSpec} are given to the sink function
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public void store(StageSpec stageSpec, SparkSink<T> sparkSink) throws Exception {
        Compat.foreachRDD(this.stream.cache(), new StreamingSparkSinkFunction(this.sec, stageSpec));
    }

    /**
     * Registers a {@code StreamingAlertPublishFunction} on the (uncached) stream through
     * {@code Compat.foreachRDD}.
     *
     * @param stageSpec spec of the alert publisher stage
     * @param stageStatisticsCollector not read by this implementation
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public void publishAlerts(StageSpec stageSpec, StageStatisticsCollector stageStatisticsCollector) throws Exception {
        Compat.foreachRDD(this.stream, new StreamingAlertPublishFunction(this.sec, stageSpec));
    }

    /**
     * Windows the stream using the windower's width and slide interval, both interpreted as
     * seconds. A {@code CountingTransformFunction} keyed by {@code Constants.Metrics.RECORDS_IN}
     * is applied before the window and one keyed by {@code Constants.Metrics.RECORDS_OUT} after it.
     *
     * @param stageSpec spec of the window stage; its name is passed to both counting functions
     * @param windower supplies the window width and slide interval
     */
    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<T> window(StageSpec stageSpec, Windower windower) {
        String name = stageSpec.getName();
        // Only the outgoing counter receives the stage's data tracer; the incoming one gets null.
        return (SparkCollection<T>) wrap(
            this.stream
                .transform(new CountingTransformFunction(name, this.sec.getMetrics(), Constants.Metrics.RECORDS_IN, null))
                .window(Durations.seconds(windower.getWidth()), Durations.seconds(windower.getSlideInterval()))
                .transform(new CountingTransformFunction(name, this.sec.getMetrics(), Constants.Metrics.RECORDS_OUT, this.sec.getDataTracer(name))));
    }

    /**
     * Wraps a stream in a new collection that shares this collection's execution context.
     */
    private <U> SparkCollection<U> wrap(JavaDStream<U> javaDStream) {
        return new DStreamCollection<U>(this.sec, javaDStream);
    }
}
