package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.db.BytesKeySpec;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.spec.CassandraTableSpec;
import dev.responsive.kafka.internal.stores.SchemaTypes;
import dev.responsive.kafka.internal.utils.Result;
import dev.responsive.kafka.internal.utils.SharedClients;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/PartitionedOperations.class */
/**
 * {@link KeyValueOperations} implementation that routes writes through a {@link CommitBuffer}
 * and answers reads from that buffer first, falling back to a {@link RemoteKVTable} when the
 * buffer holds no entry for the key.
 *
 * <p>Range and full-scan iteration are not supported; those methods throw
 * {@link UnsupportedOperationException}.
 */
public class PartitionedOperations implements KeyValueOperations {
    private final ResponsiveKeyValueParams params;
    private final RemoteKVTable table;
    private final CommitBuffer<Bytes, RemoteKVTable> buffer;
    private final SubPartitioner partitioner;
    private final TopicPartition partition;
    private final InternalProcessorContext context;
    private final ResponsiveStoreRegistration registration;

    /**
     * Builds a fully initialised instance for the task that owns {@code stateStoreContext}.
     *
     * <p>Steps, in order: resolve the shared clients and config from the application configs,
     * derive the changelog topic-partition for this task, choose the sub-partitioner, create the
     * remote table matching the configured schema type, create and initialise the commit buffer,
     * and finally build the store registration whose flush callback delegates to the buffer.
     *
     * @param tableName                name of the store / remote table
     * @param stateStoreContext        context of the state store being initialised
     * @param responsiveKeyValueParams store parameters (schema type, TTL, changelog truncation)
     * @param cassandraTableSpec       spec handed to the remote table factory
     * @return a ready-to-use instance
     * @throws InterruptedException     declared by the signature; raised by one of the
     *                                  initialisation calls
     * @throws TimeoutException         declared by the signature; raised by one of the
     *                                  initialisation calls
     * @throws IllegalArgumentException if the schema type is neither {@code KEY_VALUE} nor
     *                                  {@code FACT}
     */
    public static PartitionedOperations create(TableName tableName, StateStoreContext stateStoreContext, ResponsiveKeyValueParams responsiveKeyValueParams, CassandraTableSpec cassandraTableSpec) throws InterruptedException, TimeoutException {
        final Logger log = new LogContext(String.format("store [%s] ", tableName.kafkaName())).logger(PartitionedOperations.class);
        final InternalProcessorContext context = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
        final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(stateStoreContext.appConfigs());
        final SharedClients clients = SharedClients.loadSharedClients(stateStoreContext.appConfigs());
        final CassandraClient client = clients.cassandraClient;

        final TopicPartition changelogPartition = new TopicPartition(
                ProcessorContextUtils.changelogFor(stateStoreContext, tableName.kafkaName(), false),
                context.taskId().partition());

        // FACT tables are never sub-partitioned; every other schema asks the config for one.
        final SubPartitioner partitioner;
        if (responsiveKeyValueParams.schemaType() == SchemaTypes.KVSchema.FACT) {
            partitioner = SubPartitioner.NO_SUBPARTITIONS;
        } else {
            partitioner = config.getSubPartitioner(clients.admin, tableName, changelogPartition.topic());
        }

        final RemoteKVTable table;
        switch (responsiveKeyValueParams.schemaType()) {
            case KEY_VALUE:
                table = client.kvFactory().create(cassandraTableSpec);
                break;
            case FACT:
                table = client.factFactory().create(cassandraTableSpec);
                break;
            default:
                throw new IllegalArgumentException("Unexpected schema type " + responsiveKeyValueParams.schemaType());
        }
        log.info("Remote table {} is available for querying.", tableName.cassandraName());

        final CommitBuffer<Bytes, RemoteKVTable> buffer = CommitBuffer.from(
                clients,
                changelogPartition,
                table,
                new BytesKeySpec(),
                responsiveKeyValueParams.truncateChangelog(),
                partitioner,
                config);
        buffer.init();

        // An offset of -1 is registered as 0.
        // NOTE(review): -1 presumably means "nothing committed yet" — confirm against CommitBuffer.
        final long committedOffset = buffer.offset();
        final long startOffset = committedOffset == -1 ? 0L : committedOffset;

        // The registration's flush callback must target this store's commit buffer. The bound
        // method reference also null-checks the receiver when it is evaluated.
        final ResponsiveStoreRegistration registration = new ResponsiveStoreRegistration(
                tableName.kafkaName(),
                changelogPartition,
                startOffset,
                buffer::flush);

        return new PartitionedOperations(
                responsiveKeyValueParams,
                table,
                buffer,
                partitioner,
                changelogPartition,
                context,
                registration);
    }

