package dev.responsive.kafka.internal.db.mongo;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.UpdateResult;
import dev.responsive.kafka.internal.db.MongoKVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/mongo/MongoKVTable.class */
/**
 * MongoDB-backed implementation of {@link RemoteKVTable}.
 *
 * <p>Rows are stored in the {@code kv_data} collection and per-partition metadata
 * (committed offset and epoch) in the {@code kv_metadata} collection of the database
 * named after this table. {@link #init(int)} increments the stored epoch of a partition
 * and remembers it locally; every subsequent write built by this class only matches
 * documents whose stored epoch is less than or equal to that local epoch.
 *
 * <p>Deletes are written as tombstones (the value is unset and {@code tombstoneTs} is
 * set); a TTL index on {@code tombstoneTs} is created with a 12 hour expiry.
 */
public class MongoKVTable implements RemoteKVTable<WriteModel<KVDoc>> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoKVTable.class);
    private static final String KV_COLLECTION_NAME = "kv_data";
    private static final String METADATA_COLLECTION_NAME = "kv_metadata";

    // Document field names used in the filters and updates built by this class.
    private static final String ID_FIELD = "_id";
    private static final String VALUE_FIELD = "value";
    private static final String EPOCH_FIELD = "epoch";
    private static final String OFFSET_FIELD = "offset";
    private static final String PARTITION_FIELD = "partition";
    private static final String TOMBSTONE_TS_FIELD = "tombstoneTs";

    // Offset written to a partition's metadata document when it is first created.
    private static final long INITIAL_OFFSET = -1L;

    private final String name;
    private final Optional<TtlResolver<?, ?>> ttlResolver;
    private final long defaultTtlMs;
    private final MongoCollection<KVDoc> docs;
    private final MongoCollection<KVMetadataDoc> metadata;
    private final ConcurrentMap<Integer, Long> kafkaPartitionToEpoch = new ConcurrentHashMap<>();
    private final KeyCodec keyCodec = new StringKeyCodec();

    /**
     * Opens (and, when sharding is requested, creates) the backing collections and
     * ensures the expiry indexes exist.
     *
     * @param client                    client used to reach the table's database (and the
     *                                  {@code admin} database when the collection is sharded)
     * @param name                      table name; also used as the MongoDB database name
     * @param collectionCreationOptions whether the data collection is sharded and, if so,
     *                                  with how many chunks
     * @param ttlResolver               optional ttl configuration; only a default-only
     *                                  resolver is accepted
     * @throws UnsupportedOperationException if the resolver is present but is not default-only
     */
    public MongoKVTable(
            final MongoClient client,
            final String name,
            final CollectionCreationOptions collectionCreationOptions,
            final Optional<TtlResolver<?, ?>> ttlResolver) {
        this.name = name;
        this.ttlResolver = ttlResolver;

        // Default codecs plus automatic POJO mapping for KVDoc / KVMetadataDoc.
        final CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build()));
        final MongoDatabase database = client.getDatabase(name).withCodecRegistry(codecRegistry);

        if (collectionCreationOptions.sharded()) {
            this.docs = MongoUtils.createShardedCollection(
                    KV_COLLECTION_NAME,
                    KVDoc.class,
                    database,
                    client.getDatabase("admin"),
                    collectionCreationOptions.numChunks());
        } else {
            this.docs = database.getCollection(KV_COLLECTION_NAME, KVDoc.class);
        }
        this.metadata = database.getCollection(METADATA_COLLECTION_NAME, KVMetadataDoc.class);

        // Tombstones (written by delete() with a Date value) expire after 12 hours.
        docs.createIndex(
                Indexes.descending(TOMBSTONE_TS_FIELD),
                new IndexOptions().expireAfter(12L, TimeUnit.HOURS));

        if (ttlResolver.isPresent()) {
            final TtlResolver<?, ?> resolver = ttlResolver.get();
            if (!resolver.hasDefaultOnly()) {
                throw new UnsupportedOperationException("Row-level ttl is not yet supported with MongoDB");
            }
            final long ttlMs = resolver.defaultTtl().toMillis();
            this.defaultTtlMs = ttlMs;

            // Index expiry is the default ttl plus an additional 12 hours.
            // NOTE(review): insert() writes this field as a long, while MongoDB TTL indexes
            // only expire documents whose indexed field holds a date -- confirm that this
            // index actually removes expired rows.
            final long expireAfterMs = ttlMs + Duration.ofHours(12L).toMillis();
            docs.createIndex(
                    Indexes.descending(KVDoc.TIMESTAMP),
                    new IndexOptions().expireAfter(expireAfterMs, TimeUnit.MILLISECONDS));
        } else {
            this.defaultTtlMs = 0L;
        }
    }

    /**
     * @return the table name supplied at construction
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * Creates the metadata document for the partition if it does not exist yet, increments
     * its epoch by one, and records the resulting epoch locally for later writes.
     *
     * @param kafkaPartition the partition to initialise
     * @return a flush manager bound to this table, its data collection and the partition
     * @throws IllegalStateException if no metadata document is returned by the upsert
     */
    @Override
    public MongoKVFlushManager init(final int kafkaPartition) {
        final KVMetadataDoc metaDoc = metadata.findOneAndUpdate(
                Filters.eq(ID_FIELD, kafkaPartition),
                Updates.combine(
                        Updates.setOnInsert(ID_FIELD, kafkaPartition),
                        Updates.setOnInsert(OFFSET_FIELD, INITIAL_OFFSET),
                        // $inc creates the field with the given value when it is absent
                        Updates.inc(EPOCH_FIELD, 1)),
                new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
        if (metaDoc == null) {
            throw new IllegalStateException("Uninitialized metadata for partition " + kafkaPartition);
        }
        LOG.info("Retrieved initial metadata {}", metaDoc);
        kafkaPartitionToEpoch.put(kafkaPartition, metaDoc.epoch);
        return new MongoKVFlushManager(this, docs, kafkaPartition);
    }

    /**
     * Looks up a single key.
     *
     * @param kafkaPartition unused by this lookup; the document is located by key only
     * @param key            the key to fetch
     * @param streamTimeMs   current stream time, used to compute the oldest timestamp that
     *                       is still considered valid when a ttl is configured
     * @return the stored value, or {@code null} when no document with a sufficiently recent
     *         timestamp matches the key
     */
    @Override
    public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
        final KVDoc doc = docs.find(Filters.and(
                Filters.eq(ID_FIELD, keyCodec.encode(key)),
                Filters.gte(KVDoc.TIMESTAMP, minValidTimestamp(streamTimeMs)))).first();
        return doc == null ? null : doc.getValue();
    }

    /**
     * Iterates over the non-tombstoned, non-expired rows of a partition whose encoded key
     * lies in {@code [from, to]} (both bounds inclusive).
     *
     * @param kafkaPartition the partition to scan
     * @param from           lower key bound, inclusive
     * @param to             upper key bound, inclusive
     * @param streamTimeMs   current stream time, used for the ttl cut-off
     * @return an iterator over the matching key/value pairs
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(
            final int kafkaPartition,
            final Bytes from,
            final Bytes to,
            final long streamTimeMs) {
        final Bson filter = Filters.and(
                Filters.gte(ID_FIELD, keyCodec.encode(from)),
                Filters.lte(ID_FIELD, keyCodec.encode(to)),
                Filters.not(Filters.exists(TOMBSTONE_TS_FIELD)),
                Filters.gte(KVDoc.TIMESTAMP, minValidTimestamp(streamTimeMs)),
                Filters.eq(PARTITION_FIELD, kafkaPartition));
        return Iterators.kv(docs.find(filter).iterator(), this::toKeyValue);
    }

    /**
     * Iterates over every non-tombstoned, non-expired row of a partition.
     *
     * @param kafkaPartition the partition to scan
     * @param streamTimeMs   current stream time, used for the ttl cut-off
     * @return an iterator over the matching key/value pairs
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
        final Bson filter = Filters.and(
                Filters.not(Filters.exists(TOMBSTONE_TS_FIELD)),
                Filters.gte(KVDoc.TIMESTAMP, minValidTimestamp(streamTimeMs)),
                Filters.eq(PARTITION_FIELD, kafkaPartition));
        return Iterators.kv(docs.find(filter).iterator(), this::toKeyValue);
    }

    /**
     * Builds (but does not execute) an upsert that stores {@code value} under {@code key},
     * stamps it with the local epoch, timestamp and partition, and clears any tombstone.
     *
     * @param kafkaPartition the partition the key belongs to
     * @param key            the key to write
     * @param value          the value to store
     * @param timestampMs    the timestamp to record on the row
     * @return the write model describing the upsert
     * @throws IllegalStateException if {@link #init(int)} has not been called for the partition
     */
    @Override
    public WriteModel<KVDoc> insert(
            final int kafkaPartition,
            final Bytes key,
            final byte[] value,
            final long timestampMs) {
        final long epoch = localEpoch(kafkaPartition);
        return new UpdateOneModel<>(
                keyAndEpochFilter(key, epoch),
                Updates.combine(
                        Updates.set(VALUE_FIELD, value),
                        Updates.set(EPOCH_FIELD, epoch),
                        Updates.set(KVDoc.TIMESTAMP, timestampMs),
                        Updates.set(PARTITION_FIELD, kafkaPartition),
                        Updates.unset(TOMBSTONE_TS_FIELD)),
                new UpdateOptions().upsert(true));
    }

    /**
     * Builds (but does not execute) an upsert that turns the row for {@code key} into a
     * tombstone: value and timestamp are unset and {@code tombstoneTs} is set to the
     * current wall-clock time.
     *
     * @param kafkaPartition the partition the key belongs to
     * @param key            the key to delete
     * @return the write model describing the tombstone upsert
     * @throws IllegalStateException if {@link #init(int)} has not been called for the partition
     */
    @Override
    public WriteModel<KVDoc> delete(final int kafkaPartition, final Bytes key) {
        final long epoch = localEpoch(kafkaPartition);
        return new UpdateOneModel<>(
                keyAndEpochFilter(key, epoch),
                Updates.combine(
                        Updates.unset(VALUE_FIELD),
                        Updates.unset(KVDoc.TIMESTAMP),
                        Updates.set(TOMBSTONE_TS_FIELD, Date.from(Instant.now())),
                        Updates.set(EPOCH_FIELD, epoch)),
                new UpdateOptions().upsert(true));
    }

    /**
     * Reads the offset stored in the partition's metadata document.
     *
     * @param kafkaPartition the partition to look up
     * @return the stored offset
     * @throws IllegalStateException if the partition has no metadata document
     */
    @Override
    public long fetchOffset(final int kafkaPartition) {
        return fetchMetadata(kafkaPartition).offset;
    }

    /**
     * Writes {@code offset} and the local epoch to the partition's metadata document,
     * provided the stored epoch is not greater than the local epoch.
     *
     * @param kafkaPartition the partition whose offset is updated
     * @param offset         the offset to store
     * @return the driver's result for the update
     * @throws IllegalStateException if {@link #init(int)} has not been called for the partition
     */
    public UpdateResult setOffset(final int kafkaPartition, final long offset) {
        final long epoch = localEpoch(kafkaPartition);
        return metadata.updateOne(
                Filters.and(
                        Filters.eq(ID_FIELD, kafkaPartition),
                        Filters.lte(EPOCH_FIELD, epoch)),
                Updates.combine(
                        Updates.set(OFFSET_FIELD, offset),
                        Updates.set(EPOCH_FIELD, epoch)));
    }

    /**
     * Returns the epoch recorded locally by {@link #init(int)} for the partition.
     *
     * @param kafkaPartition the partition to look up
     * @return the locally recorded epoch
     * @throws IllegalStateException if {@link #init(int)} has not been called for the partition
     */
    public long localEpoch(final int kafkaPartition) {
        final Long epoch = kafkaPartitionToEpoch.get(kafkaPartition);
        if (epoch == null) {
            // Fail with context rather than a bare NullPointerException from unboxing.
            throw new IllegalStateException(
                    "No local epoch for partition " + kafkaPartition + "; init must be called first");
        }
        return epoch;
    }

    /**
     * Reads the epoch currently stored in the partition's metadata document.
     *
     * @param kafkaPartition the partition to look up
     * @return the stored epoch
     * @throws IllegalStateException if the partition has no metadata document
     */
    public long fetchEpoch(final int kafkaPartition) {
        return fetchMetadata(kafkaPartition).epoch;
    }

    /**
     * Not implemented for MongoDB: logs a warning and reports zero entries.
     *
     * @param kafkaPartition ignored
     * @return always {@code 0}
     */
    @Override
    public long approximateNumEntries(final int kafkaPartition) {
        LOG.warn("approximateNumEntries is not yet implemented for Mongo");
        return 0L;
    }

    /** Oldest row timestamp still considered valid; {@code -1} when no ttl is configured. */
    private long minValidTimestamp(final long streamTimeMs) {
        return ttlResolver.isEmpty() ? -1L : streamTimeMs - defaultTtlMs;
    }

    /** Matches the document for {@code key} whose stored epoch does not exceed {@code epoch}. */
    private Bson keyAndEpochFilter(final Bytes key, final long epoch) {
        return Filters.and(
                Filters.eq(ID_FIELD, keyCodec.encode(key)),
                Filters.lte(EPOCH_FIELD, epoch));
    }

    /** Converts a stored document to a key/value pair; tombstoned documents map to a null value. */
    private KeyValue<Bytes, byte[]> toKeyValue(final KVDoc doc) {
        final byte[] value = doc.getTombstoneTs() == null ? doc.getValue() : null;
        return new KeyValue<>(keyCodec.decode(doc.getKey()), value);
    }

    /** Loads the metadata document of a partition, failing if it does not exist. */
    private KVMetadataDoc fetchMetadata(final int kafkaPartition) {
        final KVMetadataDoc metaDoc = metadata.find(Filters.eq(ID_FIELD, kafkaPartition)).first();
        if (metaDoc == null) {
            throw new IllegalStateException("Expected to find metadata row");
        }
        return metaDoc;
    }
}
