package com.datastax.spark.connector.demo.streaming;

import com.datastax.spark.connector.CassandraRow;
import com.datastax.spark.connector.RowsInBatch;
import com.datastax.spark.connector.SomeColumns;
import com.datastax.spark.connector.SparkContextFunctions;
import com.datastax.spark.connector.cql.CassandraConnector$;
import com.datastax.spark.connector.embedded.Assertions;
import com.datastax.spark.connector.embedded.EmbeddedKafka;
import com.datastax.spark.connector.mapper.ColumnMapper$;
import com.datastax.spark.connector.rdd.CassandraRDD;
import com.datastax.spark.connector.rdd.ValidRDDType$;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory$GenericRowReader$$;
import com.datastax.spark.connector.streaming.DStreamFunctions;
import com.datastax.spark.connector.streaming.package$;
import com.datastax.spark.connector.util.Logging;
import com.datastax.spark.connector.writer.RowWriterFactory$;
import com.datastax.spark.connector.writer.WriteConf;
import com.datastax.spark.connector.writer.WriteConf$;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.StreamingContext$;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaStreamingDemo.scala */
/* loaded from: input_file:com/datastax/spark/connector/demo/streaming/KafkaStreamingDemo$.class */
/**
 * Decompiled singleton holder for the Kafka streaming demo.
 *
 * <p>The single instance is created by the static initializer and published through
 * {@link #MODULE$} from inside the private constructor. The class mixes in the
 * {@code Assertions} and {@code Logging} traits; most methods below are forwarders that
 * pass {@code this} to the corresponding trait implementation.
 *
 * <p>{@link #main(String[])} publishes a small word-count map to a Kafka topic, consumes it
 * through a Spark streaming context, writes the reduced pairs to the Cassandra table
 * {@code streaming_test.key_value}, and then reads the table back.
 *
 * <p>NOTE(review): this is decompiler output, not hand-written Java; several constructs
 * (e.g. {@code ?? r0}, {@code Logging.class.logName(this)}, assigning the {@code final}
 * {@code MODULE$} field in the constructor) are decompilation artifacts.
 */
public final class KafkaStreamingDemo$ implements Assertions, Logging {
    /** The singleton instance; assigned in the private constructor. */
    public static final KafkaStreamingDemo$ MODULE$ = null;
    /** Settings object from which the Cassandra seed, cleaner TTL and Spark master are read. */
    private final SparkCassandraSettings settings;
    /** Spark configuration built in the constructor (a {@code SparkConf}, despite the name). */
    private final SparkConf sc;
    /** Name of the Kafka topic used by the demo; set to {@code "topic1"} in the constructor. */
    private final String topic;
    /** Lazily created embedded Kafka instance; valid once {@link #bitmap$0} is true. */
    private EmbeddedKafka kafka;
    /** Backing field for the Logging trait's logger accessor pair. */
    private transient Logger com$datastax$spark$connector$util$Logging$$log_;
    /** Backing field for the Assertions trait's {@code end} accessor pair. */
    private Duration com$datastax$spark$connector$embedded$Assertions$$end;
    /** Set to true once {@link #kafka} has been initialized. */
    private volatile boolean bitmap$0;

    // Instantiating the class runs the private constructor, which publishes MODULE$.
    static {
        new KafkaStreamingDemo$();
    }

    /**
     * Initializes {@link #kafka} while holding this object's monitor, unless the
     * initialized flag is already set, and returns the field.
     *
     * @return the embedded Kafka instance
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private EmbeddedKafka kafka$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            // The flag is re-checked under the lock so the instance is created at most once.
            if (!this.bitmap$0) {
                this.kafka = new EmbeddedKafka();
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.kafka;
        }
    }

    /** Returns the logger stored for the Logging trait. */
    public Logger com$datastax$spark$connector$util$Logging$$log_() {
        return this.com$datastax$spark$connector$util$Logging$$log_;
    }

    /** Stores the logger used by the Logging trait. */
    public void com$datastax$spark$connector$util$Logging$$log__$eq(Logger logger) {
        this.com$datastax$spark$connector$util$Logging$$log_ = logger;
    }

    // ---- Logging trait forwarders: each passes this instance (and its arguments) to Logging ----

    /** Forwards to the Logging trait's {@code logName}. */
    public String logName() {
        return Logging.class.logName(this);
    }

    /** Forwards to the Logging trait's {@code log}. */
    public Logger log() {
        return Logging.class.log(this);
    }

