package com.couchbase.spark.sql.streaming;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.spark.Logging;
import com.couchbase.spark.connection.CouchbaseConfig;
import com.couchbase.spark.connection.CouchbaseConfig$;
import com.couchbase.spark.connection.CouchbaseConnection;
import com.couchbase.spark.connection.CouchbaseConnection$;
import com.couchbase.spark.streaming.Deletion;
import com.couchbase.spark.streaming.Mutation;
import com.couchbase.spark.streaming.StreamMessage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.CompositeOffset;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import rx.lang.scala.Observable$;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.TraversableLike;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.TraitSetter;

/* compiled from: CouchbaseSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]f\u0001B\u0001\u0003\u00015\u0011qbQ8vG\"\u0014\u0017m]3T_V\u00148-\u001a\u0006\u0003\u0007\u0011\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u00151\u0011aA:rY*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\t\u0011bY8vG\"\u0014\u0017m]3\u000b\u0003-\t1aY8n\u0007\u0001\u0019B\u0001\u0001\b\u0015CA\u0011qBE\u0007\u0002!)\t\u0011#A\u0003tG\u0006d\u0017-\u0003\u0002\u0014!\t1\u0011I\\=SK\u001a\u0004\"!F\u0010\u000e\u0003YQ!aA\f\u000b\u0005aI\u0012!C3yK\u000e,H/[8o\u0015\t)!D\u0003\u0002\b7)\u0011A$H\u0001\u0007CB\f7\r[3\u000b\u0003y\t1a\u001c:h\u0013\t\u0001cC\u0001\u0004T_V\u00148-\u001a\t\u0003E\rj\u0011AB\u0005\u0003I\u0019\u0011q\u0001T8hO&tw\r\u0003\u0005'\u0001\t\u0005\t\u0015!\u0003(\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010\u001e\t\u0003Q%j\u0011!G\u0005\u0003Ue\u0011!bU)M\u0007>tG/\u001a=u\u0011!a\u0003A!A!\u0002\u0013i\u0013aC;tKJ|6o\u00195f[\u0006\u00042a\u0004\u00181\u0013\ty\u0003C\u0001\u0004PaRLwN\u001c\t\u0003cQj\u0011A\r\u0006\u0003ge\tQ\u0001^=qKNL!!\u000e\u001a\u0003\u0015M#(/^2u)f\u0004X\r\u0003\u00058\u0001\t\u0005\t\u0015!\u00039\u0003)\u0001\u0018M]1nKR,'o\u001d\t\u0005sqztH\u0004\u0002\u0010u%\u00111\bE\u0001\u0007!J,G-\u001a4\n\u0005ur$aA'ba*\u00111\b\u0005\t\u0003s\u0001K!!\u0011 \u0003\rM#(/\u001b8h\u0011\u0015\u0019\u0005\u0001\"\u0001E\u0003\u0019a\u0014N\\5u}Q!Qi\u0012%J!\t1\u0005!D\u0001\u0003\u0011\u00151#\t1\u0001(\u0011\u0015a#\t1\u0001.\u0011\u00159$\t1\u00019\u0011\u001dY\u0005A1A\u0005\u00021\u000ba!];fk\u0016\u001cX#A'\u0011\t9+vKW\u0007\u0002\u001f*\u0011\u0001+U\u0001\u000bG>t7-\u001e:sK:$(B\u0001*T\u0003\u0011)H/\u001b7\u000b\u0003Q\u000bAA[1wC&\u0011ak\u0014\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bCA\bY\u0013\tI\u0006CA\u0003TQ>\u0014H\u000fE\u0002O7vK!\u0001X(\u0003+\r{gnY;se\u0016tG\u000fT5oW\u0016$\u0017+^3vKB\u0011a\fY\u0007\u0002?*\u00111AB\u0005\u0003C~\u0013Qb\u0015;sK\u0006lW*Z:tC\u001e,\u0007BB2\u0001A\u0003%Q*A\u0004rk\u0016,Xm\u001d\u0011\t\u000f\u0015\u0004!\u0019!C\u0001M\u0006i1-\u001e:sK:$xJ\u001a4tKR,\u0012a\u001a\t\u0004Q.lW\"A5\u000b\u0005)|\u0015AB1u_6L7-\u0003\u0002mS\ny\u0011\t^8nS\u000e\u0014VMZ3sK:\u001cW\r\u0005\u0002\u0016]&\u0011qN\u0006\u0002\u0007\u001f\u001a47/\u001a;\t\rE\u0004\u0001\u0015!\u0003h\u00039\u0019WO\u001d:f]R|eMZ:fi\u0002Bqa\u001d\u0001C\u0002\u0013\u0005A/\u0001\u0004d_:4\u0017nZ\u000b\u0002kB\u0011a/_\u0007\u0002o*\u0011\u0001PB\u0001\u000bG>tg.Z2uS>t\u0017B\u0001>x\u0005=\u0019u.^2iE\u0006\u001cXmQ8oM&<\u0007B\u0002?\u0001A\u0003%Q/A\u0004d_:4\u0017n\u001a\u0011\t\u000fy\u0004!\u0019!C\u0001\u007f\u000611\r\\5f]R,\"!!\u0001\u0011\t\u0005\r\u00111B\u0007\u0003\u0003\u000bQA!a\u0002\u0002\n\u0005\u0019Am\u00199\u000b\u0005yD\u0011\u0002BA\u0007\u0003\u000b\u0011aa\u00117jK:$\b\u0002CA\t\u0001\u0001\u0006I!!\u0001\u0002\u000f\rd\u0017.\u001a8uA!I\u0011Q\u0003\u0001C\u0002\u0013\u0005\u0011qC\u0001\fkN,GmX:dQ\u0016l\u0017-F\u00011\u0011\u001d\tY\u0002\u0001Q\u0001\nA\nA\"^:fI~\u001b8\r[3nC\u0002B\u0011\"a\b\u0001\u0005\u0004%I!!\t\u0002\u0017%$g)[3mI:\u000bW.Z\u000b\u0002\u007f!9\u0011Q\u0005\u0001!\u0002\u0013y\u0014\u0001D5e\r&,G\u000e\u001a(b[\u0016\u0004\u0003\"CA\u0015\u0001\t\u0007I\u0011BA\u0011\u0003)\u0019HO]3b[\u001a\u0013x.\u001c\u0005\b\u0003[\u0001\u0001\u0015!\u0003@\u0003-\u0019HO]3b[\u001a\u0013x.\u001c\u0011\t\u0013\u0005E\u0002A1A\u0005\n\u0005\u0005\u0012\u0001C:ue\u0016\fW\u000eV8\t\u000f\u0005U\u0002\u0001)A\u0005\u007f\u0005I1\u000f\u001e:fC6$v\u000e\t\u0005\b\u0003s\u0001A\u0011BA\u001e\u0003)Ig.\u001b;jC2L'0\u001a\u000b\u0003\u0003{\u00012aDA \u0013\r\t\t\u0005\u0005\u0002\u0005+:LG\u000fC\u0004\u0002F\u0001!\t!a\u0012\u0002\u001fM$\u0018\r^3t)>|eMZ:fiN,\"!!\u0013\u0011\u0007U\tY%C\u0002\u0002NY\u0011qbQ8na>\u001c\u0018\u000e^3PM\u001a\u001cX\r\u001e\u0005\b\u0003#\u0002A\u0011IA\f\u0003\u0019\u00198\r[3nC\"9\u0011Q\u000b\u0001\u0005B\u0005]\u0013!C4fi>3gm]3u+\t\tI\u0006E\u0002\u0010]5Dq!!\u0018\u0001\t\u0003\ny&\u0001\u0005hKR\u0014\u0015\r^2i)\u0019\t\t'!\"\u0002\nB!\u00111MA@\u001d\u0011\t)'a\u001f\u000f\t\u0005\u001d\u0014\u0011\u0010\b\u0005\u0003S\n9H\u0004\u0003\u0002l\u0005Ud\u0002BA7\u0003gj!!a\u001c\u000b\u0007\u0005ED\"\u0001\u0004=e>|GOP\u0005\u0002=%\u0011A$H\u0005\u0003\u000fmI!!\u0002\u000e\n\u0007\u0005u\u0014$A\u0004qC\u000e\\\u0017mZ3\n\t\u0005\u0005\u00151\u0011\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T1!! \u001a\u0011!\t9)a\u0017A\u0002\u0005e\u0013!B:uCJ$\bbBAF\u00037\u0002\r!\\\u0001\u0004K:$\u0007bBAH\u0001\u0011\u0005\u0011\u0011S\u0001\u0004C\u000e\\G\u0003BA\u001f\u0003'C\u0001\"!&\u0002\u000e\u0002\u0007\u0011qS\u0001\u0002[B\u0019a,!'\n\u0007\u0005muL\u0001\u0005NkR\fG/[8o\u0011\u001d\ty\n\u0001C!\u0003w\tAa\u001d;pa\u001e9\u00111\u0015\u0002\t\u0002\u0005\u0015\u0016aD\"pk\u000eD'-Y:f'>,(oY3\u0011\u0007\u0019\u000b9K\u0002\u0004\u0002\u0005!\u0005\u0011\u0011V\n\u0004\u0003Os\u0001bB\"\u0002(\u0012\u0005\u0011Q\u0016\u000b\u0003\u0003KC!\"!-\u0002(\n\u0007I\u0011AA\f\u00039!UIR!V\u0019R{6k\u0011%F\u001b\u0006C\u0001\"!.\u0002(\u0002\u0006I\u0001M\u0001\u0010\t\u00163\u0015)\u0016'U?N\u001b\u0005*R'BA\u0001")
/* loaded from: input_file:com/couchbase/spark/sql/streaming/CouchbaseSource.class */
public class CouchbaseSource implements Source, Logging {
    private final SQLContext sqlContext;
    private final ConcurrentHashMap<Object, ConcurrentLinkedQueue<StreamMessage>> queues;
    private final AtomicReference<Offset> currentOffset;
    private final CouchbaseConfig config;
    private final Client client;
    private final StructType used_schema;
    private final String com$couchbase$spark$sql$streaming$CouchbaseSource$$idFieldName;
    private final String streamFrom;
    private final String streamTo;
    private transient Logger com$couchbase$spark$Logging$$log_;

