/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.spark.connector.demo.streaming;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.CassandraRow;
import com.datastax.spark.connector.SomeColumns;
import com.datastax.spark.connector.SparkContextFunctions;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.cql.CassandraConnector$;
import com.datastax.spark.connector.demo.streaming.SparkCassandraSettings;
import com.datastax.spark.connector.embedded.Assertions;
import com.datastax.spark.connector.embedded.EmbeddedKafka;
import com.datastax.spark.connector.mapper.ColumnMapper;
import com.datastax.spark.connector.mapper.ColumnMapper$;
import com.datastax.spark.connector.rdd.CassandraRDD;
import com.datastax.spark.connector.rdd.ValidRDDType;
import com.datastax.spark.connector.rdd.ValidRDDType$;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory;
import com.datastax.spark.connector.streaming.package$;
import com.datastax.spark.connector.util.Logging;
import com.datastax.spark.connector.writer.RowWriterFactory$;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.StreamingContext$;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.math.Ordering;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

/**
 * Singleton holder for the Kafka streaming demo (this looks like compiler output for a Scala
 * {@code object}; the file header states it was produced by a decompiler).
 *
 * <p>The single instance is created by the static initializer and published through
 * {@link #MODULE$}. The class implements {@code Assertions} and {@code Logging}; every method of
 * those two interfaces is a one-line forwarder to a static implementation that takes this
 * instance as its first argument.
 *
 * <p>Constructing the instance has side effects: it builds the {@code SparkConf} and runs three
 * CQL statements (create keyspace, create table, truncate table) against Cassandra.
 */
