package com.couchbase.spark.kv;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.highlevel.CollectionCreated;
import com.couchbase.client.dcp.highlevel.CollectionDropped;
import com.couchbase.client.dcp.highlevel.CollectionFlushed;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.DocumentChange;
import com.couchbase.client.dcp.highlevel.FailoverLog;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.Rollback;
import com.couchbase.client.dcp.highlevel.ScopeCreated;
import com.couchbase.client.dcp.highlevel.ScopeDropped;
import com.couchbase.client.dcp.highlevel.SeqnoAdvanced;
import com.couchbase.client.dcp.highlevel.SnapshotDetails;
import com.couchbase.client.dcp.highlevel.StreamEnd;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.spark.config.CouchbaseConfig;
import com.couchbase.spark.config.CouchbaseConnection$;
import com.couchbase.spark.util.Version$;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.InternalRow$;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData$;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.unsafe.types.UTF8String;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.duration.Duration$;
import scala.math.Ordering$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KeyValuePartitionReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Mf\u0001\u0002\u0011\"\u0001)B\u0001B\u0013\u0001\u0003\u0002\u0003\u0006Ia\u0013\u0005\t\u001f\u0002\u0011\t\u0011)A\u0005!\")a\u000b\u0001C\u0001/\"91\f\u0001b\u0001\n\u0013a\u0006B\u00021\u0001A\u0003%Q\fC\u0004b\u0001\t\u0007I\u0011\u00022\t\r%\u0004\u0001\u0015!\u0003d\u0011\u001dQ\u0007A1A\u0005\n-DaA\u001f\u0001!\u0002\u0013a\u0007bB>\u0001\u0005\u0004%I\u0001 \u0005\b\u0003#\u0001\u0001\u0015!\u0003~\u0011%\t\u0019\u0002\u0001b\u0001\n\u0013\t)\u0002\u0003\u0005\u0002*\u0001\u0001\u000b\u0011BA\f\u0011%\tY\u0003\u0001b\u0001\n\u0013\ti\u0003\u0003\u0005\u0002J\u0001\u0001\u000b\u0011BA\u0018\u0011-\tY\u0005\u0001a\u0001\u0002\u0004%I!!\u0014\t\u0017\u0005=\u0003\u00011AA\u0002\u0013%\u0011\u0011\u000b\u0005\f\u0003;\u0002\u0001\u0019!A!B\u0013\ti\u0002C\u0005\u0002`\u0001\u0011\r\u0011\"\u0003\u0002b!A\u0011q\u000e\u0001!\u0002\u0013\t\u0019\u0007C\u0005\u0002r\u0001\u0011\r\u0011\"\u0003\u0002t!A\u0011Q\u0010\u0001!\u0002\u0013\t)\bC\u0004\u0002��\u0001!I!!!\t\u000f\u0005\r\u0005\u0001\"\u0003\u0002\u0002\"9\u0011Q\u0011\u0001\u0005B\u0005\u001d\u0005bBAH\u0001\u0011\u0005\u0013\u0011\u0013\u0005\b\u0003\u001f\u0003A\u0011BAJ\u0011\u001d\t9\u000b\u0001C\u0005\u0003SCq!a+\u0001\t\u0013\t\t\tC\u0004\u0002.\u0002!\t%a,\t\u000f\u0005E\u0006\u0001\"\u0011\u0002\u0002\n92*Z=WC2,X\rU1si&$\u0018n\u001c8SK\u0006$WM\u001d\u0006\u0003E\r\n!a\u001b<\u000b\u0005\u0011*\u0013!B:qCJ\\'B\u0001\u0014(\u0003%\u0019w.^2iE\u0006\u001cXMC\u0001)\u0003\r\u0019w.\\\u0002\u0001'\r\u00011f\r\t\u0003YEj\u0011!\f\u0006\u0003]=\nA\u0001\\1oO*\t\u0001'\u0001\u0003kCZ\f\u0017B\u0001\u001a.\u0005\u0019y%M[3diB\u0019AG\u0011#\u000e\u0003UR!AN\u001c\u0002\u0013M$(/Z1nS:<'B\u0001\u001d:\u0003\u0011\u0011X-\u00193\u000b\u0005iZ\u0014!C2p]:,7\r^8s\u0015\taT(A\u0002tc2T!\u0001\n 
\u000b\u0005}\u0002\u0015AB1qC\u000eDWMC\u0001B\u0003\ry'oZ\u0005\u0003\u0007V\u0012\u0011dQ8oi&tWo\\;t!\u0006\u0014H/\u001b;j_:\u0014V-\u00193feB\u0011Q\tS\u0007\u0002\r*\u0011qiO\u0001\tG\u0006$\u0018\r\\=ti&\u0011\u0011J\u0012\u0002\f\u0013:$XM\u001d8bYJ{w/A\u0005qCJ$\u0018\u000e^5p]B\u0011A*T\u0007\u0002C%\u0011a*\t\u0002\u0017\u0017\u0016Lh+\u00197vK&s\u0007/\u001e;QCJ$\u0018\u000e^5p]\u0006Q1m\u001c8uS:,x.^:\u0011\u0005E#V\"\u0001*\u000b\u0003M\u000bQa]2bY\u0006L!!\u0016*\u0003\u000f\t{w\u000e\\3b]\u00061A(\u001b8jiz\"2\u0001W-[!\ta\u0005\u0001C\u0003K\u0007\u0001\u00071\nC\u0003P\u0007\u0001\u0007\u0001+\u0001\u0007tiJ,\u0017-\\\"p]\u001aLw-F\u0001^!\tae,\u0003\u0002`C\t!2*Z=WC2,Xm\u0015;sK\u0006l7i\u001c8gS\u001e\fQb\u001d;sK\u0006l7i\u001c8gS\u001e\u0004\u0013aD2pk\u000eD'-Y:f\u0007>tg-[4\u0016\u0003\r\u0004\"\u0001Z4\u000e\u0003\u0015T!AZ\u0012\u0002\r\r|gNZ5h\u0013\tAWMA\bD_V\u001c\u0007NY1tK\u000e{gNZ5h\u0003A\u0019w.^2iE\u0006\u001cXmQ8oM&<\u0007%\u0001\u000bdkJ\u0014XM\u001c;TiJ,\u0017-\\(gMN,Go]\u000b\u0002YB!QN\u001d;x\u001b\u0005q'BA8q\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003c>\nA!\u001e;jY&\u00111O\u001c\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bCA)v\u0013\t1(KA\u0002J]R\u0004\"\u0001\u0014=\n\u0005e\f#\u0001F&fsZ\u000bG.^3TiJ,\u0017-\\(gMN,G/A\u000bdkJ\u0014XM\u001c;TiJ,\u0017-\\(gMN,Go\u001d\u0011\u0002!M$(/Z1ng\u000e{W\u000e\u001d7fi\u0016$W#A?\u0011\t5\u0014HO 
\t\u0004\u007f\u00065QBAA\u0001\u0015\u0011\t\u0019!!\u0002\u0002\u000f5,7o]1hK*!\u0011qAA\u0005\u0003\r!7\r\u001d\u0006\u0004\u0003\u0017)\u0013AB2mS\u0016tG/\u0003\u0003\u0002\u0010\u0005\u0005!aD*ue\u0016\fW.\u00128e%\u0016\f7o\u001c8\u0002#M$(/Z1ng\u000e{W\u000e\u001d7fi\u0016$\u0007%A\u0006dQ\u0006tw-Z)vKV,WCAA\f!\u0015i\u0017\u0011DA\u000f\u0013\r\tYB\u001c\u0002\u0014\u0019&t7.\u001a3CY>\u001c7.\u001b8h#V,W/\u001a\t\u0005\u0003?\t)#\u0004\u0002\u0002\")!\u00111EA\u0003\u0003%A\u0017n\u001a5mKZ,G.\u0003\u0003\u0002(\u0005\u0005\"A\u0004#pGVlWM\u001c;DQ\u0006tw-Z\u0001\rG\"\fgnZ3Rk\u0016,X\rI\u0001\u000bKJ\u0014xN])vKV,WCAA\u0018!\u0015i\u0017\u0011DA\u0019!\u0011\t\u0019$a\u0011\u000f\t\u0005U\u0012q\b\b\u0005\u0003o\ti$\u0004\u0002\u0002:)\u0019\u00111H\u0015\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0019\u0016bAA!%\u00069\u0001/Y2lC\u001e,\u0017\u0002BA#\u0003\u000f\u0012\u0011\u0002\u00165s_^\f'\r\\3\u000b\u0007\u0005\u0005#+A\u0006feJ|'/U;fk\u0016\u0004\u0013\u0001D2veJ,g\u000e^#oiJLXCAA\u000f\u0003A\u0019WO\u001d:f]R,e\u000e\u001e:z?\u0012*\u0017\u000f\u0006\u0003\u0002T\u0005e\u0003cA)\u0002V%\u0019\u0011q\u000b*\u0003\tUs\u0017\u000e\u001e\u0005\n\u00037\n\u0012\u0011!a\u0001\u0003;\t1\u0001\u001f\u00132\u00035\u0019WO\u001d:f]R,e\u000e\u001e:zA\u0005iqn\u001e8fIZ\u0013WoY6fiN,\"!a\u0019\u0011\u000b\u0005\u0015\u00141\u000e;\u000e\u0005\u0005\u001d$bAA5%\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u00055\u0014q\r\u0002\t\u0013R,'/\u00192mK\u0006qqn\u001e8fIZ\u0013WoY6fiN\u0004\u0013!\u00033da\u000ec\u0017.\u001a8u+\t\t)\b\u0005\u0003\u0002x\u0005eTBAA\u0003\u0013\u0011\tY(!\u0002\u0003\r\rc\u0017.\u001a8u\u0003)!7\r]\"mS\u0016tG\u000fI\u0001\u001cCR$\u0018m\u00195B]\u0012\u001cuN\u001c8fGR$5\r\u001d'jgR,g.\u001a:\u0015\u0005\u0005M\u0013!F5oSR\fe\u000eZ*uCJ$Hi\u00199TiJ,\u0017-\\\u0001\nO\u0016$xJ\u001a4tKR$\"!!#\u0011\u0007Q\nY)C\u0002\u0002\u000eV\u0012q\u0002U1si&$\u0018n\u001c8PM\u001a\u001cX\r^\u0001\u0005]\u0016DH\u000fF\u0001Q)\r\u0001\u0016Q\u0013\u0
005\u0007\u0003/[\u0002\u0019\u0001;\u0002\u00159,XNU3ue&,7\u000fK\u0002\u001c\u00037\u0003B!!(\u0002$6\u0011\u0011q\u0014\u0006\u0004\u0003C\u0013\u0016AC1o]>$\u0018\r^5p]&!\u0011QUAP\u0005\u001d!\u0018-\u001b7sK\u000e\f1CY1uG\"\u001cFO]3b[\u000e{W\u000e\u001d7fi\u0016,\u0012\u0001U\u0001\u0010G\",7m[#se>\u0014\u0018+^3vK\u0006\u0019q-\u001a;\u0015\u0003\u0011\u000bQa\u00197pg\u0016\u0004")
/* loaded from: input_file:com/couchbase/spark/kv/KeyValuePartitionReader.class */
/*
 * Spark partition reader that exposes Couchbase DCP document changes as InternalRows.
 *
 * A DCP Client is built and started in the constructor. Its listener pushes every mutation and
 * deletion into a bounded queue (capacity 100); next() pulls from that queue and get() converts
 * the current change into a row shaped by the partition schema. When `continuous` is true,
 * next() waits until a change arrives; otherwise it returns false once every owned vbucket has
 * reported a stream end and the queue has been drained.
 */
