package com.couchbase.spark.sql.streaming;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.spark.Logging;
import com.couchbase.spark.connection.CouchbaseConfig;
import com.couchbase.spark.connection.CouchbaseConfig$;
import com.couchbase.spark.connection.CouchbaseConnection;
import com.couchbase.spark.connection.CouchbaseConnection$;
import com.couchbase.spark.sql.DefaultSource$;
import com.couchbase.spark.streaming.Deletion;
import com.couchbase.spark.streaming.Mutation;
import com.couchbase.spark.streaming.StreamMessage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrameCreation$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import rx.lang.scala.Observable$;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: CouchbaseSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]g\u0001\u0002\u0012$\u00019B\u0001\u0002\u0013\u0001\u0003\u0002\u0003\u0006I!\u0013\u0005\t\u001b\u0002\u0011\t\u0011)A\u0005\u001d\"A!\f\u0001B\u0001B\u0003%1\fC\u0003j\u0001\u0011\u0005!\u000eC\u0004q\u0001\t\u0007I\u0011A9\t\u000f\u0005-\u0001\u0001)A\u0005e\"I\u0011Q\u0002\u0001C\u0002\u0013\u0005\u0011q\u0002\u0005\t\u0003W\u0001\u0001\u0015!\u0003\u0002\u0012!I\u0011Q\u0006\u0001C\u0002\u0013\u0005\u0011q\u0006\u0005\t\u0003\u0007\u0002\u0001\u0015!\u0003\u00022!I\u0011Q\t\u0001C\u0002\u0013\u0005\u0011q\t\u0005\t\u0003+\u0002\u0001\u0015!\u0003\u0002J!I\u0011Q\u0005\u0001C\u0002\u0013\u0005\u0011q\u000b\u0005\t\u0003C\u0002\u0001\u0015!\u0003\u0002Z!I\u00111\r\u0001C\u0002\u0013\u0005\u0011Q\r\u0005\b\u0003O\u0002\u0001\u0015!\u0003U\u0011%\tI\u0007\u0001b\u0001\n\u0013\tY\u0007C\u0004\u0002n\u0001\u0001\u000b\u0011\u00024\t\u0013\u0005=\u0004A1A\u0005\n\u0005-\u0004bBA9\u0001\u0001\u0006IA\u001a\u0005\n\u0003g\u0002!\u0019!C\u0005\u0003WBq!!\u001e\u0001A\u0003%a\rC\u0004\u0002x\u0001!I!!\u001f\t\u000f\u0005\u0005\u0005\u0001\"\u0003\u0002\u0004\"9\u0011Q\u0011\u0001\u0005B\u0005\u0015\u0004bBAD\u0001\u0011\u0005\u0013\u0011\u0012\u0005\b\u0003\u001b\u0003A\u0011IAH\u0011\u001d\t9\f\u0001C\u0005\u0003sCq!!2\u0001\t\u0003\nIhB\u0004\u0002H\u000eB\t!!3\u0007\r\t\u001a\u0003\u0012AAf\u0011\u0019Iw\u0004\"\u0001\u0002T\"9\u0011Q[\u0010\u0005\u0002\u0005\u0015$aD\"pk\u000eD'-Y:f'>,(oY3\u000b\u0005\u0011*\u0013!C:ue\u0016\fW.\u001b8h\u0015\t1s%A\u0002tc2T!\u0001K\u0015\u0002\u000bM\u0004\u0018M]6\u000b\u0005)Z\u0013!C2pk\u000eD'-Y:f\u0015\u0005a\u0013aA2p[\u000e\u00011\u0003\u0002\u00010o\u0011\u0003\"\u0001M\u001b\u000e\u0003ER!AM\u001a\u0002\t1\fgn\u001a\u0006\u0002i\u0005!!.\u0019<b\u0013\t1\u0014G\u0001\u0004PE*,7\r\u001e\t\u0003q\tk\u0011!\u000f\u0006\u0003IiR!a\u000f\u001f\u0002\u0013\u0015DXmY;uS>t'B\u0001\u0014>\u0015\tAcH\u0003\u0002@\u0001\u00061\u0011\r]1dQ\u0016T\u0011!Q\u0001\u0004_J<\u0017BA\":\u0005\u0019\u0019v.\u001e:dKB\u0011QIR\u0007\u0002O%\u0011qi\n\u0002\b\u0019><w-\u001b8h\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010\u001e\t\u0003\u0015.k\u0011\u0001P\u0005\u0003\u0019r\u0012!bU)M\u0007>tG/\u001a=u\u0003))8/\u001a:TG\",W.\u0019\t\u0004\u001fJ#V\"\u0001)\u000b\u0003E\u000bQa]2bY\u0006L!a\u0015)\u0003\r=\u0003H/[8o!\t)\u0006,D\u0001W\u0015\t9F(A\u0003usB,7/\u0003\u0002Z-\nQ1\u000b\u001e:vGR$\u0016\u0010]3\u0002\u0015A\f'/Y7fi\u0016\u00148\u000f\u0005\u0003]G\u001a4gBA/b!\tq\u0006+D\u0001`\u0015\t\u0001W&\u0001\u0004=e>|GOP\u0005\u0003EB\u000ba\u0001\u0015:fI\u00164\u0017B\u00013f\u0005\ri\u0015\r\u001d\u0006\u0003EB\u0003\"\u0001X4\n\u0005!,'AB*ue&tw-\u0001\u0004=S:LGO\u0010\u000b\u0005W6tw\u000e\u0005\u0002m\u00015\t1\u0005C\u0003I\t\u0001\u0007\u0011\nC\u0003N\t\u0001\u0007a\nC\u0003[\t\u0001\u00071,\u0001\u0004rk\u0016,Xm]\u000b\u0002eB!1\u000f\u001f>~\u001b\u0005!(BA;w\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003oN\nA!\u001e;jY&\u0011\u0011\u0010\u001e\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bCA(|\u0013\ta\bKA\u0003TQ>\u0014H\u000f\u0005\u0003t}\u0006\u0005\u0011BA@u\u0005U\u0019uN\\2veJ,g\u000e\u001e'j].,G-U;fk\u0016\u0004B!a\u0001\u0002\b5\u0011\u0011Q\u0001\u0006\u0003I\u001dJA!!\u0003\u0002\u0006\ti1\u000b\u001e:fC6lUm]:bO\u0016\fq!];fk\u0016\u001c\b%A\bgY><8i\u001c8ue>dG.\u001a:t+\t\t\t\u0002E\u0003tqj\f\u0019\u0002\u0005\u0003\u0002\u0016\u0005\u001dRBAA\f\u0015\u0011\tI\"a\u0007\u0002\u000b9,G\u000f^=\u000b\t\u0005u\u0011qD\u0001\niJ\fgn\u001d9peRTA!!\t\u0002$\u0005\u0019Am\u00199\u000b\u0007\u0005\u0015\u0012&\u0001\u0004dY&,g\u000e^\u0005\u0005\u0003S\t9BA\u000bDQ\u0006tg.\u001a7GY><8i\u001c8ue>dG.\u001a:\u0002!\u0019dwn^\"p]R\u0014x\u000e\u001c7feN\u0004\u0013!D2veJ,g\u000e^(gMN,G/\u0006\u0002\u00022A1\u00111GA\u001d\u0003{i!!!\u000e\u000b\u0007\u0005]B/\u0001\u0004bi>l\u0017nY\u0005\u0005\u0003w\t)DA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\rA\u0014qH\u0005\u0004\u0003\u0003J$AB(gMN,G/\u0001\bdkJ\u0014XM\u001c;PM\u001a\u001cX\r\u001e\u0011\u0002\r\r|gNZ5h+\t\tI\u0005\u0005\u0003\u0002L\u0005ESBAA'\u0015\r\tyeJ\u0001\u000bG>tg.Z2uS>t\u0017\u0002BA*\u0003\u001b\u0012qbQ8vG\"\u0014\u0017m]3D_:4\u0017nZ\u0001\bG>tg-[4!+\t\tI\u0006\u0005\u0003\u0002\\\u0005uSBAA\u0010\u0013\u0011\ty&a\b\u0003\r\rc\u0017.\u001a8u\u0003\u001d\u0019G.[3oi\u0002\n!\"^:fIN\u001b\u0007.Z7b+\u0005!\u0016aC;tK\u0012\u001c6\r[3nC\u0002\n1\"\u001b3GS\u0016dGMT1nKV\ta-\u0001\u0007jI\u001aKW\r\u001c3OC6,\u0007%\u0001\u0006tiJ,\u0017-\u001c$s_6\f1b\u001d;sK\u0006lgI]8nA\u0005A1\u000f\u001e:fC6$v.A\u0005tiJ,\u0017-\u001c+pA\u0005Q\u0011N\\5uS\u0006d\u0017N_3\u0015\u0005\u0005m\u0004cA(\u0002~%\u0019\u0011q\u0010)\u0003\tUs\u0017\u000e^\u0001\u0010gR\fG/Z:U_>3gm]3ugV\u0011\u0011QH\u0001\u0007g\u000eDW-\\1\u0002\u0013\u001d,Go\u00144gg\u0016$XCAAF!\u0011y%+!\u0010\u0002\u0011\u001d,GOQ1uG\"$b!!%\u00020\u0006M\u0006\u0003BAJ\u0003SsA!!&\u0002&:!\u0011qSAR\u001d\u0011\tI*!)\u000f\t\u0005m\u0015q\u0014\b\u0004=\u0006u\u0015\"A!\n\u0005}\u0002\u0015B\u0001\u0015?\u0013\t1S(C\u0002\u0002(r\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002,\u00065&!\u0003#bi\u00064%/Y7f\u0015\r\t9\u000b\u0010\u0005\b\u0003c[\u0002\u0019AAF\u0003\u0015\u0019H/\u0019:u\u0011\u001d\t)l\u0007a\u0001\u0003{\t1!\u001a8e\u0003\r\t7m\u001b\u000b\u0005\u0003w\nY\fC\u0004\u0002>r\u0001\r!a0\u0002\u00035\u0004B!a\u0001\u0002B&!\u00111YA\u0003\u0005!iU\u000f^1uS>t\u0017\u0001B:u_B\fqbQ8vG\"\u0014\u0017m]3T_V\u00148-\u001a\t\u0003Y~\u00192aHAg!\ry\u0015qZ\u0005\u0004\u0003#\u0004&AB!osJ+g\r\u0006\u0002\u0002J\u0006qA)\u0012$B+2#vlU\"I\u000b6\u000b\u0005")
/* loaded from: input_file:com/couchbase/spark/sql/streaming/CouchbaseSource.class */
public class CouchbaseSource implements Source, Logging {
    private final SQLContext sqlContext;
    private final ConcurrentHashMap<Object, ConcurrentLinkedQueue<StreamMessage>> queues;
    private final ConcurrentHashMap<Object, ChannelFlowController> flowControllers;
    private final AtomicReference<Offset> currentOffset;
    private final CouchbaseConfig config;
    private final Client client;
    private final StructType usedSchema;
    private final String idFieldName;
    private final String streamFrom;
    private final String streamTo;
    private transient Logger com$couchbase$spark$Logging$$log_;

