package co.cask.cdap.spark.app;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkExecutionContext;
import co.cask.cdap.api.spark.SparkMain;
import co.cask.cdap.api.spark.SparkMain$Transaction$;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream$;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaSparkStreaming.scala */
@ScalaSignature(bytes = "\u0006\u000192A!\u0001\u0002\u0001\u001b\t\u00192*\u00194lCN\u0003\u0018M]6TiJ,\u0017-\\5oO*\u00111\u0001B\u0001\u0004CB\u0004(BA\u0003\u0007\u0003\u0015\u0019\b/\u0019:l\u0015\t9\u0001\"\u0001\u0003dI\u0006\u0004(BA\u0005\u000b\u0003\u0011\u0019\u0017m]6\u000b\u0003-\t!aY8\u0004\u0001M\u0019\u0001AD\u000b\u0011\u0005=\u0019R\"\u0001\t\u000b\u0005\u0015\t\"B\u0001\n\u0007\u0003\r\t\u0007/[\u0005\u0003)A\u0011Q\"\u00112tiJ\f7\r^*qCJ\\\u0007CA\b\u0017\u0013\t9\u0002CA\u0005Ta\u0006\u00148.T1j]\")\u0011\u0004\u0001C\u00015\u00051A(\u001b8jiz\"\u0012a\u0007\t\u00039\u0001i\u0011A\u0001\u0005\u0006=\u0001!\tfH\u0001\nG>tg-[4ve\u0016$\u0012\u0001\t\t\u0003C\u0011j\u0011A\t\u0006\u0002G\u0005)1oY1mC&\u0011QE\t\u0002\u0005+:LG\u000fC\u0003(\u0001\u0011\u0005\u0003&A\u0002sk:$\"\u0001I\u0015\t\u000b)2\u00039A\u0016\u0002\u0007M,7\r\u0005\u0002\u0010Y%\u0011Q\u0006\u0005\u0002\u0016'B\f'o[#yK\u000e,H/[8o\u0007>tG/\u001a=u\u0001")
/* loaded from: input_file:co/cask/cdap/spark/app/KafkaSparkStreaming.class */
public class KafkaSparkStreaming extends AbstractSpark implements SparkMain {
    private final Function1<StreamEvent, Tuple2<Object, String>> timestampStringStreamDecoder;
    private final Function1<StreamEvent, String> stringStreamDecoder;
    private volatile SparkMain$Transaction$ Transaction$module;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private SparkMain$Transaction$ Transaction$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.Transaction$module == null) {
                this.Transaction$module = new SparkMain$Transaction$(this);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.Transaction$module;
        }
    }

    public SparkMain$Transaction$ Transaction() {
        return this.Transaction$module == null ? Transaction$lzycompute() : this.Transaction$module;
    }

    public Function1<StreamEvent, Tuple2<Object, String>> timestampStringStreamDecoder() {
        return this.timestampStringStreamDecoder;
    }

    public Function1<StreamEvent, String> stringStreamDecoder() {
        return this.stringStreamDecoder;
    }

    public void co$cask$cdap$api$spark$SparkMain$_setter_$timestampStringStreamDecoder_$eq(Function1 function1) {
        this.timestampStringStreamDecoder = function1;
    }

    public void co$cask$cdap$api$spark$SparkMain$_setter_$stringStreamDecoder_$eq(Function1 function1) {
        this.stringStreamDecoder = function1;
    }

    public <K, V> SparkMain.SparkProgramRDDFunctions<K, V> SparkProgramRDDFunctions(RDD<Tuple2<K, V>> rdd, ClassTag<K> classTag, ClassTag<V> classTag2) {
        return SparkMain.class.SparkProgramRDDFunctions(this, rdd, classTag, classTag2);
    }

    public SparkMain.SparkProgramContextFunctions SparkProgramContextFunctions(SparkContext sparkContext) {
        return SparkMain.class.SparkProgramContextFunctions(this, sparkContext);
    }

    public void configure() {
        setMainClass(KafkaSparkStreaming.class);
    }

    public void run(SparkExecutionContext sparkExecutionContext) {
        StreamingContext streamingContext = new StreamingContext(new SparkContext(), Seconds$.MODULE$.apply(1L));
        String str = (String) sparkExecutionContext.getRuntimeArguments().get("result.dataset");
        DStream$.MODULE$.toPairDStreamFunctions(KafkaUtils$.MODULE$.createDirectStream(streamingContext, Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("metadata.broker.list", sparkExecutionContext.getRuntimeArguments().get("kafka.brokers")), new Tuple2("auto.offset.reset", "smallest")})), Predef$.MODULE$.refArrayOps(((String) sparkExecutionContext.getRuntimeArguments().get("kafka.topics")).split(",")).toSet(), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class)).map(new KafkaSparkStreaming$$anonfun$run$2(this), ClassTag$.MODULE$.apply(String.class)).flatMap(new KafkaSparkStreaming$$anonfun$run$3(this), ClassTag$.MODULE$.apply(String.class)).map(new KafkaSparkStreaming$$anonfun$run$4(this), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Long(), Ordering$String$.MODULE$).reduceByKey(new KafkaSparkStreaming$$anonfun$run$1(this)).foreachRDD(new KafkaSparkStreaming$$anonfun$run$5(this, sparkExecutionContext, str));
        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException unused) {
            streamingContext.stop(true, true);
        }
    }

    public KafkaSparkStreaming() {
        SparkMain.class.$init$(this);
    }
}
