/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.examples.sql.cloudant;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.bahir.cloudant.CloudantReceiver;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Function2;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.LongRef;

public final class CloudantStreamingSelector$ {
    public static final CloudantStreamingSelector$ MODULE$;

    static {
        new CloudantStreamingSelector$();
    }

    public void main(String[] args) {
        SparkSession spark = SparkSession$.MODULE$.builder().appName("Cloudant Spark SQL External Datasource in Scala").master("local[*]").getOrCreate();
        StreamingContext ssc = new StreamingContext(spark.sparkContext(), Seconds$.MODULE$.apply(10L));
        AtomicLong curTotalAmount = new AtomicLong(0L);
        AtomicLong curSalesCount = new AtomicLong(0L);
        LongRef batchAmount = LongRef.create((long)0L);
        ReceiverInputDStream changes = ssc.receiverStream((Receiver)new CloudantReceiver(spark.sparkContext().getConf(), (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cloudant.host"), (Object)"examples.cloudant.com"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"database"), (Object)"sales"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"selector"), (Object)"{\"month\":\"May\", \"rep\":\"John\"}")}))), ClassTag$.MODULE$.apply(String.class));
        changes.foreachRDD((Function2)new Serializable(spark, ssc, curTotalAmount, curSalesCount, batchAmount){
            public static final long serialVersionUID = 0L;
            private final SparkSession spark$1;
            private final StreamingContext ssc$1;
            private final AtomicLong curTotalAmount$1;
            private final AtomicLong curSalesCount$1;
            private final LongRef batchAmount$1;

            public final void apply(RDD<String> rdd, Time time) {
                Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"========= ", " ========="})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{time})));
                Dataset changesDataFrame = this.spark$1.read().json(this.spark$1.implicits().rddToDatasetHolder(rdd, this.spark$1.implicits().newStringEncoder()).toDS());
                if (changesDataFrame.schema().nonEmpty()) {
                    changesDataFrame.select("*", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).show();
                    this.batchAmount$1.elem = ((Row[])changesDataFrame.groupBy((Seq)Nil$.MODULE$).sum((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"amount"})).collect())[0].getLong(0);
                    this.curSalesCount$1.getAndAdd(changesDataFrame.count());
                    this.curTotalAmount$1.getAndAdd(this.batchAmount$1.elem);
                    Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Current sales count:").append((Object)this.curSalesCount$1).toString());
                    Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Current total amount:").append((Object)this.curTotalAmount$1).toString());
                } else {
                    this.ssc$1.stop(this.ssc$1.stop$default$1());
                }
            }
            {
                this.spark$1 = spark$1;
                this.ssc$1 = ssc$1;
                this.curTotalAmount$1 = curTotalAmount$1;
                this.curSalesCount$1 = curSalesCount$1;
                this.batchAmount$1 = batchAmount$1;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }

    private CloudantStreamingSelector$() {
        MODULE$ = this;
    }
}