public class KeyValuePartitionReader implements ContinuousPartitionReader<InternalRow> {
    /** How long continuous mode waits on the change queue before re-checking for stream failures. */
    private static final long CONTINUOUS_POLL_MILLIS = 100L;

    private final KeyValueInputPartition partition;
    private final boolean continuous;
    private final KeyValueStreamConfig streamConfig;
    private final CouchbaseConfig couchbaseConfig;
    /** Latest offset seen per vbucket; seeded with the partition's start offsets. */
    private final ConcurrentHashMap<Object, KeyValueStreamOffset> currentStreamOffsets;
    /** Vbuckets whose stream has ended, with the reason reported by the DCP client. */
    private final ConcurrentHashMap<Object, StreamEndReason> com$couchbase$spark$kv$KeyValuePartitionReader$$streamsCompleted = new ConcurrentHashMap<>();
    /** Bounded hand-off between the DCP listener (producer) and next() (consumer). */
    private final LinkedBlockingQueue<DocumentChange> com$couchbase$spark$kv$KeyValuePartitionReader$$changeQueue = new LinkedBlockingQueue<>(100);
    /** Holds at most one pending stream failure; further failures are dropped by offer(). */
    private final LinkedBlockingQueue<Throwable> com$couchbase$spark$kv$KeyValuePartitionReader$$errorQueue = new LinkedBlockingQueue<>(1);
    /** The change most recently returned by next(); null before the first call or when a batch poll found nothing. */
    private DocumentChange currentEntry;
    private final Iterable<Object> ownedVbuckets;
    private final Client dcpClient;