    /** Forwards the message supplier to the Logging trait's {@code logInfo}. */
    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    /** Forwards the message supplier to the Logging trait's {@code logDebug}. */
    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    /** Forwards the message supplier to the Logging trait's {@code logTrace}. */
    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    /** Forwards the message supplier to the Logging trait's {@code logWarning}. */
    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    /** Forwards the message supplier to the Logging trait's {@code logError}. */
    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    /** Forwards the message supplier and throwable to the Logging trait's {@code logInfo}. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    /** Forwards the message supplier and throwable to the Logging trait's {@code logDebug}. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    /** Forwards the message supplier and throwable to the Logging trait's {@code logTrace}. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    /** Forwards the message supplier and throwable to the Logging trait's {@code logWarning}. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    /** Forwards the message supplier and throwable to the Logging trait's {@code logError}. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    /** Forwards to the Logging trait's {@code isTraceEnabled}. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /** Forwards to the Logging trait's {@code getSparkClassLoader}. */
    public ClassLoader getSparkClassLoader() {
        return Logging.class.getSparkClassLoader(this);
    }

    // ---- Assertions trait state and forwarders ----

    /** Returns the {@code end} duration stored for the Assertions trait. */
    public Duration com$datastax$spark$connector$embedded$Assertions$$end() {
        return this.com$datastax$spark$connector$embedded$Assertions$$end;
    }

    /** Stores the {@code end} duration used by the Assertions trait. */
    public void com$datastax$spark$connector$embedded$Assertions$$end_$eq(Duration duration) {
        this.com$datastax$spark$connector$embedded$Assertions$$end = duration;
    }

    /** Forwards to the Assertions trait's {@code now}. */
    public FiniteDuration now() {
        return Assertions.class.now(this);
    }

    /** Forwards to the Assertions trait's {@code remainingOrDefault}. */
    public FiniteDuration remainingOrDefault() {
        return Assertions.class.remainingOrDefault(this);
    }

    /** Forwards the given duration to the Assertions trait's {@code remainingOr}. */
    public FiniteDuration remainingOr(FiniteDuration finiteDuration) {
        return Assertions.class.remainingOr(this, finiteDuration);
    }

    /**
     * Forwards to the Assertions trait's {@code awaitCond}.
     *
     * @param function0 condition supplier
     * @param duration first duration argument (main passes 5 seconds here)
     * @param duration2 second duration argument (main passes {@link #awaitCond$default$3()})
     * @param str string argument (main passes {@link #awaitCond$default$4()})
     */
    public void awaitCond(Function0<Object> function0, Duration duration, Duration duration2, String str) {
        Assertions.class.awaitCond(this, function0, duration, duration2, str);
    }

    /** Returns the trait-supplied default for {@code awaitCond}'s second parameter. */
    public Duration awaitCond$default$2() {
        return Assertions.class.awaitCond$default$2(this);
    }

    /** Returns the trait-supplied default for {@code awaitCond}'s third parameter. */
    public Duration awaitCond$default$3() {
        return Assertions.class.awaitCond$default$3(this);
    }

    /** Returns the trait-supplied default for {@code awaitCond}'s fourth parameter. */
    public String awaitCond$default$4() {
        return Assertions.class.awaitCond$default$4(this);
    }

    /** Returns the settings object created in the constructor. */
    public SparkCassandraSettings settings() {
        return this.settings;
    }

    /** Returns the Spark configuration built in the constructor. */
    public SparkConf sc() {
        return this.sc;
    }

    /** Returns the Kafka topic name used by the demo. */
    private String topic() {
        return this.topic;
    }

    /**
     * Returns the embedded Kafka instance, creating it on first use.
     *
     * @return the cached instance when the initialized flag is set, otherwise the result of
     *     {@link #kafka$lzycompute()}
     */
    public EmbeddedKafka kafka() {
        return this.bitmap$0 ? this.kafka : kafka$lzycompute();
    }

