package dev.responsive.kafka.internal.db.mongo;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import dev.responsive.kafka.internal.db.MetadataRow;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.WriterFactory;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.metrics.OrderedTagsSupplier;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.types.ObjectId;

/* loaded from: input_file:dev/responsive/kafka/internal/db/mongo/MongoKVTable.class */
public class MongoKVTable implements RemoteKVTable {
    private final String name;
    private final MongoCollection<KVDoc> collection;
    private final MongoCollection<MetadataDoc> metadata;
    private final ConcurrentMap<Integer, ObjectId> metadataRows = new ConcurrentHashMap();

    public MongoKVTable(MongoClient mongoClient, String str) {
        this.name = str;
        MongoDatabase withCodecRegistry = mongoClient.getDatabase(str).withCodecRegistry(CodecRegistries.fromRegistries(new CodecRegistry[]{MongoClientSettings.getDefaultCodecRegistry(), CodecRegistries.fromProviders(new CodecProvider[]{PojoCodecProvider.builder().automatic(true).build()})}));
        this.collection = withCodecRegistry.getCollection(str, KVDoc.class);
        this.metadata = withCodecRegistry.getCollection(str, MetadataDoc.class);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriterFactory<Bytes> init(SubPartitioner subPartitioner, int i) {
        this.metadataRows.computeIfAbsent(Integer.valueOf(i), num -> {
            UpdateResult updateOne = this.metadata.updateOne(Filters.eq(OrderedTagsSupplier.PARTITION_TAG, Integer.valueOf(i)), Updates.setOnInsert("offset", -1L), new UpdateOptions().upsert(true));
            if (updateOne.getUpsertedId() != null) {
                return updateOne.getUpsertedId().asObjectId().getValue();
            }
            MetadataDoc metadataDoc = (MetadataDoc) this.metadata.find(Filters.eq(OrderedTagsSupplier.PARTITION_TAG, Integer.valueOf(i))).first();
            if (metadataDoc == null) {
                throw new IllegalStateException("No metadata partition despite upsert");
            }
            return metadataDoc.id();
        });
        return new MongoWriterFactory(this);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public byte[] get(int i, Bytes bytes, long j) {
        KVDoc kVDoc = (KVDoc) this.collection.find(Filters.eq("key", bytes.get())).first();
        if (kVDoc == null) {
            return null;
        }
        return kVDoc.getValue();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> range(int i, Bytes bytes, Bytes bytes2, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> all(int i, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement insert(int i, Bytes bytes, byte[] bArr, long j) {
        this.collection.updateOne(Filters.eq("key", bytes.get()), Updates.set("value", bArr), new UpdateOptions().upsert(true));
        return null;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement delete(int i, Bytes bytes) {
        this.collection.deleteOne(Filters.eq("key", bytes));
        return null;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public MetadataRow metadata(int i) {
        MetadataDoc metadataDoc = (MetadataDoc) this.metadata.find(Filters.eq("_id", this.metadataRows.get(Integer.valueOf(i)))).first();
        if (metadataDoc == null) {
            throw new IllegalStateException("Expected to find metadata row");
        }
        return new MetadataRow(metadataDoc.offset, -1L);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement setOffset(int i, long j) {
        this.metadata.updateOne(Filters.eq("_id", this.metadataRows.get(Integer.valueOf(i))), Updates.set("offset", Long.valueOf(j)));
        return null;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long approximateNumEntries(int i) {
        return 0L;
    }
}