    public static StructType DEFAULT_SCHEMA() {
        return CouchbaseSource$.MODULE$.DEFAULT_SCHEMA();
    }

    @Override // com.couchbase.spark.Logging
    public Logger com$couchbase$spark$Logging$$log_() {
        return this.com$couchbase$spark$Logging$$log_;
    }

    @Override // com.couchbase.spark.Logging
    @TraitSetter
    public void com$couchbase$spark$Logging$$log__$eq(Logger logger) {
        this.com$couchbase$spark$Logging$$log_ = logger;
    }

    @Override // com.couchbase.spark.Logging
    public String logName() {
        return Logging.Cclass.logName(this);
    }

    @Override // com.couchbase.spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // com.couchbase.spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    @Override // com.couchbase.spark.Logging
    public void initializeLogIfNecessary(boolean z) {
        Logging.Cclass.initializeLogIfNecessary(this, z);
    }

    @Override // com.couchbase.spark.Logging
    public ClassLoader getCouchbaseClassLoader() {
        return Logging.Cclass.getCouchbaseClassLoader(this);
    }

    @Override // com.couchbase.spark.Logging
    public ClassLoader getContextOrCouchbaseClassLoader() {
        return Logging.Cclass.getContextOrCouchbaseClassLoader(this);
    }

