package com.couchbase.spark.kv;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.state.FailoverLogEntry;
import com.couchbase.spark.config.CouchbaseConfig;
import com.couchbase.spark.config.CouchbaseConfig$;
import com.couchbase.spark.config.CouchbaseConnection$;
import com.couchbase.spark.util.Version$;
import java.util.Collection;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.json4s.DefaultFormats;
import org.json4s.DefaultFormats$;
import org.json4s.jackson.Serialization$;
import org.slf4j.Logger;
import scala.Enumeration;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.generic.Growable;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.Manifest;
import scala.reflect.ManifestFactory$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KeyValueDataStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mb\u0001B\t\u0013\u0001mA\u0001b\u000f\u0001\u0003\u0002\u0003\u0006I\u0001\u0010\u0005\t\u0001\u0002\u0011\t\u0011)A\u0005\u0003\")a\n\u0001C\u0001\u001f\"91\u000b\u0001b\u0001\n\u0007!\u0006BB.\u0001A\u0003%Q\u000b\u0003\u0005]\u0001!\u0015\r\u0011\"\u0003^\u0011!\u0011\u0007\u0001#b\u0001\n\u0003\u0019\u0007bB5\u0001\u0005\u0004%\tA\u001b\u0005\u0007g\u0002\u0001\u000b\u0011B6\t\u000bQ\u0004A\u0011I;\t\u000be\u0004A\u0011\u0002>\t\u000f\u0005U\u0001\u0001\"\u0001\u0002\u0018!9\u0011Q\u0004\u0001\u0005\u0002\u0005}\u0001bBA\u0011\u0001\u0011\u0005\u00131\u0005\u0005\b\u0003S\u0001A\u0011IA\u0016\u0011\u001d\t9\u0004\u0001C!\u0003s\u0011!cS3z-\u0006dW/\u001a#bi\u0006\u001cFO]3b[*\u00111\u0003F\u0001\u0003WZT!!\u0006\f\u0002\u000bM\u0004\u0018M]6\u000b\u0005]A\u0012!C2pk\u000eD'-Y:f\u0015\u0005I\u0012aA2p[\u000e\u00011\u0003\u0002\u0001\u001dIU\u0002\"!\b\u0012\u000e\u0003yQ!a\b\u0011\u0002\t1\fgn\u001a\u0006\u0002C\u0005!!.\u0019<b\u0013\t\u0019cD\u0001\u0004PE*,7\r\u001e\t\u0003KMj\u0011A\n\u0006\u0003O!\n\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005%R\u0013\u0001\u0002:fC\u0012T!a\u000b\u0017\u0002\u0013\r|gN\\3di>\u0014(BA\u0017/\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003+=R!\u0001M\u0019\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0014aA8sO&\u0011AG\n\u0002\u0010'B\f'o\u001b#bi\u0006\u001cFO]3b[B\u0011a'O\u0007\u0002o)\u0011\u0001HL\u0001\tS:$XM\u001d8bY&\u0011!h\u000e\u0002\b\u0019><w-\u001b8h\u0003\u0019\u0019wN\u001c4jOB\u0011QHP\u0007\u0002%%\u0011qH\u0005\u0002\u0015\u0017\u0016Lh+\u00197vKN#(/Z1n\u0007>tg-[4\u0002%\rDWmY6q_&tG\u000fT8dCRLwN\u001c\t\u0003\u0005.s!aQ%\u0011\u0005\u0011;U\"A#\u000b\u0005\u0019S\u0012A\u0002\u001fs_>$hHC\u0001I\u0003\u0015\u00198-\u00197b\u0013\tQu)\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u00196\u0013aa\u0015;sS:<'B\u0001&H\u0003\u0019a\u0014N\\5u}Q\u0019\u0001+\u0015*\u0011\u0005u\u0002\u0001\"B\u001e\u0004\u0001\u0004a\u0004\"\u0002!\u0004\u0001\u0004\t\u0015A\u00043fM\u0006,H\u000e\u001e$pe6\fGo]\u000b\u0002+B\u0011a+W\u0007\u0002/*\u0011\u0001,M\u0001\u0007UN|g\u000eN:\n\u0005i;&A\u0004#fM\u0006,H\u000e\u001e$pe6\fGo]\u0001\u0010I\u00164\u0017-\u001e7u\r>\u0014X.\u0019;tA\u0005a1\u000f]1sWN+7o]5p]V\ta\f\u0005\u0002`A6\tA&\u0003\u0002bY\ta1\u000b]1sWN+7o]5p]\u0006!1m\u001c8g+\u0005!\u0007CA3h\u001b\u00051'BA\u001e\u0015\u0013\tAgMA\bD_V\u001c\u0007NY1tK\u000e{gNZ5h\u0003%!7\r]\"mS\u0016tG/F\u0001l!\ta\u0017/D\u0001n\u0015\tqw.A\u0002eGBT!\u0001\u001d\f\u0002\r\rd\u0017.\u001a8u\u0013\t\u0011XN\u0001\u0004DY&,g\u000e^\u0001\u000bI\u000e\u00048\t\\5f]R\u0004\u0013!D5oSRL\u0017\r\\(gMN,G\u000fF\u0001w!\t)s/\u0003\u0002yM\t1qJ\u001a4tKR\fAD]3m_\u0006$7\u000b^1si>3gm]3u\u0007\",7m\u001b9pS:$8\u000fF\u0001|!\u001da\u00181AA\u0004\u0003\u001fi\u0011! \u0006\u0003}~\fq!\\;uC\ndWMC\u0002\u0002\u0002\u001d\u000b!bY8mY\u0016\u001cG/[8o\u0013\r\t)! \u0002\u0004\u001b\u0006\u0004\b\u0003BA\u0005\u0003\u0017i\u0011aR\u0005\u0004\u0003\u001b9%aA%oiB\u0019Q(!\u0005\n\u0007\u0005M!C\u0001\u000bLKf4\u0016\r\\;f'R\u0014X-Y7PM\u001a\u001cX\r^\u0001\u000fGV\u0014(/\u001a8u\u001f\u001a47/\u001a;t)\t\tI\u0002E\u0004C\u00037\t9!a\u0004\n\u0007\u0005\u0015Q*A\bok6\\e\u000fU1si&$\u0018n\u001c8t+\t\t9!A\teKN,'/[1mSj,wJ\u001a4tKR$2A^A\u0013\u0011\u0019\t9C\u0004a\u0001\u0003\u0006!!n]8o\u0003\u0019\u0019w.\\7jiR!\u0011QFA\u001a!\u0011\tI!a\f\n\u0007\u0005ErI\u0001\u0003V]&$\bBBA\u001b\u001f\u0001\u0007a/A\u0002f]\u0012\fAa\u001d;paR\u0011\u0011Q\u0006")
/* loaded from: input_file:com/couchbase/spark/kv/KeyValueDataStream.class */
public class KeyValueDataStream implements SparkDataStream, Logging {
    private SparkSession sparkSession;
    private CouchbaseConfig conf;
    private final KeyValueStreamConfig config;
    private final String checkpointLocation;
    private final DefaultFormats defaultFormats;
    private final Client dcpClient;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public DefaultFormats defaultFormats() {
        return this.defaultFormats;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [com.couchbase.spark.kv.KeyValueDataStream] */
    private SparkSession sparkSession$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.sparkSession = SparkSession$.MODULE$.active();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.sparkSession;
    }

