package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.db.BatchFlusher;
import dev.responsive.kafka.internal.db.BytesKeySpec;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraTableSpecFactory;
import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.spec.CassandraTableSpec;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import dev.responsive.kafka.internal.stores.SchemaTypes;
import dev.responsive.kafka.internal.utils.Result;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.StoreUtil;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/PartitionedOperations.class */
/**
 * {@link KeyValueOperations} implementation for a single changelog partition.
 *
 * <p>Writes are staged in a {@link CommitBuffer}; reads consult that buffer first and fall back
 * to the backing {@link RemoteKVTable} for keys the buffer knows nothing about. Each instance is
 * registered with the {@link ResponsiveStoreRegistry} on creation and deregistered on
 * {@link #close()}.
 */
public class PartitionedOperations implements KeyValueOperations {
    private final Logger log;
    private final InternalProcessorContext context;
    private final ResponsiveKeyValueParams params;
    private final RemoteKVTable<?> table;
    private final CommitBuffer<Bytes, ?> buffer;
    private final TopicPartition changelog;
    private final ResponsiveStoreRegistry storeRegistry;
    private final ResponsiveStoreRegistration registration;
    private final ResponsiveRestoreListener restoreListener;

    /**
     * Builds a fully wired instance for the store identified by {@code tableName}.
     *
     * <p>Resolves the session clients and store registry from the application configs, creates
     * the remote table for the configured storage backend, initialises it for this task's
     * partition, creates the commit buffer, and registers the store with the registry.
     *
     * @param tableName the store's name (Kafka-side and remote-table-side)
     * @param stateStoreContext the context handed to the store by Kafka Streams
     * @param responsiveKeyValueParams the store parameters (schema type, TTL, changelog truncation)
     * @return the registered operations instance
     * @throws InterruptedException propagated from remote table creation
     * @throws TimeoutException propagated from remote table creation
     * @throws IllegalStateException if the configured storage backend is not handled here
     */
    public static PartitionedOperations create(TableName tableName, StateStoreContext stateStoreContext, ResponsiveKeyValueParams responsiveKeyValueParams) throws InterruptedException, TimeoutException {
        final Logger logger = new LogContext(String.format("store [%s] ", tableName.kafkaName())).logger(PartitionedOperations.class);
        final InternalProcessorContext internalContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);

        final Map<String, Object> appConfigs = stateStoreContext.appConfigs();
        final ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(appConfigs);
        final SessionClients sessionClients = InternalSessionConfigs.loadSessionClients(appConfigs);
        final ResponsiveStoreRegistry storeRegistry = InternalSessionConfigs.loadStoreRegistry(appConfigs);

        final TopicPartition changelog = new TopicPartition(
                ProcessorContextUtils.changelogFor(stateStoreContext, tableName.kafkaName(), false),
                internalContext.taskId().partition());
        final int partition = changelog.partition();

