package dev.responsive.kafka.store;

import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.db.CassandraClient;
import dev.responsive.db.RemoteKeyValueSchema;
import dev.responsive.db.partitioning.SubPartitioner;
import dev.responsive.kafka.api.ResponsiveKeyValueParams;
import dev.responsive.kafka.clients.SharedClients;
import dev.responsive.utils.TableName;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/store/ResponsiveGlobalStore.class */
public class ResponsiveGlobalStore implements KeyValueStore<Bytes, byte[]> {
    private static final long ALL_VALID_TS = -1;
    private final Logger log;
    private final ResponsiveKeyValueParams params;
    private final TableName name;
    private final Position position = Position.emptyPosition();
    private GlobalProcessorContextImpl context;
    private int partition;
    private CassandraClient client;
    private RemoteKeyValueSchema schema;
    private boolean open;

    public ResponsiveGlobalStore(ResponsiveKeyValueParams responsiveKeyValueParams) {
        this.params = responsiveKeyValueParams;
        this.name = responsiveKeyValueParams.name();
        this.log = new LogContext(String.format("global-store [%s]", this.name.kafkaName())).logger(ResponsivePartitionedStore.class);
        throw new UnsupportedOperationException("Global tables are not available for use at this time");
    }

    public String name() {
        return this.name.kafkaName();
    }

    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (!(processorContext instanceof StateStoreContext)) {
            throw new UnsupportedOperationException("Use ResponsiveStore#init(StateStoreContext, StateStore) instead.");
        }
        init((StateStoreContext) processorContext, stateStore);
    }

    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        try {
            this.log.info("Initializing global state store {}", this.name);
            this.context = (GlobalProcessorContextImpl) stateStoreContext;
            this.partition = 0;
            this.client = SharedClients.loadSharedClients(stateStoreContext.appConfigs()).cassandraClient;
            this.schema = this.client.prepareKVTableSchema(this.params);
            this.log.info("Global table {} is available for querying.", this.name);
            this.schema.init(this.name.cassandraName(), SubPartitioner.NO_SUBPARTITIONS, this.partition);
            this.open = true;
            stateStoreContext.register(stateStore, (bArr, bArr2) -> {
                if (bArr2 == null) {
                    delete(new Bytes(bArr));
                } else {
                    put(new Bytes(bArr), bArr2);
                }
            });
        } catch (InterruptedException | TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    public boolean isOpen() {
        return this.open;
    }

    public boolean persistent() {
        return false;
    }

    public void put(Bytes bytes, byte[] bArr) {
        this.client.execute((Statement<?>) this.schema.insert(this.name.cassandraName(), this.partition, bytes, bArr, this.context.timestamp()));
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    public byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        byte[] bArr2 = get(bytes);
        if (bArr2 == null) {
            put(bytes, bArr);
        }
        return bArr2;
    }

    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        list.forEach(keyValue -> {
            put((Bytes) keyValue.key, (byte[]) keyValue.value);
        });
    }

    public byte[] delete(Bytes bytes) {
        byte[] bArr = get(bytes);
        this.client.execute((Statement<?>) this.schema.delete(this.name.cassandraName(), this.partition, bytes));
        StoreQueryUtils.updatePosition(this.position, this.context);
        return bArr;
    }

    public byte[] get(Bytes bytes) {
        return this.schema.get(this.name.cassandraName(), this.partition, bytes, -1L);
    }

    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        return this.schema.range(this.name.cassandraName(), this.partition, bytes, bytes2, -1L);
    }

    public KeyValueIterator<Bytes, byte[]> all() {
        return this.schema.all(this.name.cassandraName(), this.partition, -1L);
    }

    public Position getPosition() {
        return this.position;
    }

    public long approximateNumEntries() {
        return this.client.count(this.name.cassandraName(), this.partition);
    }

    public void flush() {
    }

    public void close() {
    }
}
