/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bahir.sql.streaming.mqtt;

import java.util.List;
import java.util.Optional;
import javax.annotation.concurrent.GuardedBy;
import org.apache.bahir.sql.streaming.mqtt.LocalMessageStore;
import org.apache.bahir.sql.streaming.mqtt.LongOffset;
import org.apache.bahir.sql.streaming.mqtt.LongOffset$;
import org.apache.bahir.sql.streaming.mqtt.MQTTMessage;
import org.apache.bahir.sql.streaming.mqtt.MQTTStreamConstants$;
import org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource$$anonfun$createDataReaderFactories$2$;
import org.apache.bahir.utils.Logging;
import org.apache.bahir.utils.Logging$class;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.concurrent.TrieMap;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;
import scala.sys.package$;

@ScalaSignature(bytes="\u0006\u0001\t=a\u0001B\u0001\u0003\u0001=\u0011\u0001#T)U)N#(/Z1n'>,(oY3\u000b\u0005\r!\u0011\u0001B7riRT!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0013)\tQAY1iSJT!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011\u0003\u0002\u0001\u00111\u0019\u0002\"!\u0005\f\u000e\u0003IQ!a\u0005\u000b\u0002\t1\fgn\u001a\u0006\u0002+\u0005!!.\u0019<b\u0013\t9\"C\u0001\u0004PE*,7\r\u001e\t\u00033\u0011j\u0011A\u0007\u0006\u0003\u000bmQ!\u0001H\u000f\u0002\rI,\u0017\rZ3s\u0015\tqr$\u0001\u0002we)\u0011\u0001%I\u0001\bg>,(oY3t\u0015\t9!E\u0003\u0002$\u0015\u0005)1\u000f]1sW&\u0011QE\u0007\u0002\u0011\u001b&\u001c'o\u001c\"bi\u000eD'+Z1eKJ\u0004\"a\n\u0016\u000e\u0003!R!!\u000b\u0005\u0002\u000bU$\u0018\u000e\\:\n\u0005-B#a\u0002'pO\u001eLgn\u001a\u0005\t[\u0001\u0011\t\u0011)A\u0005]\u00059q\u000e\u001d;j_:\u001c\bCA\u00181\u001b\u0005i\u0012BA\u0019\u001e\u0005E!\u0015\r^1T_V\u00148-Z(qi&|gn\u001d\u0005\tg\u0001\u0011\t\u0011)A\u0005i\u0005I!M]8lKJ,&\u000f\u001c\t\u0003kmr!AN\u001d\u000e\u0003]R\u0011\u0001O\u0001\u0006g\u000e\fG.Y\u0005\u0003u]\na\u0001\u0015:fI\u00164\u0017B\u0001\u001f>\u0005\u0019\u0019FO]5oO*\u0011!h\u000e\u0005\t\u007f\u0001\u0011\t\u0011)A\u0005\u0001\u0006Y\u0001/\u001a:tSN$XM\\2f!\t\t%*D\u0001C\u0015\t\u0019E)\u0001\u0004ncR$ho\r\u0006\u0003\u000b\u001a\u000baa\u00197jK:$(BA$I\u0003\u0011\u0001\u0018\r[8\u000b\u0005%c\u0011aB3dY&\u00048/Z\u0005\u0003\u0017\n\u0013Q#T9ui\u000ec\u0017.\u001a8u!\u0016\u00148/[:uK:\u001cW\r\u0003\u0005N\u0001\t\u0005\t\u0015!\u00035\u0003\u0015!x\u000e]5d\u0011!y\u0005A!A!\u0002\u0013!\u0014\u0001C2mS\u0016tG/\u00133\t\u0011E\u0003!\u0011!Q\u0001\nI\u000b!#\\9ui\u000e{gN\\3di>\u0003H/[8ogB\u0011\u0011iU\u0005\u0003)\n\u0013!#T9ui\u000e{gN\\3di>\u0003H/[8og\"Aa\u000b\u0001B\u0001B\u0003%q+A\u0002r_N\u0004\"A\u000e-\n\u0005e;$aA%oi\")1\f\u0001C\u00019\u00061A(\u001b8jiz\"\u0002\"X0aC\n\u001cG-\u001a\t\u0003=\u0002i\u0011A\u0001\u0005\u0006[i\u0003\rA\f\u0005\u0006gi\u0003\r\u0001\u000e\u0005\u0006\u007fi\u0003\r\u0001\u0011\u0005\u0006\u001bj\u0003\r\u0001\u000e\u0005\u0006\u001fj\u0003\r\u0001\u000e\u0005\u0006#j\u0003\rA\u0015\u0005\u0006-j\u0003\ra\u0016\u0005\nO\u0002\u0001\r\u00111A\u0005\n!\f1b\u001d;beR|eMZ:fiV\t\u0011\u000e\u0005\u0002\u001aU&\u00111N\u0007\u0002\u0007\u001f\u001a47/\u001a;\t\u00135\u0004\u0001\u0019!a\u0001\n\u0013q\u0017aD:uCJ$xJ\u001a4tKR|F%Z9\u0015\u0005=\u0014\bC\u0001\u001cq\u0013\t\txG\u0001\u0003V]&$\bbB:m\u0003\u0003\u0005\r![\u0001\u0004q\u0012\n\u0004BB;\u0001A\u0003&\u0011.\u0001\u0007ti\u0006\u0014Ho\u00144gg\u0016$\b\u0005C\u0005x\u0001\u0001\u0007\t\u0019!C\u0005Q\u0006IQM\u001c3PM\u001a\u001cX\r\u001e\u0005\ns\u0002\u0001\r\u00111A\u0005\ni\fQ\"\u001a8e\u001f\u001a47/\u001a;`I\u0015\fHCA8|\u0011\u001d\u0019\b0!AA\u0002%Da! \u0001!B\u0013I\u0017AC3oI>3gm]3uA!Aq\u0010\u0001b\u0001\n\u0003\t\t!A\u0004cC\u000e\\Gj\\4\u0016\u0003]Cq!!\u0002\u0001A\u0003%q+\u0001\u0005cC\u000e\\Gj\\4!\u0011%\tI\u0001\u0001b\u0001\n\u0013\tY!A\u0003ti>\u0014X-\u0006\u0002\u0002\u000eA\u0019a,a\u0004\n\u0007\u0005E!AA\tM_\u000e\fG.T3tg\u0006<Wm\u0015;pe\u0016D\u0001\"!\u0006\u0001A\u0003%\u0011QB\u0001\u0007gR|'/\u001a\u0011\t\u0013\u0005e\u0001A1A\u0005\n\u0005m\u0011\u0001C7fgN\fw-Z:\u0016\u0005\u0005u\u0001\u0003CA\u0010\u0003S\ti#a\r\u000e\u0005\u0005\u0005\"\u0002BA\u0012\u0003K\t!bY8oGV\u0014(/\u001a8u\u0015\r\t9cN\u0001\u000bG>dG.Z2uS>t\u0017\u0002BA\u0016\u0003C\u0011q\u0001\u0016:jK6\u000b\u0007\u000fE\u00027\u0003_I1!!\r8\u0005\u0011auN\\4\u0011\u0007y\u000b)$C\u0002\u00028\t\u00111\"T)U)6+7o]1hK\"A\u00111\b\u0001!\u0002\u0013\ti\"A\u0005nKN\u001c\u0018mZ3tA!I\u0011q\b\u0001A\u0002\u0013%\u0011\u0011I\u0001\u000eGV\u0014(/\u001a8u\u001f\u001a47/\u001a;\u0016\u0005\u0005\r\u0003c\u00010\u0002F%\u0019\u0011q\t\u0002\u0003\u00151{gnZ(gMN,G\u000fC\u0005\u0002L\u0001\u0001\r\u0011\"\u0003\u0002N\u0005\t2-\u001e:sK:$xJ\u001a4tKR|F%Z9\u0015\u0007=\fy\u0005C\u0005t\u0003\u0013\n\t\u00111\u0001\u0002D!A\u00111\u000b\u0001!B\u0013\t\u0019%\u0001\bdkJ\u0014XM\u001c;PM\u001a\u001cX\r\u001e\u0011)\u0011\u0005E\u0013qKA5\u0003W\u0002B!!\u0017\u0002f5\u0011\u00111\f\u0006\u0005\u0003G\tiF\u0003\u0003\u0002`\u0005\u0005\u0014AC1o]>$\u0018\r^5p]*\u0011\u00111M\u0001\u0006U\u00064\u0018\r_\u0005\u0005\u0003O\nYFA\u0005Hk\u0006\u0014H-\u001a3Cs\u0006)a/\u00197vK\u0006\u0012\u0011QN\u0001\u0005i\"L7\u000fC\u0005\u0002r\u0001\u0001\r\u0011\"\u0003\u0002B\u0005\u0019B.Y:u\u001f\u001a47/\u001a;D_6l\u0017\u000e\u001e;fI\"I\u0011Q\u000f\u0001A\u0002\u0013%\u0011qO\u0001\u0018Y\u0006\u001cHo\u00144gg\u0016$8i\\7nSR$X\rZ0%KF$2a\\A=\u0011%\u0019\u00181OA\u0001\u0002\u0004\t\u0019\u0005\u0003\u0005\u0002~\u0001\u0001\u000b\u0015BA\"\u0003Qa\u0017m\u001d;PM\u001a\u001cX\r^\"p[6LG\u000f^3eA!B\u00111PA,\u0003S\nY\u0007\u0003\u0006F\u0001\u0001\u0007\t\u0019!C\u0005\u0003\u0007+\"!!\"\u0011\u0007\u0005\u000b9)C\u0002\u0002\n\n\u0013!\"T9ui\u000ec\u0017.\u001a8u\u0011-\ti\t\u0001a\u0001\u0002\u0004%I!a$\u0002\u0015\rd\u0017.\u001a8u?\u0012*\u0017\u000fF\u0002p\u0003#C\u0011b]AF\u0003\u0003\u0005\r!!\"\t\u0011\u0005U\u0005\u0001)Q\u0005\u0003\u000b\u000bqa\u00197jK:$\b\u0005\u0003\u0005\u0002\u001a\u0002!\tAAA!\u0003A9W\r^\"veJ,g\u000e^(gMN,G\u000fC\u0004\u0002\u001e\u0002!I!a(\u0002\u0015%t\u0017\u000e^5bY&TX\rF\u0001p\u0011\u001d\t\u0019\u000b\u0001C!\u0003K\u000bab]3u\u001f\u001a47/\u001a;SC:<W\rF\u0003p\u0003O\u000b9\f\u0003\u0005\u0002*\u0006\u0005\u0006\u0019AAV\u0003\u0015\u0019H/\u0019:u!\u0015\ti+a-j\u001b\t\tyKC\u0002\u00022R\tA!\u001e;jY&!\u0011QWAX\u0005!y\u0005\u000f^5p]\u0006d\u0007\u0002CA]\u0003C\u0003\r!a+\u0002\u0007\u0015tG\rC\u0004\u0002>\u0002!\t%a0\u0002\u001d\u001d,Go\u0015;beR|eMZ:fiR\t\u0011\u000eC\u0004\u0002D\u0002!\t%a0\u0002\u0019\u001d,G/\u00128e\u001f\u001a47/\u001a;\t\u000f\u0005\u001d\u0007\u0001\"\u0011\u0002J\u0006\tB-Z:fe&\fG.\u001b>f\u001f\u001a47/\u001a;\u0015\u0007%\fY\rC\u0004\u0002N\u0006\u0015\u0007\u0019\u0001\u001b\u0002\t)\u001cxN\u001c\u0005\b\u0003#\u0004A\u0011IAj\u0003)\u0011X-\u00193TG\",W.\u0019\u000b\u0003\u0003+\u0004B!a6\u0002^6\u0011\u0011\u0011\u001c\u0006\u0004\u00037\f\u0013!\u0002;za\u0016\u001c\u0018\u0002BAp\u00033\u0014!b\u0015;sk\u000e$H+\u001f9f\u0011\u001d\t\u0019\u000f\u0001C!\u0003K\f\u0011d\u0019:fCR,G)\u0019;b%\u0016\fG-\u001a:GC\u000e$xN]5fgR\u0011\u0011q\u001d\t\u0007\u0003[\u000bI/!<\n\t\u0005-\u0018q\u0016\u0002\u0005\u0019&\u001cH\u000f\u0005\u0004\u0002p\u0006E\u0018Q_\u0007\u00027%\u0019\u00111_\u000e\u0003#\u0011\u000bG/\u0019*fC\u0012,'OR1di>\u0014\u0018\u0010\u0005\u0003\u0002x\u0006eX\"A\u0011\n\u0007\u0005m\u0018EA\u0002S_^Dq!a@\u0001\t\u0003\u0012\t!\u0001\u0004d_6l\u0017\u000e\u001e\u000b\u0004_\n\r\u0001bBA]\u0003{\u0004\r!\u001b\u0005\b\u0005\u000f\u0001A\u0011IAP\u0003\u0011\u0019Ho\u001c9\t\u000f\t-\u0001\u0001\"\u0011\u0003\u000e\u0005AAo\\*ue&tw\rF\u00015\u0001")
public class MQTTStreamSource
implements MicroBatchReader,
Logging {
    private final String brokerUrl;
    private final MqttClientPersistence persistence;
    private final String topic;
    private final String clientId;
    private final MqttConnectOptions mqttConnectOptions;
    private final int qos;
    private Offset startOffset;
    private Offset endOffset;
    private final int backLog;
    private final LocalMessageStore store;
    private final TrieMap<Object, MQTTMessage> org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages;
    @GuardedBy(value="this")
    private LongOffset org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset;
    @GuardedBy(value="this")
    private LongOffset lastOffsetCommitted;
    private MqttClient client;
    private final Logger log;

    @Override
    public final Logger log() {
        return this.log;
    }

    @Override
    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger x$1) {
        this.log = x$1;
    }

    private Offset startOffset() {
        return this.startOffset;
    }

    private void startOffset_$eq(Offset x$1) {
        this.startOffset = x$1;
    }

    private Offset endOffset() {
        return this.endOffset;
    }

    private void endOffset_$eq(Offset x$1) {
        this.endOffset = x$1;
    }

    public int backLog() {
        return this.backLog;
    }

    private LocalMessageStore store() {
        return this.store;
    }

    public TrieMap<Object, MQTTMessage> org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages() {
        return this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages;
    }

    public LongOffset org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset() {
        return this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset;
    }

    public void org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset_$eq(LongOffset x$1) {
        this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset = x$1;
    }

    private LongOffset lastOffsetCommitted() {
        return this.lastOffsetCommitted;
    }

    private void lastOffsetCommitted_$eq(LongOffset x$1) {
        this.lastOffsetCommitted = x$1;
    }

    private MqttClient client() {
        return this.client;
    }

    private void client_$eq(MqttClient x$1) {
        this.client = x$1;
    }

    public LongOffset getCurrentOffset() {
        return this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset();
    }

    private void initialize() {
        this.client_$eq(new MqttClient(this.brokerUrl, this.clientId, this.persistence));
        MqttCallbackExtended callback = new MqttCallbackExtended(this){
            private final /* synthetic */ MQTTStreamSource $outer;

            public synchronized void messageArrived(String topic_, MqttMessage message) {
                MQTTMessage mqttMessage = new MQTTMessage(message, topic_);
                long offset = this.$outer.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset().offset() + 1L;
                this.$outer.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages().put((Object)BoxesRunTime.boxToLong((long)offset), (Object)mqttMessage);
                this.$outer.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset_$eq(new LongOffset(offset));
                this.$outer.log().trace(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Message arrived, ", " ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic_, mqttMessage})));
            }

            public void deliveryComplete(IMqttDeliveryToken token) {
            }

            public void connectionLost(Throwable cause) {
                this.$outer.log().warn("Connection to mqtt server lost.", cause);
            }

            public void connectComplete(boolean reconnect, String serverURI) {
                this.$outer.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Connect complete ", ". Is it a reconnect?: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{serverURI, BoxesRunTime.boxToBoolean((boolean)reconnect)})));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        };
        this.client().setCallback((MqttCallback)callback);
        this.client().connect(this.mqttConnectOptions);
        this.client().subscribe(this.topic, this.qos);
    }

    public synchronized void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
        this.startOffset_$eq(start.orElse(new LongOffset(-1L)));
        this.endOffset_$eq(end.orElse(this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset()));
    }

    public Offset getStartOffset() {
        return (Offset)Option$.MODULE$.apply((Object)this.startOffset()).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply() {
                throw new IllegalStateException("start offset not set");
            }
        });
    }

    public Offset getEndOffset() {
        return (Offset)Option$.MODULE$.apply((Object)this.endOffset()).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply() {
                throw new IllegalStateException("end offset not set");
            }
        });
    }

    public Offset deserializeOffset(String json) {
        return new LongOffset(new StringOps(Predef$.MODULE$.augmentString(json)).toLong());
    }

    public StructType readSchema() {
        return MQTTStreamConstants$.MODULE$.SCHEMA_DEFAULT();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public List<DataReaderFactory<Row>> createDataReaderFactories() {
        Object object;
        MQTTStreamSource mQTTStreamSource = this;
        synchronized (mQTTStreamSource) {
            long sliceStart = ((LongOffset)((Object)LongOffset$.MODULE$.convert((org.apache.spark.sql.execution.streaming.Offset)this.startOffset()).get())).offset() + 1L;
            long sliceEnd = ((LongOffset)((Object)LongOffset$.MODULE$.convert((org.apache.spark.sql.execution.streaming.Offset)this.endOffset()).get())).offset() + 1L;
            object = new RichLong(Predef$.MODULE$.longWrapper(sliceStart)).until((Object)BoxesRunTime.boxToLong((long)sliceEnd)).map((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ MQTTStreamSource $outer;

                public final MQTTMessage apply(long i) {
                    return (MQTTMessage)this.$outer.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages().apply((Object)BoxesRunTime.boxToLong((long)i));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            }, IndexedSeq$.MODULE$.canBuildFrom());
        }
        IndexedSeq rawList = (IndexedSeq)object;
        SparkSession spark = (SparkSession)SparkSession$.MODULE$.getActiveSession().get();
        int numPartitions = spark.sparkContext().defaultParallelism();
        ListBuffer[] slices = (ListBuffer[])Array$.MODULE$.fill(numPartitions, (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final ListBuffer<MQTTMessage> apply() {
                return new ListBuffer();
            }
        }, ClassTag$.MODULE$.apply(ListBuffer.class));
        ((IterableLike)rawList.zipWithIndex(IndexedSeq$.MODULE$.canBuildFrom())).foreach((Function1)new Serializable(this, numPartitions, slices){
            public static final long serialVersionUID = 0L;
            private final int numPartitions$1;
            private final ListBuffer[] slices$1;

            public final void apply(Tuple2<MQTTMessage, Object> x0$1) {
                Tuple2<MQTTMessage, Object> tuple2 = x0$1;
                if (tuple2 != null) {
                    MQTTMessage r = (MQTTMessage)tuple2._1();
                    int idx = tuple2._2$mcI$sp();
                    this.slices$1[idx % this.numPartitions$1].append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new MQTTMessage[]{r}));
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
                throw new MatchError(tuple2);
            }
            {
                this.numPartitions$1 = numPartitions$1;
                this.slices$1 = slices$1;
            }
        });
        return (List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)((TraversableOnce)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions).map((Function1)new Serializable(this, slices){
            public static final long serialVersionUID = 0L;
            private final ListBuffer[] slices$1;

            public final Object apply(int i) {
                ListBuffer slice = this.slices$1[i];
                return new DataReaderFactory<Row>(this, slice){
                    public final ListBuffer slice$1;

                    public DataReader<Row> createDataReader() {
                        return new DataReader<Row>(this){
                            private int currentIdx;
                            private final /* synthetic */ $anonfun$createDataReaderFactories$2$$anon$3 $outer;

                            private int currentIdx() {
                                return this.currentIdx;
                            }

                            private void currentIdx_$eq(int x$1) {
                                this.currentIdx = x$1;
                            }

                            public boolean next() {
                                this.currentIdx_$eq(this.currentIdx() + 1);
                                return this.currentIdx() < this.$outer.slice$1.size();
                            }

                            public Row get() {
                                return Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)((MQTTMessage)this.$outer.slice$1.apply(this.currentIdx())).id()), ((MQTTMessage)this.$outer.slice$1.apply(this.currentIdx())).topic(), ((MQTTMessage)this.$outer.slice$1.apply(this.currentIdx())).payload(), ((MQTTMessage)this.$outer.slice$1.apply(this.currentIdx())).timestamp()}));
                            }

                            public void close() {
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.currentIdx = -1;
                            }
                        };
                    }
                    {
                        this.slice$1 = slice$1;
                    }
                };
            }
            {
                this.slices$1 = slices$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom())).toList()).asJava();
    }

    public synchronized void commit(Offset end) {
        LongOffset newOffset = (LongOffset)((Object)LongOffset$.MODULE$.convert((org.apache.spark.sql.execution.streaming.Offset)end).getOrElse((Function0)new Serializable(this, end){
            public static final long serialVersionUID = 0L;
            private final Offset end$1;

            public final Nothing$ apply() {
                return package$.MODULE$.error(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"MQTTStreamSource.commit() received an offset (", ") that did not "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.end$1}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"originate with an instance of this class"})).s((Seq)Nil$.MODULE$)).toString());
            }
            {
                this.end$1 = end$1;
            }
        }));
        int offsetDiff = (int)(newOffset.offset() - this.lastOffsetCommitted().offset());
        if (offsetDiff < 0) {
            throw package$.MODULE$.error(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Offsets committed out of order: ", " followed by ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.lastOffsetCommitted(), end})));
        }
        new RichLong(Predef$.MODULE$.longWrapper(this.lastOffsetCommitted().offset())).until((Object)BoxesRunTime.boxToLong((long)newOffset.offset())).foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ MQTTStreamSource $outer;

            public final Option<MQTTMessage> apply(long x) {
                return this.$outer.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages().remove((Object)BoxesRunTime.boxToLong((long)(x + 1L)));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.lastOffsetCommitted_$eq(newOffset);
    }

    public synchronized void stop() {
        this.client().disconnect();
        this.persistence.close();
        this.client().close();
    }

    public String toString() {
        return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"MQTTStreamSource[brokerUrl: ", ", topic: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.brokerUrl, this.topic}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{" clientId: ", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.clientId}))).toString();
    }

    public MQTTStreamSource(DataSourceOptions options, String brokerUrl, MqttClientPersistence persistence, String topic, String clientId, MqttConnectOptions mqttConnectOptions, int qos) {
        this.brokerUrl = brokerUrl;
        this.persistence = persistence;
        this.topic = topic;
        this.clientId = clientId;
        this.mqttConnectOptions = mqttConnectOptions;
        this.qos = qos;
        Logging$class.$init$(this);
        this.backLog = options.getInt("autopruning.backlog", 500);
        this.store = new LocalMessageStore(persistence);
        this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$messages = new TrieMap();
        this.org$apache$bahir$sql$streaming$mqtt$MQTTStreamSource$$currentOffset = new LongOffset(-1L);
        this.lastOffsetCommitted = new LongOffset(-1L);
        this.initialize();
    }
}