    private SparkSession sparkSession() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? sparkSession$lzycompute() : this.sparkSession;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [com.couchbase.spark.kv.KeyValueDataStream] */
    private CouchbaseConfig conf$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.conf = CouchbaseConfig$.MODULE$.apply(sparkSession().sparkContext().getConf(), this.config.connectionIdentifier());
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.conf;
    }

    public CouchbaseConfig conf() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? conf$lzycompute() : this.conf;
    }

    public Client dcpClient() {
        return this.dcpClient;
    }

    public Offset initialOffset() {
        int numInputPartitions = this.config.numInputPartitions();
        logInfo(() -> {
            return new StringBuilder(80).append("Constructing initial offset for ").append(numInputPartitions).append(" input partitions ").append("over ").append(this.numKvPartitions()).append(" kv partitions (vbuckets)").toString();
        });
        List list = ((TraversableOnce) ((TraversableLike) reloadStartOffsetCheckpoints().zipWithIndex(Map$.MODULE$.canBuildFrom())).groupBy(tuple2 -> {
            return BoxesRunTime.boxToDouble($anonfun$initialOffset$2(numInputPartitions, tuple2));
        }).values().map(map -> {
            return new KeyValuePartitionOffset(map.keys().toMap(Predef$.MODULE$.$conforms()), None$.MODULE$);
        }, Iterable$.MODULE$.canBuildFrom())).toList();
        logDebug(() -> {
            return new StringBuilder(53).append("Initial Offset is grouped into following partitions: ").append(list).toString();
        });
        return new KeyValueOffset(list);
    }

    private Map<Object, KeyValueStreamOffset> reloadStartOffsetCheckpoints() {
        scala.collection.immutable.Map<Object, KeyValueStreamOffset> currentOffsets;
        Growable apply = Map$.MODULE$.apply(Nil$.MODULE$);
        Enumeration.Value streamFrom = this.config.streamFrom();
        Enumeration.Value FromBeginning = StreamFromVariants$.MODULE$.FromBeginning();
        if (FromBeginning != null ? !FromBeginning.equals(streamFrom) : streamFrom != null) {
            Enumeration.Value FromNow = StreamFromVariants$.MODULE$.FromNow();
            if (FromNow != null ? !FromNow.equals(streamFrom) : streamFrom != null) {
                throw new MatchError(streamFrom);
            }
            currentOffsets = currentOffsets();
        } else {
            currentOffsets = ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numKvPartitions()).map(obj -> {
                return $anonfun$reloadStartOffsetCheckpoints$1(BoxesRunTime.unboxToInt(obj));
            }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        }
        Map<Object, KeyValueStreamOffset> $plus$plus$eq = apply.$plus$plus$eq(currentOffsets);
        KeyValueSourceInitialOffsetWriter keyValueSourceInitialOffsetWriter = new KeyValueSourceInitialOffsetWriter(sparkSession(), this.checkpointLocation);
        Some some = keyValueSourceInitialOffsetWriter.get(0L);
        if (some instanceof Some) {
            ((scala.collection.immutable.Map) some.value()).foreach(tuple2 -> {
                return $plus$plus$eq.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(tuple2._1$mcI$sp())), tuple2._2()));
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        keyValueSourceInitialOffsetWriter.add(0L, $plus$plus$eq.toMap(Predef$.MODULE$.$conforms()));
        return $plus$plus$eq;
    }

    public scala.collection.immutable.Map<Object, KeyValueStreamOffset> currentOffsets() {
        Buffer buffer = (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter((java.util.List) dcpClient().getSeqnos().collectList().block()).asScala();
        scala.collection.immutable.Map map = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter((java.util.List) dcpClient().failoverLogs((Collection) JavaConverters$.MODULE$.bufferAsJavaListConverter((Buffer) buffer.map(partitionAndSeqno -> {
            return Integer.valueOf(partitionAndSeqno.partition());
        }, Buffer$.MODULE$.canBuildFrom())).asJava()).map(byteBuf -> {
            int vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
            java.util.List entries = DcpFailoverLogResponse.entries(byteBuf);
            return new Tuple2.mcIJ.sp(vbucket, entries.size() > 0 ? ((FailoverLogEntry) entries.get(0)).getUuid() : 0L);
        }).collectList().block()).asScala()).toMap(Predef$.MODULE$.$conforms());
        return ((TraversableOnce) buffer.map(partitionAndSeqno2 -> {
            return new Tuple2(BoxesRunTime.boxToInteger(partitionAndSeqno2.partition()), new KeyValueStreamOffset(BoxesRunTime.unboxToLong(map.apply(BoxesRunTime.boxToInteger(partitionAndSeqno2.partition()))), partitionAndSeqno2.seqno(), partitionAndSeqno2.seqno(), partitionAndSeqno2.seqno(), 0L));
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    public int numKvPartitions() {
        return dcpClient().numPartitions();
    }

    public Offset deserializeOffset(String str) {
        KeyValueOffset keyValueOffset = new KeyValueOffset((List) ((List) Serialization$.MODULE$.read(str, defaultFormats(), ManifestFactory$.MODULE$.classType(List.class, ManifestFactory$.MODULE$.classType(scala.collection.immutable.Map.class, ManifestFactory$.MODULE$.classType(String.class), Predef$.MODULE$.wrapRefArray(new Manifest[]{ManifestFactory$.MODULE$.classType(scala.collection.immutable.Map.class, ManifestFactory$.MODULE$.Int(), Predef$.MODULE$.wrapRefArray(new Manifest[]{ManifestFactory$.MODULE$.classType(KeyValueStreamOffset.class)}))})), Predef$.MODULE$.wrapRefArray(new Manifest[0])))).map(map -> {
            return new KeyValuePartitionOffset((scala.collection.immutable.Map) map.apply("start"), map.get("end"));
        }, List$.MODULE$.canBuildFrom()));
        logTrace(() -> {
            return new StringBuilder(27).append("Deserializing offset ").append(str).append(" into ").append(keyValueOffset).toString();
        });
        return keyValueOffset;
    }

    public void commit(Offset offset) {
    }

    public void stop() {
        dcpClient().close();
    }

    public static final /* synthetic */ double $anonfun$initialOffset$2(int i, Tuple2 tuple2) {
        return Math.floor(tuple2._2$mcI$sp() % i);
    }

    public static final /* synthetic */ Tuple2 $anonfun$reloadStartOffsetCheckpoints$1(int i) {
        return new Tuple2(BoxesRunTime.boxToInteger(i), KeyValueStreamOffset$.MODULE$.apply(StreamOffset.ZERO));
    }

    public KeyValueDataStream(KeyValueStreamConfig keyValueStreamConfig, String str) {
        this.config = keyValueStreamConfig;
        this.checkpointLocation = str;
        Logging.$init$(this);
        this.defaultFormats = DefaultFormats$.MODULE$;
        this.dcpClient = Client.builder().seedNodes(new String[]{CouchbaseConnection$.MODULE$.apply(keyValueStreamConfig.connectionIdentifier()).dcpSeedNodes(conf(), keyValueStreamConfig.connectionIdentifier())}).bucket(keyValueStreamConfig.bucket()).collectionsAware(keyValueStreamConfig.scope().isDefined() || keyValueStreamConfig.collections().nonEmpty()).scopeName((keyValueStreamConfig.scope().isDefined() && keyValueStreamConfig.collections().isEmpty()) ? (String) keyValueStreamConfig.scope().get() : null).collectionNames((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) keyValueStreamConfig.collections().map(str2 -> {
            String str2;
            Some scope = this.config.scope();
            if (scope instanceof Some) {
                str2 = (String) scope.value();
            } else {
                if (!None$.MODULE$.equals(scope)) {
                    throw new MatchError(scope);
                }
                str2 = "_default";
            }
            return new StringBuilder(1).append(str2).append(".").append(str2).toString();
        }, Seq$.MODULE$.canBuildFrom())).asJava()).userAgent(Version$.MODULE$.productName(), Version$.MODULE$.version(), new String[]{"stream"}).credentials(conf().credentials().username(), conf().credentials().password()).securityConfig(CouchbaseConnection$.MODULE$.apply(keyValueStreamConfig.connectionIdentifier()).dcpSecurityConfig(conf(), keyValueStreamConfig.connectionIdentifier())).bootstrapTimeout(DCPShared$.MODULE$.bootstrapTimeout()).build();
        dcpClient().connect().block();
    }
}