    public ConcurrentHashMap<Object, ConcurrentLinkedQueue<StreamMessage>> queues() {
        return this.queues;
    }

    public AtomicReference<Offset> currentOffset() {
        return this.currentOffset;
    }

    public CouchbaseConfig config() {
        return this.config;
    }

    public Client client() {
        return this.client;
    }

    public StructType used_schema() {
        return this.used_schema;
    }

    public String com$couchbase$spark$sql$streaming$CouchbaseSource$$idFieldName() {
        return this.com$couchbase$spark$sql$streaming$CouchbaseSource$$idFieldName;
    }

    private String streamFrom() {
        return this.streamFrom;
    }

    private String streamTo() {
        return this.streamTo;
    }

    private synchronized void initialize() {
        StreamFrom streamFrom;
        StreamTo streamTo;
        client().controlEventHandler(new ControlEventHandler(this) { // from class: com.couchbase.spark.sql.streaming.CouchbaseSource$$anon$1
            private final /* synthetic */ CouchbaseSource $outer;

            public void onEvent(ByteBuf byteBuf) {
                if (DcpSnapshotMarkerMessage.is(byteBuf)) {
                    this.$outer.client().acknowledgeBuffer(byteBuf);
                }
                byteBuf.release();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        client().dataEventHandler(new DataEventHandler(this) { // from class: com.couchbase.spark.sql.streaming.CouchbaseSource$$anon$2
            private final /* synthetic */ CouchbaseSource $outer;

            public void onEvent(ByteBuf byteBuf) {
                short partition;
                StreamMessage streamMessage;
                if (DcpMutationMessage.is(byteBuf)) {
                    byte[] bArr = new byte[DcpMutationMessage.content(byteBuf).readableBytes()];
                    DcpMutationMessage.content(byteBuf).readBytes(bArr);
                    byte[] bArr2 = new byte[DcpMutationMessage.key(byteBuf).readableBytes()];
                    DcpMutationMessage.key(byteBuf).readBytes(bArr2);
                    partition = DcpMutationMessage.partition(byteBuf);
                    streamMessage = new Mutation(bArr2, bArr, Predef$.MODULE$.int2Integer(DcpMutationMessage.expiry(byteBuf)), DcpMutationMessage.cas(byteBuf), DcpMutationMessage.partition(byteBuf), DcpMutationMessage.flags(byteBuf), DcpMutationMessage.lockTime(byteBuf), DcpDeletionMessage.bySeqno(byteBuf), DcpDeletionMessage.revisionSeqno(byteBuf), byteBuf.readableBytes());
                } else {
                    if (!DcpDeletionMessage.is(byteBuf)) {
                        this.$outer.client().acknowledgeBuffer(byteBuf);
                        throw new IllegalStateException(new StringBuilder().append("Got unexpected DCP Data Event ").append(MessageUtil.humanize(byteBuf)).toString());
                    }
                    byte[] bArr3 = new byte[DcpDeletionMessage.key(byteBuf).readableBytes()];
                    DcpDeletionMessage.key(byteBuf).readBytes(bArr3);
                    partition = DcpMutationMessage.partition(byteBuf);
                    Deletion deletion = new Deletion(bArr3, DcpDeletionMessage.cas(byteBuf), DcpDeletionMessage.partition(byteBuf), DcpDeletionMessage.bySeqno(byteBuf), DcpDeletionMessage.revisionSeqno(byteBuf), byteBuf.readableBytes());
                    this.$outer.client().acknowledgeBuffer(byteBuf);
                    streamMessage = deletion;
                }
                this.$outer.queues().get(BoxesRunTime.boxToShort(partition)).add(streamMessage);
                byteBuf.release();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        client().connect().await();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), client().numPartitions()).foreach(new CouchbaseSource$$anonfun$initialize$1(this));
        String lowerCase = streamFrom().toLowerCase();
        if ("beginning".equals(lowerCase)) {
            streamFrom = StreamFrom.BEGINNING;
        } else {
            if (!"now".equals(lowerCase)) {
                throw new IllegalArgumentException(new StringBuilder().append("Could not match StreamFrom ").append(streamFrom()).toString());
            }
            streamFrom = StreamFrom.NOW;
        }
        StreamFrom streamFrom2 = streamFrom;
        String lowerCase2 = streamTo().toLowerCase();
        if ("infinity".equals(lowerCase2)) {
            streamTo = StreamTo.INFINITY;
        } else {
            if (!"now".equals(lowerCase2)) {
                throw new IllegalArgumentException(new StringBuilder().append("Could not match StreamTo ").append(streamFrom()).toString());
            }
            streamTo = StreamTo.NOW;
        }
        StreamTo streamTo2 = streamTo;
        logInfo(new CouchbaseSource$$anonfun$initialize$2(this, streamFrom2, streamTo2));
        client().initializeState(streamFrom2, streamTo2).await();
        currentOffset().set(statesToOffsets());
        client().startStreaming(new Short[0]).await();
        Observable$.MODULE$.interval(new package.DurationInt(package$.MODULE$.DurationInt(2)).seconds()).map(new CouchbaseSource$$anonfun$initialize$3(this)).foreach(new CouchbaseSource$$anonfun$initialize$4(this));
    }

    public CompositeOffset statesToOffsets() {
        return new CompositeOffset((IndexedSeq) ((TraversableLike) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), client().numPartitions()).map(new CouchbaseSource$$anonfun$6(this, client().sessionState()), IndexedSeq$.MODULE$.canBuildFrom())).map(new CouchbaseSource$$anonfun$7(this), IndexedSeq$.MODULE$.canBuildFrom()));
    }

    public StructType schema() {
        return used_schema();
    }

    public Option<Offset> getOffset() {
        return Option$.MODULE$.apply(currentOffset().get());
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        int i;
        ArrayBuffer apply = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        ((CompositeOffset) offset).offsets().foreach(new CouchbaseSource$$anonfun$getBatch$1(this, apply, option.map(new CouchbaseSource$$anonfun$8(this))));
        try {
            i = used_schema().fieldIndex(com$couchbase$spark$sql$streaming$CouchbaseSource$$idFieldName());
        } catch (IllegalArgumentException e) {
            i = -1;
        }
        int i2 = i;
        StructType used_schema = used_schema();
        StructType DEFAULT_SCHEMA = CouchbaseSource$.MODULE$.DEFAULT_SCHEMA();
        if (used_schema != null ? used_schema.equals(DEFAULT_SCHEMA) : DEFAULT_SCHEMA == null) {
            return this.sqlContext.createDataFrame(JavaConversions$.MODULE$.bufferAsJavaList((Buffer) apply.map(new CouchbaseSource$$anonfun$getBatch$2(this), ArrayBuffer$.MODULE$.canBuildFrom())), used_schema());
        }
        DataFrameReader schema = this.sqlContext.read().schema(used_schema());
        SparkContext sparkContext = this.sqlContext.sparkContext();
        return schema.json(sparkContext.parallelize((ArrayBuffer) apply.map(new CouchbaseSource$$anonfun$10(this, i2), ArrayBuffer$.MODULE$.canBuildFrom()), sparkContext.parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)));
    }

    public void ack(Mutation mutation) {
        client().acknowledgeBuffer(mutation.partition(), mutation.ackBytes());
    }

    public void stop() {
        client().disconnect().await();
    }

    public CouchbaseSource(SQLContext sQLContext, Option<StructType> option, Map<String, String> map) {
        this.sqlContext = sQLContext;
        com$couchbase$spark$Logging$$log__$eq(null);
        this.queues = new ConcurrentHashMap<>(1024);
        this.currentOffset = new AtomicReference<>();
        this.config = CouchbaseConfig$.MODULE$.apply(sQLContext.sparkContext().getConf());
        CouchbaseConnection apply = CouchbaseConnection$.MODULE$.apply();
        this.client = apply.streamClient(config(), apply.streamClient$default$2());
        this.used_schema = (StructType) option.getOrElse(new CouchbaseSource$$anonfun$2(this));
        this.com$couchbase$spark$sql$streaming$CouchbaseSource$$idFieldName = (String) map.getOrElse("idField", new CouchbaseSource$$anonfun$3(this));
        this.streamFrom = (String) map.getOrElse("streamFrom", new CouchbaseSource$$anonfun$4(this));
        this.streamTo = (String) map.getOrElse("streamTo", new CouchbaseSource$$anonfun$5(this));
        initialize();
    }
}
