package com.couchbase.spark.streaming;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.spark.Logging;
import com.couchbase.spark.connection.CouchbaseConfig;
import com.couchbase.spark.connection.CouchbaseConnection$;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.TraitSetter;

/* compiled from: CouchbaseInputDStream.scala */
@ScalaSignature(bytes = "\u0006\u0001a3A!\u0001\u0002\u0001\u0017\t\t2i\\;dQ\n\f7/\u001a*fG\u0016Lg/\u001a:\u000b\u0005\r!\u0011!C:ue\u0016\fW.\u001b8h\u0015\t)a!A\u0003ta\u0006\u00148N\u0003\u0002\b\u0011\u0005I1m\\;dQ\n\f7/\u001a\u0006\u0002\u0013\u0005\u00191m\\7\u0004\u0001M\u0019\u0001\u0001\u0004\u000f\u0011\u000751\u0002$D\u0001\u000f\u0015\ty\u0001#\u0001\u0005sK\u000e,\u0017N^3s\u0015\t\u0019\u0011C\u0003\u0002\u0006%)\u00111\u0003F\u0001\u0007CB\f7\r[3\u000b\u0003U\t1a\u001c:h\u0013\t9bB\u0001\u0005SK\u000e,\u0017N^3s!\tI\"$D\u0001\u0003\u0013\tY\"AA\u0007TiJ,\u0017-\\'fgN\fw-\u001a\t\u0003;yi\u0011\u0001B\u0005\u0003?\u0011\u0011q\u0001T8hO&tw\r\u0003\u0005\"\u0001\t\u0005\t\u0015!\u0003#\u0003\u0019\u0019wN\u001c4jOB\u00111EJ\u0007\u0002I)\u0011Q\u0005B\u0001\u000bG>tg.Z2uS>t\u0017BA\u0014%\u0005=\u0019u.^2iE\u0006\u001cXmQ8oM&<\u0007\u0002C\u0015\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0016\u0002\u0015\t,8m[3u\u001d\u0006lW\r\u0005\u0002,c9\u0011AfL\u0007\u0002[)\ta&A\u0003tG\u0006d\u0017-\u0003\u00021[\u00051\u0001K]3eK\u001aL!AM\u001a\u0003\rM#(/\u001b8h\u0015\t\u0001T\u0006C\u00056\u0001\t\u0005\t\u0015!\u00037y\u0005a1\u000f^8sC\u001e,G*\u001a<fYB\u0011qGO\u0007\u0002q)\u0011\u0011(E\u0001\bgR|'/Y4f\u0013\tY\u0004H\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G.\u0003\u00026-!Aa\b\u0001B\u0001B\u0003%q(\u0001\u0003ge>l\u0007CA\rA\u0013\t\t%A\u0001\u0006TiJ,\u0017-\u001c$s_6D\u0001b\u0011\u0001\u0003\u0002\u0003\u0006I\u0001R\u0001\u0003i>\u0004\"!G#\n\u0005\u0019\u0013!\u0001C*ue\u0016\fW\u000eV8\t\u000b!\u0003A\u0011A%\u0002\rqJg.\u001b;?)\u0019Q5\nT'O\u001fB\u0011\u0011\u0004\u0001\u0005\u0006C\u001d\u0003\rA\t\u0005\u0006S\u001d\u0003\rA\u000b\u0005\u0006k\u001d\u0003\rA\u000e\u0005\u0006}\u001d\u0003\ra\u0010\u0005\u0006\u0007\u001e\u0003\r\u0001\u0012\u0005\u0006#\u0002!\tEU\u0001\b_:\u001cF/\u0019:u)\u0005\u0019\u0006C\u0001\u0017U\u0013\t)VF\u0001\u0003V]&$\b\"B,\u0001\t\u0003\u0012\u0016AB8o'R|\u0007\u000f")
/* loaded from: input_file:com/couchbase/spark/streaming/CouchbaseReceiver.class */
public class CouchbaseReceiver extends Receiver<StreamMessage> implements Logging {
    private final CouchbaseConfig config;
    private final String bucketName;
    private final StreamFrom from;
    private final StreamTo to;
    private transient Logger com$couchbase$spark$Logging$$log_;

    @Override // com.couchbase.spark.Logging
    public Logger com$couchbase$spark$Logging$$log_() {
        return this.com$couchbase$spark$Logging$$log_;
    }

    @Override // com.couchbase.spark.Logging
    @TraitSetter
    public void com$couchbase$spark$Logging$$log__$eq(Logger logger) {
        this.com$couchbase$spark$Logging$$log_ = logger;
    }

    @Override // com.couchbase.spark.Logging
    public String logName() {
        return Logging.Cclass.logName(this);
    }

    @Override // com.couchbase.spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // com.couchbase.spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // com.couchbase.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // com.couchbase.spark.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    @Override // com.couchbase.spark.Logging
    public void initializeLogIfNecessary(boolean z) {
        Logging.Cclass.initializeLogIfNecessary(this, z);
    }

    @Override // com.couchbase.spark.Logging
    public ClassLoader getCouchbaseClassLoader() {
        return Logging.Cclass.getCouchbaseClassLoader(this);
    }