    private KeyValueStreamConfig streamConfig() {
        return this.streamConfig;
    }

    private CouchbaseConfig couchbaseConfig() {
        return this.couchbaseConfig;
    }

    private ConcurrentHashMap<Object, KeyValueStreamOffset> currentStreamOffsets() {
        return this.currentStreamOffsets;
    }

    public ConcurrentHashMap<Object, StreamEndReason> com$couchbase$spark$kv$KeyValuePartitionReader$$streamsCompleted() {
        return this.com$couchbase$spark$kv$KeyValuePartitionReader$$streamsCompleted;
    }

    public LinkedBlockingQueue<DocumentChange> com$couchbase$spark$kv$KeyValuePartitionReader$$changeQueue() {
        return this.com$couchbase$spark$kv$KeyValuePartitionReader$$changeQueue;
    }

    public LinkedBlockingQueue<Throwable> com$couchbase$spark$kv$KeyValuePartitionReader$$errorQueue() {
        return this.com$couchbase$spark$kv$KeyValuePartitionReader$$errorQueue;
    }

    private DocumentChange currentEntry() {
        return this.currentEntry;
    }

    private void currentEntry_$eq(DocumentChange documentChange) {
        this.currentEntry = documentChange;
    }

    private Iterable<Object> ownedVbuckets() {
        return this.ownedVbuckets;
    }