    /**
     * Runs the demo end to end: publish to Kafka, stream into Cassandra, read back, shut down.
     *
     * <p>This is an instance method on the singleton; NOTE(review): the static {@code main}
     * entry point presumably lives in a companion forwarder class not shown here.
     *
     * @param strArr command-line arguments; not read by this method
     */
    public void main(String[] strArr) {
        // Streaming context over the shared SparkConf, with Seconds(2) as the batch duration argument.
        StreamingContext streamingContext = new StreamingContext(sc(), Seconds$.MODULE$.apply(2L));
        // Callback run when the Spark actor system terminates; its body is in a class not shown here.
        SparkEnv$.MODULE$.get().actorSystem().registerOnTermination(new KafkaStreamingDemo$$anonfun$main$1());
        // Sample data: "a" -> 5, "b" -> 3, "c" -> 10.
        Map apply = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("a"), BoxesRunTime.boxToInteger(5)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("b"), BoxesRunTime.boxToInteger(3)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("c"), BoxesRunTime.boxToInteger(10))}));
        kafka().createTopic(topic());
        kafka().produceAndSendMessage(topic(), apply);
        // Kafka stream for (topic -> 1) with MEMORY_ONLY storage, mapped twice (to String, then to
        // Tuple2) and reduced by key. The mapping/reducing functions are anonymous classes not
        // shown here; NOTE(review): presumably a word count — confirm against the Scala source.
        DStreamFunctions dStreamFunctions = package$.MODULE$.toDStreamFunctions(StreamingContext$.MODULE$.toPairDStreamFunctions(KafkaUtils$.MODULE$.createStream(streamingContext, kafka().kafkaParams(), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), StorageLevel$.MODULE$.MEMORY_ONLY(), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class)).map(new KafkaStreamingDemo$$anonfun$3(), ClassTag$.MODULE$.apply(String.class)).map(new KafkaStreamingDemo$$anonfun$4(), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Int(), Ordering$String$.MODULE$).reduceByKey(new KafkaStreamingDemo$$anonfun$1()), ClassTag$.MODULE$.apply(Tuple2.class));
        // Write the reduced pairs to streaming_test.key_value (columns "key", "value") using a
        // WriteConf built with RowsInBatch(1) and the defaults for its other two arguments.
        SomeColumns someColumns = new SomeColumns(Predef$.MODULE$.wrapRefArray(new String[]{"key", "value"}));
        WriteConf writeConf = new WriteConf(new RowsInBatch(1), WriteConf$.MODULE$.apply$default$2(), WriteConf$.MODULE$.apply$default$3());
        dStreamFunctions.saveToCassandra("streaming_test", "key_value", someColumns, writeConf, dStreamFunctions.saveToCassandra$default$5("streaming_test", "key_value", someColumns, writeConf), RowWriterFactory$.MODULE$.defaultRowWriterFactory(ColumnMapper$.MODULE$.tuple2ColumnMapper()));
        streamingContext.start();
        // Read the same table back, selecting the "key" and "value" columns.
        SparkContextFunctions streamingContextFunctions = package$.MODULE$.toStreamingContextFunctions(streamingContext);
        CassandraRDD select = streamingContextFunctions.cassandraTable("streaming_test", "key_value", streamingContextFunctions.cassandraTable$default$3("streaming_test", "key_value"), ClassTag$.MODULE$.apply(CassandraRow.class), RowReaderFactory$GenericRowReader$$.MODULE$, ValidRDDType$.MODULE$.javaSerializableAsValidRDDType()).select(Predef$.MODULE$.wrapRefArray(new String[]{"key", "value"}));
        // Wait on a condition built from the sample map and the table RDD, passing 5 seconds as the
        // first duration argument; the condition body is in an anonymous class not shown here.
        awaitCond(new KafkaStreamingDemo$$anonfun$main$2(apply, select), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).seconds(), awaitCond$default$3(), awaitCond$default$4());
        // NOTE(review): the result of forall is discarded, so a mismatch between the sample map
        // and the collected rows does not prevent the "successful" log line below.
        apply.forall(new KafkaStreamingDemo$$anonfun$main$3((CassandraRow[]) select.collect()));
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Assertions successful, shutting down."})).s(Nil$.MODULE$));
        // NOTE(review): presumably stop(stopSparkContext = true, stopGracefully = false) — confirm
        // against the StreamingContext API for the Spark version in use.
        streamingContext.stop(true, false);
        streamingContext.awaitTermination();
    }

    /**
     * Builds the singleton: publishes {@link #MODULE$}, runs the trait initializers, creates
     * the settings and Spark configuration, runs a session callback against Cassandra, and
     * sets the topic name.
     */
    private KafkaStreamingDemo$() {
        MODULE$ = this;
        Assertions.class.$init$(this);
        Logging.class.$init$(this);
        this.settings = new SparkCassandraSettings();
        // SparkConf carrying the Cassandra host, cleaner TTL, master URL and application name.
        this.sc = new SparkConf(true).set("spark.cassandra.connection.host", settings().CassandraSeed()).set("spark.cleaner.ttl", BoxesRunTime.boxToInteger(settings().SparkCleanerTtl()).toString()).setMaster(settings().SparkMaster()).setAppName("Streaming Kafka App");
        // Session callback defined in an anonymous class not shown here;
        // NOTE(review): presumably creates the streaming_test keyspace/table — confirm.
        CassandraConnector$.MODULE$.apply(sc()).withSessionDo(new KafkaStreamingDemo$$anonfun$2());
        this.topic = "topic1";
    }
}
