package com.couchbase.spark.sql.streaming;

import com.couchbase.spark.streaming.Mutation;
import com.couchbase.spark.streaming.StreamMessage;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.spark.sql.execution.streaming.Offset;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.mutable.ArrayBuffer;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: CouchbaseSource.scala */
/* loaded from: input_file:com/couchbase/spark/sql/streaming/CouchbaseSource$$anonfun$getBatch$1.class */
public class CouchbaseSource$$anonfun$getBatch$1 extends AbstractFunction1<Option<Offset>, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ CouchbaseSource $outer;
    private final ArrayBuffer results$1;
    private final Option startOffset$1;

    public final void apply(Option<Offset> option) {
        BoxedUnit boxedUnit;
        if (option.isDefined()) {
            PartitionOffset partitionOffset = (PartitionOffset) option.get();
            ConcurrentLinkedQueue<StreamMessage> concurrentLinkedQueue = this.$outer.queues().get(BoxesRunTime.boxToShort(partitionOffset.vbid()));
            boolean z = true;
            while (z) {
                StreamMessage peek = concurrentLinkedQueue.peek();
                if (peek == null) {
                    z = false;
                } else if (peek instanceof Mutation) {
                    Mutation mutation = (Mutation) peek;
                    long unboxToLong = BoxesRunTime.unboxToLong(this.startOffset$1.map(new CouchbaseSource$$anonfun$getBatch$1$$anonfun$9(this, partitionOffset)).getOrElse(new CouchbaseSource$$anonfun$getBatch$1$$anonfun$1(this)));
                    if (mutation.bySeqno() < unboxToLong) {
                        concurrentLinkedQueue.remove();
                        this.$outer.ack(mutation);
                        boxedUnit = BoxedUnit.UNIT;
                    } else if (mutation.bySeqno() > partitionOffset.seqno() || mutation.bySeqno() < unboxToLong) {
                        z = false;
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        concurrentLinkedQueue.remove();
                        this.$outer.ack(mutation);
                        this.results$1.append(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(mutation.key(), mutation.content())}));
                        boxedUnit = BoxedUnit.UNIT;
                    }
                } else {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
            }
        }
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((Option<Offset>) obj);
        return BoxedUnit.UNIT;
    }

    public CouchbaseSource$$anonfun$getBatch$1(CouchbaseSource couchbaseSource, ArrayBuffer arrayBuffer, Option option) {
        if (couchbaseSource == null) {
            throw new NullPointerException();
        }
        this.$outer = couchbaseSource;
        this.results$1 = arrayBuffer;
        this.startOffset$1 = option;
    }
}