    @Override // com.couchbase.spark.Logging
    public ClassLoader getContextOrCouchbaseClassLoader() {
        return Logging.Cclass.getContextOrCouchbaseClassLoader(this);
    }

    public void onStart() {
        Client streamClient = CouchbaseConnection$.MODULE$.apply().streamClient(this.config, this.bucketName);
        streamClient.controlEventHandler(new CouchbaseReceiver$$anon$1(this, streamClient));
        streamClient.dataEventHandler(new DataEventHandler(this) { // from class: com.couchbase.spark.streaming.CouchbaseReceiver$$anon$3
            private final /* synthetic */ CouchbaseReceiver $outer;

            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                Object deletion;
                if (DcpMutationMessage.is(byteBuf)) {
                    byte[] bArr = new byte[DcpMutationMessage.content(byteBuf).readableBytes()];
                    DcpMutationMessage.content(byteBuf).readBytes(bArr);
                    byte[] bArr2 = new byte[DcpMutationMessage.key(byteBuf).readableBytes()];
                    DcpMutationMessage.key(byteBuf).readBytes(bArr2);
                    deletion = new Mutation(bArr2, bArr, Predef$.MODULE$.int2Integer(DcpMutationMessage.expiry(byteBuf)), DcpMutationMessage.cas(byteBuf), DcpMutationMessage.partition(byteBuf), DcpMutationMessage.flags(byteBuf), DcpMutationMessage.lockTime(byteBuf), DcpDeletionMessage.bySeqno(byteBuf), DcpDeletionMessage.revisionSeqno(byteBuf), byteBuf.readableBytes());
                } else {
                    if (!DcpDeletionMessage.is(byteBuf)) {
                        byteBuf.release();
                        throw new IllegalStateException(new StringBuilder().append("Got unexpected DCP Data Event ").append(MessageUtil.humanize(byteBuf)).toString());
                    }
                    byte[] bArr3 = new byte[DcpDeletionMessage.key(byteBuf).readableBytes()];
                    DcpDeletionMessage.key(byteBuf).readBytes(bArr3);
                    deletion = new Deletion(bArr3, DcpDeletionMessage.cas(byteBuf), DcpDeletionMessage.partition(byteBuf), DcpDeletionMessage.bySeqno(byteBuf), DcpDeletionMessage.revisionSeqno(byteBuf), byteBuf.readableBytes());
                }
                this.$outer.store(deletion);
                channelFlowController.ack(byteBuf);
                byteBuf.release();
            }

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }
        });
        streamClient.connect().await();
        StreamFrom streamFrom = this.from;
        FromNow$ fromNow$ = FromNow$.MODULE$;
        if (streamFrom != null ? streamFrom.equals(fromNow$) : fromNow$ == null) {
            StreamTo streamTo = this.to;
            ToInfinity$ toInfinity$ = ToInfinity$.MODULE$;
            if (streamTo != null ? streamTo.equals(toInfinity$) : toInfinity$ == null) {
                streamClient.initializeState(com.couchbase.client.dcp.StreamFrom.NOW, com.couchbase.client.dcp.StreamTo.INFINITY).await();
                streamClient.startStreaming(new Short[0]).await();
                return;
            }
        }
        StreamFrom streamFrom2 = this.from;
        FromBeginning$ fromBeginning$ = FromBeginning$.MODULE$;
        if (streamFrom2 != null ? streamFrom2.equals(fromBeginning$) : fromBeginning$ == null) {
            StreamTo streamTo2 = this.to;
            ToInfinity$ toInfinity$2 = ToInfinity$.MODULE$;
            if (streamTo2 != null ? streamTo2.equals(toInfinity$2) : toInfinity$2 == null) {
                streamClient.initializeState(com.couchbase.client.dcp.StreamFrom.BEGINNING, com.couchbase.client.dcp.StreamTo.INFINITY).await();
                streamClient.startStreaming(new Short[0]).await();
                return;
            }
        }
        StreamFrom streamFrom3 = this.from;
        FromBeginning$ fromBeginning$2 = FromBeginning$.MODULE$;
        if (streamFrom3 != null ? streamFrom3.equals(fromBeginning$2) : fromBeginning$2 == null) {
            StreamTo streamTo3 = this.to;
            ToNow$ toNow$ = ToNow$.MODULE$;
            if (streamTo3 != null ? streamTo3.equals(toNow$) : toNow$ == null) {
                streamClient.initializeState(com.couchbase.client.dcp.StreamFrom.BEGINNING, com.couchbase.client.dcp.StreamTo.NOW).await();
                streamClient.startStreaming(new Short[0]).await();
                return;
            }
        }
        throw new IllegalArgumentException("Unsupported From/To Combination!");
    }

    public void onStop() {
        Client streamClient = CouchbaseConnection$.MODULE$.apply().streamClient(this.config, this.bucketName);
        streamClient.stopStreaming(new Short[0]).await();
        streamClient.disconnect().await();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public CouchbaseReceiver(CouchbaseConfig couchbaseConfig, String str, StorageLevel storageLevel, StreamFrom streamFrom, StreamTo streamTo) {
        super(storageLevel);
        this.config = couchbaseConfig;
        this.bucketName = str;
        this.from = streamFrom;
        this.to = streamTo;
        com$couchbase$spark$Logging$$log__$eq(null);
    }
}