    private Client dcpClient() {
        return this.dcpClient;
    }

    /**
     * Rethrows {@code t} without wrapping it, even if it is a checked exception.
     *
     * <p>The compiled Scala lets InterruptedException escape unwrapped from blocking calls; this
     * keeps that behaviour so a caller that tells interruption apart from failure still can.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable t) throws E {
        throw (E) t;
    }

    /** Blocks until {@code change} fits into the bounded change queue. */
    private void enqueueChange(DocumentChange change) {
        try {
            com$couchbase$spark$kv$KeyValuePartitionReader$$changeQueue().put(change);
        } catch (InterruptedException e) {
            throw KeyValuePartitionReader.<RuntimeException>rethrowUnchecked(e);
        }
    }

    /**
     * Registers the DCP listener and blocks until the client is connected.
     *
     * <p>Only the callbacks this reader acts on are overridden; every other callback keeps the
     * behaviour inherited from DatabaseChangeListener (the decompiled overrides did nothing but
     * delegate to it).
     */
    private void attachAndConnectDcpListener() {
        dcpClient().listener(new DatabaseChangeListener() {
            public void onMutation(Mutation mutation) {
                enqueueChange(mutation);
            }

            public void onDeletion(Deletion deletion) {
                enqueueChange(deletion);
            }

            public void onFailure(StreamFailure streamFailure) {
                // offer(), not put(): the queue holds one error and must never block the listener.
                com$couchbase$spark$kv$KeyValuePartitionReader$$errorQueue().offer(streamFailure.getCause());
            }

            public void onStreamEnd(StreamEnd streamEnd) {
                com$couchbase$spark$kv$KeyValuePartitionReader$$streamsCompleted().put(Integer.valueOf(streamEnd.getVbucket()), streamEnd.getReason());
            }
        }, FlowControlMode.AUTOMATIC);
        dcpClient().connect().block();
    }