    public static StructType DEFAULT_SCHEMA() {
        return CouchbaseSource$.MODULE$.DEFAULT_SCHEMA();
    }

    @Override // com.couchbase.spark.Logging
    public String logName() {
        String logName;
        logName = logName();
        return logName;
    }

    @Override // com.couchbase.spark.Logging
    public Logger log() {
        Logger log;
        log = log();
        return log;
    }

    @Override // com.couchbase.spark.Logging
    public void logInfo(Function0<String> function0) {
        logInfo(function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logDebug(Function0<String> function0) {
        logDebug(function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logTrace(Function0<String> function0) {
        logTrace(function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logWarning(Function0<String> function0) {
        logWarning(function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logError(Function0<String> function0) {
        logError(function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        logInfo(function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        logDebug(function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        logTrace(function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        logWarning(function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        logError(function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public boolean isTraceEnabled() {
        boolean isTraceEnabled;
        isTraceEnabled = isTraceEnabled();
        return isTraceEnabled;
    }

    @Override // com.couchbase.spark.Logging
    public void initializeLogIfNecessary(boolean z) {
        initializeLogIfNecessary(z);
    }

    @Override // com.couchbase.spark.Logging
    public ClassLoader getCouchbaseClassLoader() {
        ClassLoader couchbaseClassLoader;
        couchbaseClassLoader = getCouchbaseClassLoader();
        return couchbaseClassLoader;
    }

    @Override // com.couchbase.spark.Logging
    public ClassLoader getContextOrCouchbaseClassLoader() {
        ClassLoader contextOrCouchbaseClassLoader;
        contextOrCouchbaseClassLoader = getContextOrCouchbaseClassLoader();
        return contextOrCouchbaseClassLoader;
    }

    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    @Override // com.couchbase.spark.Logging
    public Logger com$couchbase$spark$Logging$$log_() {
        return this.com$couchbase$spark$Logging$$log_;
    }

    @Override // com.couchbase.spark.Logging
    public void com$couchbase$spark$Logging$$log__$eq(Logger logger) {
        this.com$couchbase$spark$Logging$$log_ = logger;
    }

    public ConcurrentHashMap<Object, ConcurrentLinkedQueue<StreamMessage>> queues() {
        return this.queues;
    }

    public ConcurrentHashMap<Object, ChannelFlowController> flowControllers() {
        return this.flowControllers;
    }

    public AtomicReference<Offset> currentOffset() {
        return this.currentOffset;
    }

    public CouchbaseConfig config() {
        return this.config;
    }

    public Client client() {
        return this.client;
    }

    public StructType usedSchema() {
        return this.usedSchema;
    }

    private String idFieldName() {
        return this.idFieldName;
    }

    private String streamFrom() {
        return this.streamFrom;
    }

    private String streamTo() {
        return this.streamTo;
    }

    private synchronized void initialize() {
        StreamFrom streamFrom;
        StreamTo streamTo;
        final CouchbaseSource couchbaseSource = null;
        client().controlEventHandler(new ControlEventHandler(couchbaseSource) { // from class: com.couchbase.spark.sql.streaming.CouchbaseSource$$anon$1
            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                if (DcpSnapshotMarkerRequest.is(byteBuf)) {
                    channelFlowController.ack(byteBuf);
                }
                byteBuf.release();
            }
        });
        client().dataEventHandler(new DataEventHandler(this) { // from class: com.couchbase.spark.sql.streaming.CouchbaseSource$$anon$2
            private final /* synthetic */ CouchbaseSource $outer;

            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                short partition;
                StreamMessage streamMessage;
                if (DcpMutationMessage.is(byteBuf)) {
                    byte[] bArr = new byte[DcpMutationMessage.content(byteBuf).readableBytes()];
                    DcpMutationMessage.content(byteBuf).readBytes(bArr);
                    byte[] bArr2 = new byte[DcpMutationMessage.key(byteBuf).readableBytes()];
                    DcpMutationMessage.key(byteBuf).readBytes(bArr2);
                    partition = DcpMutationMessage.partition(byteBuf);
                    streamMessage = new Mutation(bArr2, bArr, Predef$.MODULE$.int2Integer(DcpMutationMessage.expiry(byteBuf)), DcpMutationMessage.cas(byteBuf), DcpMutationMessage.partition(byteBuf), DcpMutationMessage.flags(byteBuf), DcpMutationMessage.lockTime(byteBuf), DcpDeletionMessage.bySeqno(byteBuf), DcpDeletionMessage.revisionSeqno(byteBuf), byteBuf.readableBytes());
                } else {
                    if (!DcpDeletionMessage.is(byteBuf)) {
                        channelFlowController.ack(byteBuf);
                        throw new IllegalStateException(new StringBuilder(30).append("Got unexpected DCP Data Event ").append(MessageUtil.humanize(byteBuf)).toString());
                    }
                    byte[] bArr3 = new byte[DcpDeletionMessage.key(byteBuf).readableBytes()];
                    DcpDeletionMessage.key(byteBuf).readBytes(bArr3);
                    partition = DcpMutationMessage.partition(byteBuf);
                    Deletion deletion = new Deletion(bArr3, DcpDeletionMessage.cas(byteBuf), DcpDeletionMessage.partition(byteBuf), DcpDeletionMessage.bySeqno(byteBuf), DcpDeletionMessage.revisionSeqno(byteBuf), byteBuf.readableBytes());
                    channelFlowController.ack(byteBuf);
                    streamMessage = deletion;
                }
                StreamMessage streamMessage2 = streamMessage;
                if (this.$outer.flowControllers().containsKey(BoxesRunTime.boxToInteger(partition))) {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    this.$outer.flowControllers().put(BoxesRunTime.boxToShort(partition), channelFlowController);
                }
                this.$outer.queues().get(BoxesRunTime.boxToShort(partition)).add(streamMessage2);
                byteBuf.release();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        client().connect().await();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), client().numPartitions()).foreach(obj -> {
            return $anonfun$initialize$1(this, BoxesRunTime.unboxToInt(obj));
        });
        String lowerCase = streamFrom().toLowerCase();
        if ("beginning".equals(lowerCase)) {
            streamFrom = StreamFrom.BEGINNING;
        } else {
            if (!"now".equals(lowerCase)) {
                throw new IllegalArgumentException(new StringBuilder(27).append("Could not match StreamFrom ").append(streamFrom()).toString());
            }
            streamFrom = StreamFrom.NOW;
        }
        StreamFrom streamFrom2 = streamFrom;
        String lowerCase2 = streamTo().toLowerCase();
        if ("infinity".equals(lowerCase2)) {
            streamTo = StreamTo.INFINITY;
        } else {
            if (!"now".equals(lowerCase2)) {
                throw new IllegalArgumentException(new StringBuilder(25).append("Could not match StreamTo ").append(streamFrom()).toString());
            }
            streamTo = StreamTo.NOW;
        }
        StreamTo streamTo2 = streamTo;
        logInfo(() -> {
            return new StringBuilder(46).append("Starting Couchbase Structured Stream from ").append(streamFrom2).append(" to ").append(streamTo2).toString();
        });
        client().initializeState(streamFrom2, streamTo2).await();
        currentOffset().set(statesToOffsets());
        client().startStreaming(new Short[0]).await();
        Observable$.MODULE$.interval(new package.DurationInt(package$.MODULE$.DurationInt(2)).seconds()).map(obj2 -> {
            BoxesRunTime.unboxToLong(obj2);
            return this.statesToOffsets();
        }).foreach(offset -> {
            $anonfun$initialize$4(this, offset);
            return BoxedUnit.UNIT;
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Offset statesToOffsets() {
        SessionState sessionState = client().sessionState();
        return new CouchbaseSourceOffset(((IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), client().numPartitions()).map(obj -> {
            return $anonfun$statesToOffsets$1(sessionState, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    public StructType schema() {
        return usedSchema();
    }

    public Option<Offset> getOffset() {
        return Option$.MODULE$.apply(currentOffset().get());
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        int i;
        logInfo(() -> {
            return new StringBuilder(37).append("GetBatch called with start = ").append(option).append(", end = ").append(offset).toString();
        });
        Option map = option.map(offset2 -> {
            return CouchbaseSourceOffset$.MODULE$.convertToCouchbaseSourceOffset(offset2);
        });
        CouchbaseSourceOffset convertToCouchbaseSourceOffset = CouchbaseSourceOffset$.MODULE$.convertToCouchbaseSourceOffset(offset);
        ArrayBuffer apply = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        convertToCouchbaseSourceOffset.partitionToOffsets().foreach(tuple2 -> {
            $anonfun$getBatch$3(this, map, apply, tuple2);
            return BoxedUnit.UNIT;
        });
        try {
            i = usedSchema().fieldIndex(idFieldName());
        } catch (IllegalArgumentException unused) {
            i = -1;
        }
        int i2 = i;
        StructType usedSchema = usedSchema();
        StructType DEFAULT_SCHEMA = CouchbaseSource$.MODULE$.DEFAULT_SCHEMA();
        if (usedSchema != null ? usedSchema.equals(DEFAULT_SCHEMA) : DEFAULT_SCHEMA == null) {
            ArrayBuffer arrayBuffer = (ArrayBuffer) apply.map(tuple22 -> {
                return Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{new String((byte[]) tuple22._1(), CharsetUtil.UTF_8), tuple22._2()}));
            }, ArrayBuffer$.MODULE$.canBuildFrom());
            SparkContext sparkContext = this.sqlContext.sparkContext();
            return DataFrameCreation$.MODULE$.createStreamingDataFrame(this.sqlContext, sparkContext.parallelize(arrayBuffer, sparkContext.parallelize$default$2(), ClassTag$.MODULE$.apply(Row.class)), usedSchema());
        }
        SparkContext sparkContext2 = this.sqlContext.sparkContext();
        return DataFrameCreation$.MODULE$.createStreamingDataFrame(this.sqlContext, this.sqlContext.read().schema(usedSchema()).json(this.sqlContext.sparkSession().createDataset(sparkContext2.parallelize((Seq) apply.map(tuple23 -> {
            return i2 >= 0 ? JsonObject.fromJson(new String((byte[]) tuple23._2(), CharsetUtil.UTF_8)).put(this.idFieldName(), new String((byte[]) tuple23._1(), CharsetUtil.UTF_8)).toString() : new String((byte[]) tuple23._2(), CharsetUtil.UTF_8);
        }, ArrayBuffer$.MODULE$.canBuildFrom()), sparkContext2.parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)), Encoders$.MODULE$.STRING())), usedSchema());
    }

    private void ack(Mutation mutation) {
        flowControllers().get(BoxesRunTime.boxToShort(mutation.partition())).ack(mutation.ackBytes());
    }

    public void stop() {
        client().disconnect().await();
    }

    public static final /* synthetic */ ConcurrentLinkedQueue $anonfun$initialize$1(CouchbaseSource couchbaseSource, int i) {
        return couchbaseSource.queues().put(BoxesRunTime.boxToShort((short) i), new ConcurrentLinkedQueue<>());
    }

    public static final /* synthetic */ void $anonfun$initialize$4(CouchbaseSource couchbaseSource, Offset offset) {
        couchbaseSource.currentOffset().set(offset);
    }

    public static final /* synthetic */ Tuple2 $anonfun$statesToOffsets$1(SessionState sessionState, int i) {
        return new Tuple2(BoxesRunTime.boxToShort((short) i), BoxesRunTime.boxToLong(sessionState.get(i).getStartSeqno()));
    }

    public static final /* synthetic */ long $anonfun$getBatch$4(short s, CouchbaseSourceOffset couchbaseSourceOffset) {
        return BoxesRunTime.unboxToLong(couchbaseSourceOffset.partitionToOffsets().apply(BoxesRunTime.boxToShort(s)));
    }

    public static final /* synthetic */ void $anonfun$getBatch$3(CouchbaseSource couchbaseSource, Option option, ArrayBuffer arrayBuffer, Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        short unboxToShort = BoxesRunTime.unboxToShort(tuple2._1());
        long _2$mcJ$sp = tuple2._2$mcJ$sp();
        ConcurrentLinkedQueue<StreamMessage> concurrentLinkedQueue = couchbaseSource.queues().get(BoxesRunTime.boxToShort(unboxToShort));
        boolean z = true;
        while (z) {
            StreamMessage peek = concurrentLinkedQueue.peek();
            if (peek == null) {
                z = false;
            } else if (peek instanceof Mutation) {
                Mutation mutation = (Mutation) peek;
                long unboxToLong = BoxesRunTime.unboxToLong(option.map(couchbaseSourceOffset -> {
                    return BoxesRunTime.boxToLong($anonfun$getBatch$4(unboxToShort, couchbaseSourceOffset));
                }).getOrElse(() -> {
                    return 0L;
                }));
                if (mutation.bySeqno() < unboxToLong) {
                    concurrentLinkedQueue.remove();
                    couchbaseSource.ack(mutation);
                    boxedUnit = BoxedUnit.UNIT;
                } else if (mutation.bySeqno() > _2$mcJ$sp || mutation.bySeqno() < unboxToLong) {
                    z = false;
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    concurrentLinkedQueue.remove();
                    couchbaseSource.ack(mutation);
                    arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(mutation.key(), mutation.content())}));
                    boxedUnit = BoxedUnit.UNIT;
                }
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }
    }

    public CouchbaseSource(SQLContext sQLContext, Option<StructType> option, Map<String, String> map) {
        this.sqlContext = sQLContext;
        Source.$init$(this);
        com$couchbase$spark$Logging$$log__$eq(null);
        this.queues = new ConcurrentHashMap<>(1024);
        this.flowControllers = new ConcurrentHashMap<>(1024);
        this.currentOffset = new AtomicReference<>();
        this.config = CouchbaseConfig$.MODULE$.apply(sQLContext.sparkContext().getConf());
        CouchbaseConnection apply = CouchbaseConnection$.MODULE$.apply();
        this.client = apply.streamClient(config(), apply.streamClient$default$2());
        this.usedSchema = (StructType) option.getOrElse(() -> {
            return CouchbaseSource$.MODULE$.DEFAULT_SCHEMA();
        });
        this.idFieldName = (String) map.getOrElse("idField", () -> {
            return DefaultSource$.MODULE$.DEFAULT_DOCUMENT_ID_FIELD();
        });
        this.streamFrom = (String) map.getOrElse("streamFrom", () -> {
            return DefaultSource$.MODULE$.DEFAULT_STREAM_FROM();
        });
        this.streamTo = (String) map.getOrElse("streamTo", () -> {
            return DefaultSource$.MODULE$.DEFAULT_STREAM_TO();
        });
        initialize();
    }
}