public final class KafkaStreamingDemo$
implements Assertions,
Logging {
    /** The one instance of this class; assigned as the first statement of the private constructor. */
    public static final KafkaStreamingDemo$ MODULE$;
    /** Demo settings; the constructor reads CassandraSeed, SparkCleanerTtl and SparkMaster from it. */
    private final SparkCassandraSettings settings;
    /** Spark configuration built in the constructor. Note: despite the name this is a SparkConf. */
    private final SparkConf sc;
    /** Kafka topic name used by {@link #main}; set to "topic1" at the end of the constructor. */
    private final String topic;
    /** Lazily created embedded Kafka instance; only valid once {@code bitmap$0} is true. */
    private EmbeddedKafka kafka;
    /** Backing field for the Logging accessor/mutator pair below. */
    private transient Logger com$datastax$spark$connector$util$Logging$$log_;
    /** Backing field for the Assertions accessor/mutator pair below. */
    private Duration com$datastax$spark$connector$embedded$Assertions$$end;
    /** Set to true once {@code kafka} has been assigned by {@link #kafka$lzycompute()}. */
    private volatile boolean bitmap$0;

    // Instantiating the class is enough: the constructor stores the new instance in MODULE$.
    static {
        new KafkaStreamingDemo$();
    }

    /**
     * Slow path of {@link #kafka()}: while holding this instance's monitor, creates the
     * EmbeddedKafka if {@code bitmap$0} is still false, then returns the field.
     */
    private EmbeddedKafka kafka$lzycompute() {
        KafkaStreamingDemo$ kafkaStreamingDemo$ = this;
        synchronized (kafkaStreamingDemo$) {
            if (!this.bitmap$0) {
                this.kafka = new EmbeddedKafka();
                // Flag is flipped only after the field has been assigned.
                this.bitmap$0 = true;
            }
            return this.kafka;
        }
    }

    // ---- Logging: field accessors and forwarders to the static implementations on Logging ----

    /** Returns the stored logger field (may be whatever the Logging implementation last set). */
    public Logger com$datastax$spark$connector$util$Logging$$log_() {
        return this.com$datastax$spark$connector$util$Logging$$log_;
    }

    /** Stores the logger field. */
    public void com$datastax$spark$connector$util$Logging$$log__$eq(Logger x$1) {
        this.com$datastax$spark$connector$util$Logging$$log_ = x$1;
    }

    /** Forwards to the Logging implementation of {@code logName}. */
    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    /** Forwards to the Logging implementation of {@code log}. */
    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    /** Forwards to the Logging implementation; {@code msg} is passed on as a Function0, not evaluated here. */
    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    /** Forwards to the Logging implementation; {@code msg} is passed on as a Function0, not evaluated here. */
    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    /** Forwards to the Logging implementation; {@code msg} is passed on as a Function0, not evaluated here. */
    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    /** Forwards to the Logging implementation; {@code msg} is passed on as a Function0, not evaluated here. */
    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    /** Forwards to the Logging implementation; {@code msg} is passed on as a Function0, not evaluated here. */
    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    /** Forwards message and throwable to the Logging implementation. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards message and throwable to the Logging implementation. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards message and throwable to the Logging implementation. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards message and throwable to the Logging implementation. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards message and throwable to the Logging implementation. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the Logging implementation of {@code isTraceEnabled}. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    /** Forwards to the Logging implementation of {@code getSparkClassLoader}. */
    public ClassLoader getSparkClassLoader() {
        return Logging.class.getSparkClassLoader((Logging)this);
    }

    // ---- Assertions: field accessors and forwarders to the static implementations on Assertions ----

    /** Returns the stored {@code end} field used by the Assertions implementation. */
    public Duration com$datastax$spark$connector$embedded$Assertions$$end() {
        return this.com$datastax$spark$connector$embedded$Assertions$$end;
    }

    /** Stores the {@code end} field used by the Assertions implementation. */
    public void com$datastax$spark$connector$embedded$Assertions$$end_$eq(Duration x$1) {
        this.com$datastax$spark$connector$embedded$Assertions$$end = x$1;
    }

    /** Forwards to the Assertions implementation of {@code now}. */
    public FiniteDuration now() {
        return Assertions.class.now((Assertions)this);
    }

    /** Forwards to the Assertions implementation of {@code remainingOrDefault}. */
    public FiniteDuration remainingOrDefault() {
        return Assertions.class.remainingOrDefault((Assertions)this);
    }

    /** Forwards to the Assertions implementation of {@code remainingOr}. */
    public FiniteDuration remainingOr(FiniteDuration duration) {
        return Assertions.class.remainingOr((Assertions)this, (FiniteDuration)duration);
    }

    /**
     * Forwards to the Assertions implementation of {@code awaitCond}.
     *
     * @param p        condition, supplied as a Function0 and evaluated by the implementation
     * @param max      passed through as the second argument (used as a time limit by {@link #main})
     * @param interval passed through unchanged
     * @param message  passed through unchanged
     */
    public void awaitCond(Function0<Object> p, Duration max, Duration interval, String message) {
        Assertions.class.awaitCond((Assertions)this, p, (Duration)max, (Duration)interval, (String)message);
    }

    /** Default for the 2nd parameter ({@code max}) of {@code awaitCond}, as defined by Assertions. */
    public Duration awaitCond$default$2() {
        return Assertions.class.awaitCond$default$2((Assertions)this);
    }

    /** Default for the 3rd parameter ({@code interval}) of {@code awaitCond}, as defined by Assertions. */
    public Duration awaitCond$default$3() {
        return Assertions.class.awaitCond$default$3((Assertions)this);
    }

    /** Default for the 4th parameter ({@code message}) of {@code awaitCond}, as defined by Assertions. */
    public String awaitCond$default$4() {
        return Assertions.class.awaitCond$default$4((Assertions)this);
    }

    // ---- Demo state accessors ----

    /** Returns the settings object created in the constructor. */
    public SparkCassandraSettings settings() {
        return this.settings;
    }

    /** Returns the SparkConf created in the constructor. */
    public SparkConf sc() {
        return this.sc;
    }

    /** Returns the topic name ("topic1"). */
    private String topic() {
        return this.topic;
    }

    /**
     * Returns the embedded Kafka instance, creating it on first use.
     * Reads the volatile flag first and only takes the synchronized path while it is still false.
     */
    public EmbeddedKafka kafka() {
        return this.bitmap$0 ? this.kafka : this.kafka$lzycompute();
    }

    /**
     * Runs the demo end to end: publishes messages to Kafka, consumes them with a Spark
     * streaming job that counts occurrences per message value and saves the counts to the
     * Cassandra table {@code streaming_test.key_value}, then reads the table back and stops
     * the streaming context.
     *
     * @param args not read by this method
     */
    public void main(String[] args) {
        // Streaming context over the shared SparkConf, built with Seconds(2) as its second argument.
        StreamingContext ssc = new StreamingContext(this.sc(), Seconds$.MODULE$.apply(2L));
        // Shut the embedded Kafka down when the Spark actor system terminates.
        SparkEnv$.MODULE$.get().actorSystem().registerOnTermination((Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                KafkaStreamingDemo$.MODULE$.kafka().shutdown();
            }
        });
        // Test data: Map("a" -> 5, "b" -> 3, "c" -> 10).
        // Presumably the Int is how many times each key is sent; verify against EmbeddedKafka.produceAndSendMessage.
        Map sent = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)5)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)3)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"c"), (Object)BoxesRunTime.boxToInteger((int)10))}));
        // Messages are produced before the stream below is created and before ssc.start().
        this.kafka().createTopic(this.topic());
        this.kafka().produceAndSendMessage(this.topic(), sent);
        // Kafka receiver stream with String keys/values, the embedded broker's kafkaParams,
        // the topic map Map(topic -> 1), and MEMORY_ONLY storage.
        ReceiverInputDStream stream = KafkaUtils$.MODULE$.createStream(ssc, this.kafka().kafkaParams(), (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), StorageLevel$.MODULE$.MEMORY_ONLY(), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class));
        // Pipeline: (key, value) -> value -> (value, 1) -> reduceByKey(sum) -> save as columns (key, value).
        package$.MODULE$.toDStreamFunctions(StreamingContext$.MODULE$.toPairDStreamFunctions(stream.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Keeps only the second element of each tuple; a null tuple raises MatchError.
            public final String apply(Tuple2<String, String> x0$1) {
                Tuple2<String, String> tuple2 = x0$1;
                if (tuple2 != null) {
                    String v;
                    String string = v = (String)tuple2._2();
                    return string;
                }
                throw new MatchError(tuple2);
            }
        }, ClassTag$.MODULE$.apply(String.class)).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Pairs every value with the count 1.
            public final Tuple2<String, Object> apply(String x) {
                return new Tuple2((Object)x, (Object)BoxesRunTime.boxToInteger((int)1));
            }
        }, ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Int(), (Ordering)Ordering.String$.MODULE$).reduceByKey((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final int apply(int x$1, int x$2) {
                return this.apply$mcIII$sp(x$1, x$2);
            }

            // Sums the two counts.
            public int apply$mcIII$sp(int x$1, int x$2) {
                return x$1 + x$2;
            }
        }), ClassTag$.MODULE$.apply(Tuple2.class)).saveToCassandra("streaming_test", "key_value", new SomeColumns((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"key", "value"})), 1, RowWriterFactory$.MODULE$.defaultRowWriterFactory(ClassTag$.MODULE$.apply(Tuple2.class), (ColumnMapper)ColumnMapper$.MODULE$.tuple2ColumnMapper()));
        ssc.start();
        // Read streaming_test.key_value back as CassandraRow, selecting the "key" and "value" columns.
        // The x$N locals are the explicit and default arguments of the cassandraTable call.
        SparkContextFunctions qual$1 = package$.MODULE$.toStreamingContextFunctions(ssc);
        String x$4 = "streaming_test";
        String x$5 = "key_value";
        ClassTag x$6 = ClassTag$.MODULE$.apply(CassandraRow.class);
        RowReaderFactory.GenericRowReader$$ x$7 = RowReaderFactory.GenericRowReader$$.MODULE$;
        ValidRDDType x$8 = ValidRDDType$.MODULE$.javaSerializableAsValidRDDType();
        CassandraConnector x$9 = qual$1.cassandraTable$default$3(x$4, x$5);
        CassandraRDD rdd = qual$1.cassandraTable(x$4, x$5, x$9, x$6, (RowReaderFactory)x$7, x$8).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"key", "value"}));
        // Hand awaitCond the condition "number of collected rows == number of entries in sent",
        // with a 5-second max and the default interval and message.
        this.awaitCond((Function0<Object>)new Serializable(sent, rdd){
            public static final long serialVersionUID = 0L;
            private final Map sent$1;
            private final CassandraRDD rdd$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            // Each evaluation calls collect() on the RDD again.
            public boolean apply$mcZ$sp() {
                return Predef$.MODULE$.refArrayOps((Object[])this.rdd$1.collect()).size() == this.sent$1.size();
            }
            {
                this.sent$1 = sent$1;
                this.rdd$1 = rdd$1;
            }
        }, (Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).seconds(), this.awaitCond$default$3(), this.awaitCond$default$4());
        CassandraRow[] rows = (CassandraRow[])rdd.collect();
        // NOTE(review): the boolean returned by forall is discarded, so this check can never fail
        // the run, and the "Assertions successful" message below is logged regardless of the data.
        // The predicate also looks for a Tuple2 inside a CassandraRow[] via contains, which
        // presumably never matches. Confirm, and decide whether a real assertion is intended.
        sent.forall((Function1)new Serializable(rows){
            public static final long serialVersionUID = 0L;
            private final CassandraRow[] rows$1;

            public final boolean apply(Tuple2<String, Object> x$3) {
                return Predef$.MODULE$.refArrayOps((Object[])this.rows$1).contains(x$3);
            }
            {
                this.rows$1 = rows$1;
            }
        });
        this.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Assertions successful, shutting down."})).s((Seq)Nil$.MODULE$));
        // Presumably stop(stopSparkContext = true, stopGracefully = false); confirm against the
        // StreamingContext API of the Spark version in use.
        ssc.stop(true, false);
        ssc.awaitTermination();
    }

    /**
     * Builds the singleton. Order matters: MODULE$ is published first, then the Assertions and
     * Logging initializers run, then settings and SparkConf are created, then the Cassandra
     * schema is prepared, and only then is {@code topic} assigned.
     */
    private KafkaStreamingDemo$() {
        MODULE$ = this;
        Assertions.class.$init$((Assertions)this);
        Logging.class.$init$((Logging)this);
        this.settings = new SparkCassandraSettings();
        // SparkConf(true) with the Cassandra host, cleaner TTL (as a string), master and app name.
        this.sc = new SparkConf(true).set("spark.cassandra.connection.host", this.settings().CassandraSeed()).set("spark.cleaner.ttl", ((Object)BoxesRunTime.boxToInteger((int)this.settings().SparkCleanerTtl())).toString()).setMaster(this.settings().SparkMaster()).setAppName("Streaming Kafka App");
        // Prepare Cassandra: create the keyspace and table if missing, then empty the table.
        CassandraConnector$.MODULE$.apply(this.sc()).withSessionDo((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Executes the three statements in order and returns the ResultSet of the TRUNCATE.
            public final ResultSet apply(Session session) {
                session.execute(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"})).s((Seq)Nil$.MODULE$));
                session.execute(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"CREATE TABLE IF NOT EXISTS streaming_test.key_value (key VARCHAR PRIMARY KEY, value INT)"})).s((Seq)Nil$.MODULE$));
                return session.execute(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"TRUNCATE streaming_test.key_value"})).s((Seq)Nil$.MODULE$));
            }
        });
        this.topic = "topic1";
    }
}