    /**
     * Seeds the DCP session state from the partition's start/end offsets and starts streaming
     * the owned vbuckets (in ascending order).
     */
    private void initAndStartDcpStream() {
        KeyValuePartitionOffset partitionOffset = this.partition.partitionOffset();
        Map<Object, KeyValueStreamOffset> streamStartOffsets = partitionOffset.streamStartOffsets();
        @SuppressWarnings("unchecked")
        List<Integer> vbuckets = (List<Integer>) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) ((TraversableOnce) ownedVbuckets().map(obj -> {
            return Integer.valueOf(BoxesRunTime.unboxToInt(obj));
        }, Iterable$.MODULE$.canBuildFrom())).toSeq().sorted(Ordering$.MODULE$.ordered(Predef$.MODULE$.$conforms()))).asJava();
        dcpClient().initializeState(StreamFromVariants$.MODULE$.StreamFromVariantsValue(streamConfig().streamFrom()).asDcpStreamFrom(), StreamTo.INFINITY).block();
        if (StreamFromVariants$.MODULE$.StreamFromVariantsValue(streamConfig().streamFrom()).fromBeginning()) {
            // When streaming from the beginning, fetch each vbucket's failover log and store it in
            // the session state before the per-vbucket offsets below copy it over.
            dcpClient().failoverLogs(vbuckets).doOnNext(byteBuf -> {
                int vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
                PartitionState partitionState = this.dcpClient().sessionState().get(vbucket);
                partitionState.setFailoverLog(DcpFailoverLogResponse.entries(byteBuf));
                this.dcpClient().sessionState().set(vbucket, partitionState);
            }).blockLast();
        }
        streamStartOffsets.foreach(tuple2 -> {
            $anonfun$initAndStartDcpStream$3(this, partitionOffset, tuple2);
            return BoxedUnit.UNIT;
        });
        dcpClient().startStreaming(vbuckets).block();
    }

    /** Returns a snapshot of the latest per-vbucket offsets, with no end offsets. */
    public PartitionOffset getOffset() {
        return new KeyValuePartitionOffset(((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(currentStreamOffsets()).asScala()).toMap(Predef$.MODULE$.$conforms()), None$.MODULE$);
    }

    /**
     * Advances to the next document change.
     *
     * @return true when a change is available through get(); false when a non-continuous stream
     *         is exhausted
     * @throws RuntimeException wrapping the cause of a stream failure reported by the DCP client
     */
    public boolean next() {
        return next(0);
    }

    /** Makes {@code change} the current entry and records its offset for its vbucket. */
    private void advanceTo(DocumentChange change) {
        currentEntry_$eq(change);
        currentStreamOffsets().put(Integer.valueOf(change.getVbucket()), KeyValueStreamOffset$.MODULE$.apply(change.getOffset()));
    }

    /**
     * Loop behind next().
     *
     * @param i number of empty polls already performed; drives the batch-mode back-off
     *          (sleep of min(i, 10) ms once i is positive)
     */
    private boolean next(int i) {
        int emptyPolls = i;
        try {
            while (true) {
                checkErrorQueue();
                if (this.continuous) {
                    // Timed poll instead of take(): a failure reported while the stream is idle
                    // must still reach the caller via checkErrorQueue() on the next iteration.
                    DocumentChange arrived = com$couchbase$spark$kv$KeyValuePartitionReader$$changeQueue().poll(CONTINUOUS_POLL_MILLIS, TimeUnit.MILLISECONDS);
                    if (arrived != null) {
                        advanceTo(arrived);
                        return true;
                    }
                    continue;
                }
                // Sample completion BEFORE polling. If every stream had already ended, an empty
                // queue is final; checking afterwards could miss a change enqueued between the
                // poll and the last stream-end. Assumes the DCP client delivers a vbucket's
                // stream-end after all of its changes -- TODO confirm.
                boolean streamsDone = batchStreamComplete();
                DocumentChange polled = com$couchbase$spark$kv$KeyValuePartitionReader$$changeQueue().poll();
                if (polled != null) {
                    advanceTo(polled);
                    return true;
                }
                currentEntry_$eq(null);
                if (streamsDone) {
                    return false;
                }
                if (emptyPolls > 0) {
                    Thread.sleep(Math.min(emptyPolls, 10));
                }
                emptyPolls++;
            }
        } catch (InterruptedException e) {
            throw KeyValuePartitionReader.<RuntimeException>rethrowUnchecked(e);
        }
    }

    /** True once at least as many vbuckets have reported a stream end as this reader owns. */
    private boolean batchStreamComplete() {
        return com$couchbase$spark$kv$KeyValuePartitionReader$$streamsCompleted().size() >= ownedVbuckets().size();
    }

    /** Throws the pending stream failure, if any, wrapped in a RuntimeException. */
    private void checkErrorQueue() {
        Throwable poll = com$couchbase$spark$kv$KeyValuePartitionReader$$errorQueue().poll();
        if (poll != null) {
            throw new RuntimeException(poll);
        }
    }

    /**
     * Extracts the value for one schema field from the current entry.
     *
     * @throws UnsupportedOperationException if {@code fieldName} is not a supported column
     */
    private Object fieldValue(String fieldName) {
        if ("id".equals(fieldName)) {
            return UTF8String.fromString(currentEntry().getKey());
        }
        if ("content".equals(fieldName)) {
            return currentEntry().getContent();
        }
        if ("deletion".equals(fieldName)) {
            return BoxesRunTime.boxToBoolean(!currentEntry().isMutation());
        }
        if ("cas".equals(fieldName)) {
            return BoxesRunTime.boxToLong(currentEntry().getCas());
        }
        if ("scope".equals(fieldName)) {
            return UTF8String.fromString(currentEntry().getCollection().scope().name());
        }
        if ("collection".equals(fieldName)) {
            return UTF8String.fromString(currentEntry().getCollection().name());
        }
        if ("timestamp".equals(fieldName)) {
            // Microseconds since the epoch.
            return BoxesRunTime.boxToLong(ChronoUnit.MICROS.between(Instant.EPOCH, currentEntry().getTimestamp()));
        }
        if ("vbucket".equals(fieldName)) {
            return BoxesRunTime.boxToInteger(currentEntry().getVbucket());
        }
        if ("xattrs".equals(fieldName)) {
            return ArrayBasedMapData$.MODULE$.apply(currentEntry().getXattrs(), key -> {
                return key.toString();
            }, value -> {
                return value.toString();
            });
        }
        throw new UnsupportedOperationException("Unsupported field name in schema " + fieldName);
    }

    /**
     * Builds a row from the current entry with one value per field of the partition schema,
     * in schema order. Must only be called after next() returned true.
     */
    public InternalRow get() {
        return InternalRow$.MODULE$.fromSeq(Predef$.MODULE$.genericArrayOps(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(this.partition.schema().fieldNames())).map(name -> {
            return fieldValue((String) name);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Any()))).toSeq());
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    /** Decompiler-assigned alias of get(), kept so existing references keep resolving. */
    public InternalRow m24get() {
        return get();
    }

    /** Closes the underlying DCP client. */
    public void close() {
        dcpClient().close();
    }

    /** Synthetic lambda body: sets the end seqno of {@code partitionState} from the end-offset map entry for vbucket {@code i}. */
    public static final /* synthetic */ void $anonfun$initAndStartDcpStream$4(PartitionState partitionState, int i, Map map) {
        partitionState.setEndSeqno(((KeyValueStreamOffset) map.apply(BoxesRunTime.boxToInteger(i))).seqno());
    }

    /**
     * Synthetic lambda body: replaces the session state of one vbucket with a state built from
     * its start offset, carrying over the failover log already held in the session state and,
     * when end offsets are defined, the end seqno.
     */
    public static final /* synthetic */ void $anonfun$initAndStartDcpStream$3(KeyValuePartitionReader keyValuePartitionReader, KeyValuePartitionOffset keyValuePartitionOffset, Tuple2 tuple2) {
        int _1$mcI$sp = tuple2._1$mcI$sp();
        PartitionState fromOffset = PartitionState.fromOffset(((KeyValueStreamOffset) tuple2._2()).toStreamOffset());
        fromOffset.setFailoverLog(keyValuePartitionReader.dcpClient().sessionState().get(_1$mcI$sp).getFailoverLog());
        keyValuePartitionOffset.streamEndOffsets().foreach(map -> {
            $anonfun$initAndStartDcpStream$4(fromOffset, _1$mcI$sp, map);
            return BoxedUnit.UNIT;
        });
        keyValuePartitionReader.dcpClient().sessionState().set(_1$mcI$sp, fromOffset);
    }

    /**
     * Builds the DCP client from the partition's configuration, connects it and starts streaming.
     *
     * @param keyValueInputPartition partition describing the owned vbuckets, offsets, schema and config
     * @param z                      true for continuous mode (next() waits for changes), false for batch mode
     */
    public KeyValuePartitionReader(KeyValueInputPartition keyValueInputPartition, boolean z) {
        this.partition = keyValueInputPartition;
        this.continuous = z;
        this.streamConfig = keyValueInputPartition.config();
        this.couchbaseConfig = keyValueInputPartition.conf();
        this.currentStreamOffsets = new ConcurrentHashMap<>((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(keyValueInputPartition.partitionOffset().streamStartOffsets()).asJava());
        this.ownedVbuckets = keyValueInputPartition.partitionOffset().streamStartOffsets().keys();
        this.dcpClient = Client.builder()
            .seedNodes(new String[]{CouchbaseConnection$.MODULE$.apply().dcpSeedNodes(couchbaseConfig())})
            .securityConfig(CouchbaseConnection$.MODULE$.apply().dcpSecurityConfig(couchbaseConfig()))
            // Default flow-control buffer: 10 MiB.
            .flowControl(BoxesRunTime.unboxToInt(streamConfig().flowControlBufferSize().getOrElse(() -> {
                return 10485760;
            })))
            .mitigateRollbacks(Duration$.MODULE$.apply((String) streamConfig().persistencePollingInterval().getOrElse(() -> {
                return "100ms";
            })).toMicros(), TimeUnit.MICROSECONDS)
            .userAgent(Version$.MODULE$.productName(), Version$.MODULE$.version(), new String[]{"reader"})
            .credentials(couchbaseConfig().credentials().username(), couchbaseConfig().credentials().password())
            .bucket(streamConfig().bucket())
            .collectionsAware(streamConfig().scope().isDefined() || streamConfig().collections().nonEmpty())
            // A scope filter is only set when no individual collections were requested.
            .scopeName((streamConfig().scope().isDefined() && streamConfig().collections().isEmpty()) ? (String) streamConfig().scope().get() : null)
            .collectionNames((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) streamConfig().collections().map(collection -> {
                // Qualify each collection as "<scope>.<collection>", defaulting the scope to "_default".
                String scopeName = streamConfig().scope().isDefined() ? (String) streamConfig().scope().get() : "_default";
                return scopeName + "." + collection;
            }, Seq$.MODULE$.canBuildFrom())).asJava())
            .noValue(!streamConfig().streamContent())
            .xattrs(streamConfig().streamXattrs())
            .build();
        attachAndConnectDcpListener();
        initAndStartDcpStream();
    }
}
