package dev.responsive.kafka.internal.stores;

import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.spec.CassandraTableSpec;
import dev.responsive.kafka.internal.utils.SharedClients;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.state.KeyValueIterator;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/GlobalOperations.class */
/**
 * {@link KeyValueOperations} implementation that reads and writes through a
 * {@link RemoteKVTable} via a {@link CassandraClient}, taking the partition,
 * offset and timestamp of the record being processed from a
 * {@link GlobalProcessorContextImpl}.
 *
 * <p>Every data read/write passes {@link #IGNORED_PARTITION} as the table
 * partition; only the offset bookkeeping uses the real record partition.
 */
public class GlobalOperations implements KeyValueOperations {
    /**
     * Timestamp argument passed to every table read in this class.
     * NOTE(review): by its name this presumably disables timestamp filtering - confirm
     * against the {@link RemoteKVTable} contract.
     */
    private static final long ALL_VALID_TS = -1L;

    /** Partition argument passed to table/client calls that are not offset bookkeeping. */
    private static final int IGNORED_PARTITION = -1;

    private final GlobalProcessorContextImpl context;
    private final CassandraClient client;
    private final RemoteKVTable table;

    /**
     * Stores the given collaborators; performs no other work.
     *
     * @param globalProcessorContextImpl source of partition/offset/timestamp for writes
     * @param cassandraClient client used to execute statements and counts
     * @param remoteKVTable table that builds statements and serves reads
     */
    public GlobalOperations(GlobalProcessorContextImpl globalProcessorContextImpl, CassandraClient cassandraClient, RemoteKVTable remoteKVTable) {
        this.context = globalProcessorContextImpl;
        this.client = cassandraClient;
        this.table = remoteKVTable;
    }

    /**
     * Builds a {@code GlobalOperations} from a state store context and a table spec.
     *
     * <p>The context is cast to {@link GlobalProcessorContextImpl}; the Cassandra client is
     * looked up from the shared clients registered in the context's app configs, and the
     * table is created through the client's global factory and initialised before use.
     *
     * @param stateStoreContext context to cast to {@link GlobalProcessorContextImpl}
     * @param cassandraTableSpec spec handed to the global table factory
     * @return a ready-to-use instance
     * @throws InterruptedException declared by this factory; propagated from the calls it makes
     * @throws TimeoutException declared by this factory; propagated from the calls it makes
     * @throws ClassCastException if the context is not a {@link GlobalProcessorContextImpl}
     */
    public static GlobalOperations create(StateStoreContext stateStoreContext, CassandraTableSpec cassandraTableSpec) throws InterruptedException, TimeoutException {
        final GlobalProcessorContextImpl globalContext = (GlobalProcessorContextImpl) stateStoreContext;
        final CassandraClient sharedClient =
                SharedClients.loadSharedClients(globalContext.appConfigs()).cassandraClient;

        final RemoteKVTable globalTable = sharedClient.globalFactory().create(cassandraTableSpec);
        globalTable.init(SubPartitioner.NO_SUBPARTITIONS, IGNORED_PARTITION);

        return new GlobalOperations(globalContext, sharedClient, globalTable);
    }

    /** No-op: this implementation does nothing on registration. */
    @Override
    public void register(ResponsiveStoreRegistry responsiveStoreRegistry) {
    }

    /** No-op: this implementation does nothing on deregistration. */
    @Override
    public void deregister(ResponsiveStoreRegistry responsiveStoreRegistry) {
    }

    /**
     * Writes the key/value pair, using the partition, offset and timestamp currently
     * reported by the processor context.
     */
    @Override
    public void put(Bytes bytes, byte[] bArr) {
        final int partition = this.context.partition();
        final long offset = this.context.offset();
        final long timestamp = this.context.timestamp();
        put(bytes, bArr, partition, offset, timestamp);
    }

    /** Executes the insert for the key/value, then records the offset for the partition. */
    private void put(Bytes key, byte[] value, int partition, long offset, long timestamp) {
        this.client.execute((Statement<?>) this.table.insert(IGNORED_PARTITION, key, value, timestamp));
        recordOffset(partition, offset);
    }

    /**
     * Deletes the key, using the partition and offset currently reported by the
     * processor context.
     *
     * @return the value read for the key immediately before the delete was issued
     */
    @Override
    public byte[] delete(Bytes bytes) {
        final int partition = this.context.partition();
        final long offset = this.context.offset();
        return delete(bytes, partition, offset);
    }

    /**
     * Reads the current value, executes the delete, then records the offset for the
     * partition.
     *
     * @return the value read before the delete
     */
    private byte[] delete(Bytes key, int partition, long offset) {
        final byte[] previous = get(key);
        this.client.execute((Statement<?>) this.table.delete(IGNORED_PARTITION, key));
        recordOffset(partition, offset);
        return previous;
    }

    /** Executes the table's set-offset statement for the given partition and offset. */
    private void recordOffset(int partition, long offset) {
        this.client.execute((Statement<?>) this.table.setOffset(partition, offset));
    }

    /** Looks the key up in the table and returns whatever the table returns. */
    @Override
    public byte[] get(Bytes bytes) {
        return this.table.get(IGNORED_PARTITION, bytes, ALL_VALID_TS);
    }

    /** Delegates a forward range scan between the two bounds to the table. */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        return this.table.range(IGNORED_PARTITION, bytes, bytes2, ALL_VALID_TS);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseRange(Bytes bytes, Bytes bytes2) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /** Delegates a full scan to the table. */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        return this.table.all(IGNORED_PARTITION, ALL_VALID_TS);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /** Returns the count the client reports for this table's name. */
    @Override
    public long approximateNumEntries() {
        final String tableName = this.table.name();
        return this.client.count(tableName, IGNORED_PARTITION);
    }

    /** No-op: nothing is released here. */
    @Override
    public void close() {
    }

    /**
     * Replays a batch of records into the table.
     *
     * <p>A record is skipped when its offset is lower than the offset the table's metadata
     * reports for the record's partition (looked up per record). Otherwise a record with a
     * {@code null} value is applied as a delete and any other record as a put, each using
     * the record's own partition, offset and (for puts) timestamp.
     *
     * @param collection records to apply, in iteration order
     */
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        for (ConsumerRecord<byte[], byte[]> restored : collection) {
            final int partition = restored.partition();
            final long offset = restored.offset();

            // Skip anything below the offset the table's metadata reports for this partition.
            if (offset < this.table.metadata(partition).offset) {
                continue;
            }

            final byte[] value = restored.value();
            final Bytes key = new Bytes(restored.key());
            if (value == null) {
                delete(key, partition, offset);
            } else {
                put(key, value, partition, offset, restored.timestamp());
            }
        }
    }
}