        final RemoteKVTable<?> table;
        switch (sessionClients.storageBackend()) {
            case CASSANDRA:
                table = createCassandra(responsiveKeyValueParams, responsiveConfig, sessionClients, changelog.topic());
                break;
            case MONGO_DB:
                table = createMongo(responsiveKeyValueParams, sessionClients);
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend());
        }

        final KVFlushManager flushManager = table.init(partition);
        logger.info("Remote table {} is available for querying.", tableName.tableName());

        final BytesKeySpec keySpec = new BytesKeySpec();
        // BatchFlusher's generic signature is not visible from this file, so it is constructed
        // raw exactly as before; the suppression is scoped to this single declaration.
        @SuppressWarnings({"rawtypes", "unchecked"})
        final CommitBuffer<Bytes, ?> buffer = CommitBuffer.from(
                new BatchFlusher(keySpec, partition, flushManager),
                sessionClients,
                changelog,
                keySpec,
                responsiveKeyValueParams.truncateChangelog(),
                responsiveKeyValueParams.name(),
                responsiveConfig);

        // NOTE(review): -1 is presumably the "no committed offset" sentinel of fetchOffset;
        // in that case the registration starts from offset 0. Confirm against RemoteKVTable.
        final long fetchedOffset = table.fetchOffset(partition);
        final long startOffset = fetchedOffset == -1 ? 0L : fetchedOffset;

        // The registry flushes this store through the commit buffer. A bound method reference
        // null-checks its receiver when it is evaluated, so no explicit requireNonNull is needed.
        final ResponsiveStoreRegistration registration = new ResponsiveStoreRegistration(
                tableName.kafkaName(),
                changelog,
                startOffset,
                buffer::flush);
        storeRegistry.registerStore(registration);

        return new PartitionedOperations(
                logger,
                internalContext,
                responsiveKeyValueParams,
                table,
                buffer,
                changelog,
                storeRegistry,
                registration,
                sessionClients.restoreListener());
    }

    /**
     * Creates the Cassandra-backed table for the given store parameters.
     *
     * <p>FACT schemas use the default partitioner; every other schema type uses a
     * {@link SubPartitioner} derived from the changelog topic's partition count.
     *
     * @throws IllegalArgumentException if the schema type is neither KEY_VALUE nor FACT
     */
    private static RemoteKVTable<?> createCassandra(ResponsiveKeyValueParams responsiveKeyValueParams, ResponsiveConfig responsiveConfig, SessionClients sessionClients, String changelogTopic) throws InterruptedException, TimeoutException {
        final TablePartitioner partitioner;
        if (responsiveKeyValueParams.schemaType() == SchemaTypes.KVSchema.FACT) {
            partitioner = TablePartitioner.defaultPartitioner();
        } else {
            partitioner = SubPartitioner.create(
                    OptionalInt.empty(),
                    StoreUtil.numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopic),
                    responsiveKeyValueParams.name().tableName(),
                    responsiveConfig,
                    changelogTopic);
        }

        final CassandraClient cassandraClient = sessionClients.cassandraClient();
        final CassandraTableSpec tableSpec = CassandraTableSpecFactory.fromKVParams(responsiveKeyValueParams, partitioner);
        switch (responsiveKeyValueParams.schemaType()) {
            case KEY_VALUE:
                return cassandraClient.kvFactory().create(tableSpec);
            case FACT:
                return cassandraClient.factFactory().create(tableSpec);
            default:
                throw new IllegalArgumentException("Unexpected schema type " + responsiveKeyValueParams.schemaType());
        }
    }

    /** Looks up the MongoDB-backed table for the store's remote table name. */
    private static RemoteKVTable<?> createMongo(ResponsiveKeyValueParams responsiveKeyValueParams, SessionClients sessionClients) throws InterruptedException, TimeoutException {
        return sessionClients.mongoClient().kvTable(responsiveKeyValueParams.name().tableName());
    }

    /**
     * Plain field-assigning constructor; performs no registration or initialisation itself.
     * Use {@link #create} to obtain an instance that is initialised and registered.
     */
    public PartitionedOperations(Logger logger, InternalProcessorContext internalProcessorContext, ResponsiveKeyValueParams responsiveKeyValueParams, RemoteKVTable<?> remoteKVTable, CommitBuffer<Bytes, ?> commitBuffer, TopicPartition topicPartition, ResponsiveStoreRegistry responsiveStoreRegistry, ResponsiveStoreRegistration responsiveStoreRegistration, ResponsiveRestoreListener responsiveRestoreListener) {
        this.log = logger;
        this.context = internalProcessorContext;
        this.params = responsiveKeyValueParams;
        this.table = remoteKVTable;
        this.buffer = commitBuffer;
        this.changelog = topicPartition;
        this.storeRegistry = responsiveStoreRegistry;
        this.registration = responsiveStoreRegistration;
        this.restoreListener = responsiveRestoreListener;
    }

    /** Stages a write in the commit buffer, stamped with the context's current timestamp. */
    @Override
    public void put(Bytes bytes, byte[] bArr) {
        this.buffer.put(bytes, bArr, this.context.timestamp());
    }

    /**
     * Stages a tombstone for the key in the commit buffer.
     *
     * @return the value visible through {@link #get(Bytes)} before the tombstone was written,
     *     or {@code null} if there was none
     */
    @Override
    public byte[] delete(Bytes bytes) {
        final byte[] previous = get(bytes);
        this.buffer.tombstone(bytes, this.context.timestamp());
        return previous;
    }

    /**
     * Reads a key, preferring the commit buffer over the remote table.
     *
     * @return the buffered value; {@code null} if the buffer holds a tombstone for the key;
     *     otherwise whatever the remote table returns for this partition and the current
     *     minimum valid timestamp
     */
    @Override
    public byte[] get(Bytes bytes) {
        final Result<Bytes> buffered = this.buffer.get(bytes);
        if (buffered == null) {
            // The buffer has no entry for this key, so the remote table decides.
            return this.table.get(this.changelog.partition(), bytes, minValidTimestamp());
        }
        return buffered.isTombstone ? null : buffered.value;
    }

    /**
     * Iterates over the given key bounds, combining buffered and remote entries.
     *
     * <p>Two {@code null} bounds are treated as {@link #all()}.
     *
     * @throws UnsupportedOperationException if exactly one of the bounds is {@code null}
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        if (bytes == null && bytes2 == null) {
            return all();
        }
        if (bytes == null || bytes2 == null) {
            this.log.error("Unable to serve range query with undefined bounds. Found {}=null", bytes == null ? "from" : "to");
            throw new UnsupportedOperationException("Open-ended range queries are not yet supported, please pass in non-null values for the lower and upper bounds or else use all() and filter the results. If your use case requires the ability to issue range queries with an undefined upper/lower bound, please reach out to us about supporting this feature.");
        }
        return new LocalRemoteKvIterator(
                this.buffer.range(bytes, bytes2),
                this.table.range(this.changelog.partition(), bytes, bytes2, minValidTimestamp()));
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseRange(Bytes bytes, Bytes bytes2) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /** Iterates over every entry of this partition, combining buffered and remote entries. */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        return new LocalRemoteKvIterator(
                this.buffer.all(),
                this.table.all(this.changelog.partition(), minValidTimestamp()));
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /** Delegates to the remote table's estimate for this partition; the buffer is not consulted. */
    @Override
    public long approximateNumEntries() {
        return this.table.approximateNumEntries(this.changelog.partition());
    }

    /**
     * Closes the commit buffer, notifies the restore listener and deregisters the store.
     *
     * <p>The later steps run in {@code finally} blocks so that a failure while closing the
     * buffer or notifying the listener cannot leave a stale registration in the registry.
     * The exception from the failing step still propagates to the caller.
     */
    @Override
    public void close() {
        try {
            this.buffer.close();
        } finally {
            try {
                this.restoreListener.onStoreClosed(this.changelog, this.params.name().kafkaName());
            } finally {
                this.storeRegistry.deregisterStore(this.registration);
            }
        }
    }

    /** Hands a batch of changelog records to the commit buffer for restoration. */
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        this.buffer.restoreBatch(collection);
    }

    /**
     * Computes the timestamp passed to remote-table reads.
     *
     * @return the context's current timestamp minus the configured time-to-live in
     *     milliseconds, or {@code -1} when no time-to-live is configured
     */
    private long minValidTimestamp() {
        return this.params.timeToLive()
                .map(ttl -> this.context.timestamp() - ttl.toMillis())
                .orElse(-1L);
    }
}
