package dev.responsive.internal.stores;

import dev.responsive.api.config.ResponsiveConfig;
import dev.responsive.api.stores.ResponsiveKeyValueParams;
import dev.responsive.internal.config.InternalConfigs;
import dev.responsive.internal.db.BytesKeySpec;
import dev.responsive.internal.db.CassandraClient;
import dev.responsive.internal.db.RemoteKeyValueSchema;
import dev.responsive.internal.db.partitioning.SubPartitioner;
import dev.responsive.internal.stores.SchemaTypes;
import dev.responsive.internal.utils.Result;
import dev.responsive.internal.utils.SharedClients;
import dev.responsive.internal.utils.TableName;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/internal/stores/ResponsivePartitionedStore.class */
public class ResponsivePartitionedStore implements KeyValueStore<Bytes, byte[]> {
    private final Logger log;
    private final ResponsiveKeyValueParams params;
    private final TableName name;
    private final Position position = Position.emptyPosition();
    private InternalProcessorContext context;
    private TopicPartition partition;
    private CommitBuffer<Bytes, RemoteKeyValueSchema> buffer;
    private RemoteKeyValueSchema schema;
    private ResponsiveStoreRegistry storeRegistry;
    private ResponsiveStoreRegistration registration;
    private SubPartitioner partitioner;
    private boolean open;

    public ResponsivePartitionedStore(ResponsiveKeyValueParams responsiveKeyValueParams) {
        this.params = responsiveKeyValueParams;
        this.name = responsiveKeyValueParams.name();
        this.log = new LogContext(String.format("store [%s]", this.name.kafkaName())).logger(ResponsivePartitionedStore.class);
    }

    public String name() {
        return this.name.kafkaName();
    }

    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (!(processorContext instanceof StateStoreContext)) {
            throw new UnsupportedOperationException("Use ResponsiveStore#init(StateStoreContext, StateStore) instead.");
        }
        init((StateStoreContext) processorContext, stateStore);
    }

    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        try {
            this.log.info("Initializing state store");
            this.context = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
            ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(stateStoreContext.appConfigs());
            SharedClients loadSharedClients = SharedClients.loadSharedClients(stateStoreContext.appConfigs());
            CassandraClient cassandraClient = loadSharedClients.cassandraClient;
            this.storeRegistry = InternalConfigs.loadStoreRegistry(stateStoreContext.appConfigs());
            this.partition = new TopicPartition(ProcessorContextUtils.changelogFor(stateStoreContext, this.name.kafkaName(), false), this.context.taskId().partition());
            this.partitioner = this.params.schemaType() == SchemaTypes.KVSchema.FACT ? SubPartitioner.NO_SUBPARTITIONS : responsiveConfig.getSubPartitioner(loadSharedClients.admin, this.name, this.partition.topic());
            this.schema = cassandraClient.prepareKVTableSchema(this.params);
            this.log.info("Remote table {} is available for querying.", this.name.cassandraName());
            this.buffer = CommitBuffer.from(loadSharedClients, this.name, this.partition, this.schema, new BytesKeySpec(), this.params.truncateChangelog(), this.partitioner, responsiveConfig);
            this.buffer.init();
            this.open = true;
            long offset = this.buffer.offset();
            String kafkaName = this.name.kafkaName();
            TopicPartition topicPartition = this.partition;
            long j = offset == -1 ? 0L : offset;
            CommitBuffer<Bytes, RemoteKeyValueSchema> commitBuffer = this.buffer;
            Objects.requireNonNull(commitBuffer);
            this.registration = new ResponsiveStoreRegistration(kafkaName, topicPartition, j, (v1) -> {
                r6.flush(v1);
            });
            this.storeRegistry.registerStore(this.registration);
            stateStoreContext.register(stateStore, this.buffer);
        } catch (InterruptedException | TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    public void flush() {
    }

    public boolean isOpen() {
        return this.open;
    }

    public boolean persistent() {
        return false;
    }

    public void put(Bytes bytes, byte[] bArr) {
        putInternal(bytes, bArr);
    }

    public byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        byte[] bArr2 = get(bytes);
        if (bArr2 == null) {
            putInternal(bytes, bArr);
        }
        return bArr2;
    }

    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        list.forEach(keyValue -> {
            putInternal((Bytes) keyValue.key, (byte[]) keyValue.value);
        });
    }

    public byte[] delete(Bytes bytes) {
        byte[] bArr = get(bytes);
        putInternal(bytes, null);
        return bArr;
    }

    private void putInternal(Bytes bytes, byte[] bArr) {
        if (bArr != null) {
            this.buffer.put(bytes, bArr, this.context.timestamp());
        } else {
            this.buffer.tombstone(bytes, this.context.timestamp());
        }
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    public byte[] get(Bytes bytes) {
        Result<Bytes> result = this.buffer.get(bytes);
        if (result == null) {
            return this.schema.get(this.name.cassandraName(), this.partitioner.partition(this.partition.partition(), bytes), bytes, ((Long) this.params.timeToLive().map(duration -> {
                return Long.valueOf(this.context.timestamp() - duration.toMillis());
            }).orElse(-1L)).longValue());
        }
        if (result.isTombstone) {
            return null;
        }
        return result.value;
    }

    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    public KeyValueIterator<Bytes, byte[]> all() {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    public Position getPosition() {
        return this.position;
    }

    public long approximateNumEntries() {
        return this.partitioner.all(this.partition.partition()).mapToLong(i -> {
            return this.schema.cassandraClient().count(this.name.cassandraName(), i);
        }).sum();
    }

    public void close() {
        if (this.storeRegistry != null) {
            this.storeRegistry.deregisterStore(this.registration);
        }
    }
}
