package org.apache.bahir.sql.streaming.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill$;
import akka.actor.Props$;
import akka.actor.ScalaActorRef;
import akka.actor.package$;
import java.sql.Timestamp;
import java.util.concurrent.CountDownLatch;
import org.apache.bahir.utils.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.LongOffset;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.concurrent.TrieMap;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: AkkaStreamSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-f\u0001B\u0001\u0003\u0001=\u0011\u0001#Q6lCN#(/Z1n'>,(oY3\u000b\u0005\r!\u0011\u0001B1lW\u0006T!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0013)\tQAY1iSJT!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011\u0003\u0002\u0001\u0011-\u0001\u0002\"!\u0005\u000b\u000e\u0003IQ\u0011aE\u0001\u0006g\u000e\fG.Y\u0005\u0003+I\u0011a!\u00118z%\u00164\u0007CA\f\u001f\u001b\u0005A\"BA\u0003\u001a\u0015\tQ2$A\u0005fq\u0016\u001cW\u000f^5p]*\u0011q\u0001\b\u0006\u0003;)\tQa\u001d9be.L!a\b\r\u0003\rM{WO]2f!\t\tC%D\u0001#\u0015\t\u0019\u0003\"A\u0003vi&d7/\u0003\u0002&E\t9Aj\\4hS:<\u0007\u0002C\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\u001dU\u0014Hn\u00144Qk\nd\u0017n\u001d5feB\u0011\u0011\u0006\f\b\u0003#)J!a\u000b\n\u0002\rA\u0013X\rZ3g\u0013\ticF\u0001\u0004TiJLgn\u001a\u0006\u0003WIA\u0001\u0002\r\u0001\u0003\u0002\u0003\u0006I!M\u0001\fa\u0016\u00148/[:uK:\u001cW\r\u0005\u00023k5\t1G\u0003\u00025\u0019\u00059!o\\2lg\u0012\u0014\u0017B\u0001\u001c4\u0005\u001d\u0011vnY6t\t\nC\u0001\u0002\u000f\u0001\u0003\u0002\u0003\u0006I!O\u0001\u000bgFd7i\u001c8uKb$\bC\u0001\u001e<\u001b\u0005Y\u0012B\u0001\u001f\u001c\u0005)\u0019\u0016\u000bT\"p]R,\u0007\u0010\u001e\u0005\t}\u0001\u0011\t\u0011)A\u0005\u007f\u0005iQ.Z:tC\u001e,\u0007+\u0019:tKJ\u0004B!\u0005!)\u0005&\u0011\u0011I\u0005\u0002\n\rVt7\r^5p]F\u0002B!E\")\u000b&\u0011AI\u0005\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0005\u0019SU\"A$\u000b\u0005\u001dA%\"A%\u0002\t)\fg/Y\u0005\u0003\u0017\u001e\u0013\u0011\u0002V5nKN$\u0018-\u001c9\t\u000b5\u0003A\u0011\u0001(\u0002\rqJg.\u001b;?)\u0015y\u0015KU*U!\t\u0001\u0006!D\u0001\u0003\u0011\u00159C\n1\u0001)\u0011\u0015\u0001D\n1\u00012\u0011\u0015AD\n1\u0001:\u0011\u0015qD\n1\u0001@\u0011\u00151\u0006\u0001\"\u0011X\u0003\u0019\u00198\r[3nCV\t\u0001\f\u0005\u0002Z96\t!L\u0003\u0002\\7\u0005)A/\u001f9fg&\u0011QL\u0017\u0002\u000b'R\u0014Xo\u0019;UsB,\u0007bB0\u0001\u0005\u0004%I\u0001Y\u0001\u0006gR|'/Z\u000b\u0002CB\u0011\u0001KY\u0005\u0003G\n\u0011\u0011\u0003T8dC2lUm]:bO\u0016\u001cFo\u001c:f\u0011\u0019)\u0007\u0001)A\u0005C\u000611\u000f^8sK\u0002Bqa\u001a\u0001C\u0002\u0013%\u0001.\u0001\u0005nKN\u001c\u0018mZ3t+\u0005I\u0007\u0003\u00026pc\nk\u0011a\u001b\u0006\u0003Y6\f!bY8oGV\u0014(/\u001a8u\u0015\tq'#\u0001\u0006d_2dWm\u0019;j_:L!\u0001]6\u0003\u000fQ\u0013\u0018.Z'baB\u0011\u0011C]\u0005\u0003gJ\u00111!\u00138u\u0011\u0019)\b\u0001)A\u0005S\u0006IQ.Z:tC\u001e,7\u000f\t\u0005\bo\u0002\u0011\r\u0011\"\u0003y\u0003!Ig.\u001b;M_\u000e\\W#A=\u0011\u0005itX\"A>\u000b\u00051d(BA?I\u0003\u0011)H/\u001b7\n\u0005}\\(AD\"pk:$Hi\\<o\u0019\u0006$8\r\u001b\u0005\b\u0003\u0007\u0001\u0001\u0015!\u0003z\u0003%Ig.\u001b;M_\u000e\\\u0007\u0005C\u0005\u0002\b\u0001\u0001\r\u0011\"\u0003\u0002\n\u00051qN\u001a4tKR,\u0012!\u001d\u0005\n\u0003\u001b\u0001\u0001\u0019!C\u0005\u0003\u001f\t!b\u001c4gg\u0016$x\fJ3r)\u0011\t\t\"a\u0006\u0011\u0007E\t\u0019\"C\u0002\u0002\u0016I\u0011A!\u00168ji\"I\u0011\u0011DA\u0006\u0003\u0003\u0005\r!]\u0001\u0004q\u0012\n\u0004bBA\u000f\u0001\u0001\u0006K!]\u0001\b_\u001a47/\u001a;!\u0011-\t\t\u0003\u0001a\u0001\u0002\u0004%I!a\t\u0002\u0017\u0005\u001cGo\u001c:TsN$X-\\\u000b\u0003\u0003K\u0001B!a\n\u000205\u0011\u0011\u0011\u0006\u0006\u0005\u0003W\ti#A\u0003bGR|'OC\u0001\u0004\u0013\u0011\t\t$!\u000b\u0003\u0017\u0005\u001bGo\u001c:TsN$X-\u001c\u0005\f\u0003k\u0001\u0001\u0019!a\u0001\n\u0013\t9$A\bbGR|'oU=ti\u0016lw\fJ3r)\u0011\t\t\"!\u000f\t\u0015\u0005e\u00111GA\u0001\u0002\u0004\t)\u0003\u0003\u0005\u0002>\u0001\u0001\u000b\u0015BA\u0013\u00031\t7\r^8s'f\u001cH/Z7!\u0011-\t\t\u0005\u0001a\u0001\u0002\u0004%I!a\u0011\u0002\u001f\u0005\u001cGo\u001c:TkB,'O^5t_J,\"!!\u0012\u0011\t\u0005\u001d\u0012qI\u0005\u0005\u0003\u0013\nIC\u0001\u0005BGR|'OU3g\u0011-\ti\u0005\u0001a\u0001\u0002\u0004%I!a\u0014\u0002'\u0005\u001cGo\u001c:TkB,'O^5t_J|F%Z9\u0015\t\u0005E\u0011\u0011\u000b\u0005\u000b\u00033\tY%!AA\u0002\u0005\u0015\u0003\u0002CA+\u0001\u0001\u0006K!!\u0012\u0002!\u0005\u001cGo\u001c:TkB,'O^5t_J\u0004\u0003bBA-\u0001\u0011%\u00111L\u0001\u0019M\u0016$8\r\u001b'bgR\u0004&o\\2fgN,Gm\u00144gg\u0016$H#A9\t\u000f\u0005}\u0003\u0001\"\u0003\u0002b\u0005Q\u0011N\\5uS\u0006d\u0017N_3\u0015\u0005\u0005E\u0001bBA3\u0001\u0011\u0005\u0013\u0011M\u0001\u0005gR|\u0007\u000fC\u0004\u0002j\u0001!\t%a\u001b\u0002\u0013\u001d,Go\u00144gg\u0016$XCAA7!\u0015\t\u0012qNA:\u0013\r\t\tH\u0005\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0007]\t)(C\u0002\u0002xa\u0011aa\u00144gg\u0016$\bbBA>\u0001\u0011\u0005\u0013QP\u0001\tO\u0016$()\u0019;dQR1\u0011qPAR\u0003O\u0003B!!!\u0002\u001e:!\u00111QAM\u001d\u0011\t))a&\u000f\t\u0005\u001d\u0015Q\u0013\b\u0005\u0003\u0013\u000b\u0019J\u0004\u0003\u0002\f\u0006EUBAAG\u0015\r\tyID\u0001\u0007yI|w\u000e\u001e \n\u00035I!a\u0003\u0007\n\u0005uQ\u0011BA\u0004\u001d\u0013\r\tYjG\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\ty*!)\u0003\u0013\u0011\u000bG/\u0019$sC6,'bAAN7!A\u0011QUA=\u0001\u0004\ti'A\u0003ti\u0006\u0014H\u000f\u0003\u0005\u0002*\u0006e\u0004\u0019AA:\u0003\r)g\u000e\u001a")
/* loaded from: input_file:org/apache/bahir/sql/streaming/akka/AkkaStreamSource.class */
public class AkkaStreamSource implements Source, Logging {
    public final String org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$urlOfPublisher;
    private final SQLContext sqlContext;
    public final Function1<String, Tuple2<String, Timestamp>> org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$messageParser;
    private final LocalMessageStore org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$store;
    private final TrieMap<Object, Tuple2<String, Timestamp>> org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$messages;
    private final CountDownLatch org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$initLock;
    private int org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset;
    private ActorSystem actorSystem;
    private ActorRef actorSupervisor;
    private final Logger log;