    /**
     * Stores the collaborators as-is; no initialisation work is performed here.
     *
     * @param responsiveKeyValueParams    store parameters, consulted for the TTL on reads
     * @param remoteKVTable               remote table used when the buffer has no entry for a key
     * @param commitBuffer                buffer that receives all writes and is checked first on reads
     * @param subPartitioner              maps a key to its sub-partition of {@code topicPartition}
     * @param topicPartition              changelog topic-partition this instance is bound to
     * @param internalProcessorContext    source of the timestamp attached to writes and TTL checks
     * @param responsiveStoreRegistration registration handed to the store registry
     */
    public PartitionedOperations(ResponsiveKeyValueParams responsiveKeyValueParams, RemoteKVTable remoteKVTable, CommitBuffer<Bytes, RemoteKVTable> commitBuffer, SubPartitioner subPartitioner, TopicPartition topicPartition, InternalProcessorContext internalProcessorContext, ResponsiveStoreRegistration responsiveStoreRegistration) {
        this.params = responsiveKeyValueParams;
        this.table = remoteKVTable;
        this.buffer = commitBuffer;
        this.partitioner = subPartitioner;
        this.partition = topicPartition;
        this.context = internalProcessorContext;
        this.registration = responsiveStoreRegistration;
    }

    /** Adds this store's registration to {@code responsiveStoreRegistry}. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public void register(ResponsiveStoreRegistry responsiveStoreRegistry) {
        responsiveStoreRegistry.registerStore(this.registration);
    }

    /** Removes this store's registration from {@code responsiveStoreRegistry}. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public void deregister(ResponsiveStoreRegistry responsiveStoreRegistry) {
        responsiveStoreRegistry.deregisterStore(this.registration);
    }

    /** Buffers a write of {@code bArr} under {@code bytes}, stamped with the context timestamp. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public void put(Bytes bytes, byte[] bArr) {
        this.buffer.put(bytes, bArr, this.context.timestamp());
    }

    /**
     * Buffers a tombstone for {@code bytes}, stamped with the context timestamp.
     *
     * @return the value that {@link #get(Bytes)} reported immediately before the tombstone was
     *         written, or {@code null} if there was none
     */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public byte[] delete(Bytes bytes) {
        // Read first: once the tombstone is buffered, get() would return null for this key.
        final byte[] previous = get(bytes);
        this.buffer.tombstone(bytes, this.context.timestamp());
        return previous;
    }

    /**
     * Looks up {@code bytes}, preferring the commit buffer over the remote table.
     *
     * @return the buffered value; {@code null} if the buffered entry is a tombstone; otherwise
     *         whatever the remote table returns for the key's sub-partition
     */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public byte[] get(Bytes bytes) {
        final Result<Bytes> buffered = this.buffer.get(bytes);
        if (buffered != null) {
            return buffered.isTombstone ? null : buffered.value;
        }

        // Context timestamp minus the configured TTL, or -1 when no TTL is configured.
        // NOTE(review): presumably the table treats this as the oldest timestamp still
        // considered live — confirm against RemoteKVTable#get.
        final long cutoff = this.params.timeToLive()
                .map(ttl -> this.context.timestamp() - ttl.toMillis())
                .orElse(-1L);
        final int subPartition = this.partitioner.partition(this.partition.partition(), bytes);
        return this.table.get(subPartition, bytes, cutoff);
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public KeyValueIterator<Bytes, byte[]> reverseRange(Bytes bytes, Bytes bytes2) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public KeyValueIterator<Bytes, byte[]> all() {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /**
     * Sums the remote table's per-sub-partition counts over every sub-partition of this
     * instance's partition. The commit buffer is not consulted.
     */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations
    public long approximateNumEntries() {
        return this.partitioner.all(this.partition.partition())
                .mapToLong(subPartition -> this.table.cassandraClient().count(this.table.name(), subPartition))
                .sum();
    }

    /** Closes the commit buffer. */
    @Override // dev.responsive.kafka.internal.stores.KeyValueOperations, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.buffer.close();
    }

    /** Hands a batch of changelog records to the commit buffer for restoration. */
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        this.buffer.restoreBatch(collection);
    }
}