    @Override // org.apache.bahir.utils.Logging
    public final Logger log() {
        return this.log;
    }

    @Override // org.apache.bahir.utils.Logging
    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger logger) {
        this.log = logger;
    }

    public void commit(Offset offset) {
        Source.class.commit(this, offset);
    }

    public StructType schema() {
        return AkkaStreamConstants$.MODULE$.SCHEMA_DEFAULT();
    }

    public LocalMessageStore org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$store() {
        return this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$store;
    }

    public TrieMap<Object, Tuple2<String, Timestamp>> org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$messages() {
        return this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$messages;
    }

    public CountDownLatch org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$initLock() {
        return this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$initLock;
    }

    public int org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset() {
        return this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset;
    }

    public void org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset_$eq(int i) {
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset = i;
    }

    private ActorSystem actorSystem() {
        return this.actorSystem;
    }

    private void actorSystem_$eq(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    private ActorRef actorSupervisor() {
        return this.actorSupervisor;
    }

    private void actorSupervisor_$eq(ActorRef actorRef) {
        this.actorSupervisor = actorRef;
    }

    private int fetchLastProcessedOffset() {
        int i;
        Success apply = Try$.MODULE$.apply(new AkkaStreamSource$$anonfun$1(this));
        if (apply instanceof Success) {
            int unboxToInt = BoxesRunTime.unboxToInt(apply.value());
            log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Recovering from last stored offset ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt)})));
            i = unboxToInt;
        } else {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            i = 0;
        }
        return i;
    }

    private void initialize() {
        actorSystem_$eq((ActorSystem) AkkaStreamConstants$.MODULE$.defaultActorSystemCreator().apply());
        actorSupervisor_$eq(actorSystem().actorOf(Props$.MODULE$.apply(new AkkaStreamSource$$anonfun$initialize$1(this), ClassTag$.MODULE$.apply(AkkaStreamSource$Supervisor$1.class)), "Supervisor"));
        org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset_$eq(fetchLastProcessedOffset());
        org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$initLock().countDown();
    }

    public void stop() {
        ScalaActorRef actorRef2Scala = package$.MODULE$.actorRef2Scala(actorSupervisor());
        PoisonPill$ poisonPill$ = PoisonPill$.MODULE$;
        actorRef2Scala.$bang(poisonPill$, actorRef2Scala.$bang$default$2(poisonPill$));
        Persistence$.MODULE$.close();
        actorSystem().shutdown();
        actorSystem().awaitTermination();
    }

    public Option<Offset> getOffset() {
        return org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset() == 0 ? None$.MODULE$ : new Some(new LongOffset(org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset()));
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        int offset2 = (int) ((LongOffset) option.getOrElse(new AkkaStreamSource$$anonfun$5(this))).offset();
        int offset3 = (int) ((LongOffset) offset).offset();
        ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(offset2 + 1), offset3).foreach(new AkkaStreamSource$$anonfun$getBatch$1(this, empty));
        log().trace(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Get Batch invoked, ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{empty.mkString()})));
        return this.sqlContext.implicits().localSeqToDatasetHolder(empty, this.sqlContext.implicits().newProductEncoder(scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(AkkaStreamSource.class.getClassLoader()), new TypeCreator(this) { // from class: org.apache.bahir.sql.streaming.akka.AkkaStreamSource$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), mirror.staticClass("java.sql.Timestamp").asType().toTypeConstructor()})));
            }
        }))).toDF(Predef$.MODULE$.wrapRefArray(new String[]{"value", "timestamp"}));
    }

    public AkkaStreamSource(String str, RocksDB rocksDB, SQLContext sQLContext, Function1<String, Tuple2<String, Timestamp>> function1) {
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$urlOfPublisher = str;
        this.sqlContext = sQLContext;
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$messageParser = function1;
        Source.class.$init$(this);
        org$apache$bahir$utils$Logging$_setter_$log_$eq(LoggerFactory.getLogger(new StringOps(Predef$.MODULE$.augmentString(getClass().getName())).stripSuffix("$")));
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$store = new LocalMessageStore(rocksDB, sQLContext.sparkContext().getConf());
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$messages = new TrieMap<>();
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$initLock = new CountDownLatch(1);
        this.org$apache$bahir$sql$streaming$akka$AkkaStreamSource$$offset = 0;
        initialize();
    }
}
