package dev.responsive.kafka.store;

import dev.responsive.db.CassandraClient;
import dev.responsive.db.RemoteWindowedSchema;
import dev.responsive.db.StampedKeySpec;
import dev.responsive.db.partitioning.SubPartitioner;
import dev.responsive.kafka.api.InternalConfigs;
import dev.responsive.kafka.clients.SharedClients;
import dev.responsive.kafka.config.ResponsiveConfig;
import dev.responsive.model.Result;
import dev.responsive.model.Stamped;
import dev.responsive.utils.Iterators;
import dev.responsive.utils.TableName;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/store/ResponsiveWindowStore.class */
public class ResponsiveWindowStore implements WindowStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveWindowStore.class);
    private CassandraClient client;
    private RemoteWindowedSchema schema;
    private final TableName name;
    private final long windowSize;
    private final Position position = Position.emptyPosition();
    private final long retentionPeriod;
    private boolean open;
    private InternalProcessorContext context;
    private int partition;
    private CommitBuffer<Stamped<Bytes>, RemoteWindowedSchema> buffer;
    private long observedStreamTime;
    private ResponsiveStoreRegistry registry;
    private ResponsiveStoreRegistration registration;
    private SubPartitioner partitioner;

    public ResponsiveWindowStore(TableName tableName, long j, long j2, boolean z) {
        this.name = tableName;
        this.retentionPeriod = j;
        this.windowSize = j2;
        if (z) {
            LOG.warn("ResponsiveWindowStore does not fully support retaining duplicates");
        }
    }

    public String name() {
        return this.name.kafkaName();
    }

    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (!(processorContext instanceof StateStoreContext)) {
            throw new UnsupportedOperationException("Use ResponsiveWindowStore#init(StateStoreContext, StateStore) instead.");
        }
        init((StateStoreContext) processorContext, stateStore);
    }

    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        try {
            LOG.info("Initializing state store {}", this.name);
            this.context = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
            ResponsiveConfig responsiveConfig = new ResponsiveConfig(stateStoreContext.appConfigs());
            this.partition = stateStoreContext.taskId().partition();
            SharedClients sharedClients = new SharedClients(stateStoreContext.appConfigs());
            this.client = sharedClients.cassandraClient;
            this.schema = this.client.windowedSchema();
            this.schema.create(this.name.cassandraName());
            this.client.awaitTable(this.name.cassandraName(), sharedClients.executor).await(Duration.ofSeconds(60L));
            LOG.info("Remote table {} is available for querying.", this.name.cassandraName());
            this.schema.prepare(this.name.cassandraName());
            this.registry = InternalConfigs.loadStoreRegistry(stateStoreContext.appConfigs());
            TopicPartition topicPartition = new TopicPartition(ProcessorContextUtils.changelogFor(stateStoreContext, this.name.kafkaName(), false), this.partition);
            this.partitioner = responsiveConfig.getSubPartitioner(sharedClients.admin, this.name, topicPartition.topic());
            this.buffer = CommitBuffer.from(sharedClients, this.name, topicPartition, this.schema, new StampedKeySpec(this::withinRetention), this.partitioner, responsiveConfig);
            this.buffer.init();
            long offset = this.buffer.offset();
            String kafkaName = this.name.kafkaName();
            long j = offset == -1 ? 0L : offset;
            CommitBuffer<Stamped<Bytes>, RemoteWindowedSchema> commitBuffer = this.buffer;
            Objects.requireNonNull(commitBuffer);
            this.registration = new ResponsiveStoreRegistration(kafkaName, topicPartition, j, (v1) -> {
                r6.flush(v1);
            });
            this.registry.registerStore(this.registration);
            this.open = true;
            stateStoreContext.register(stateStore, this.buffer);
        } catch (InterruptedException | TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    public boolean persistent() {
        return false;
    }

    public boolean isOpen() {
        return this.open;
    }

    private RecordCollector.Supplier asRecordCollector(StateStoreContext stateStoreContext) {
        return (RecordCollector.Supplier) stateStoreContext;
    }

    public void put(Bytes bytes, byte[] bArr, long j) {
        this.observedStreamTime = Math.max(this.observedStreamTime, j);
        this.buffer.put(new Stamped<>(bytes, j), bArr);
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    public byte[] fetch(Bytes bytes, long j) {
        Result<Stamped<Bytes>> result = this.buffer.get(new Stamped<>(bytes, j));
        if (result != null) {
            if (result.isTombstone) {
                return null;
            }
            return result.value;
        }
        KeyValueIterator<Stamped<Bytes>, byte[]> fetch = this.schema.fetch(this.name.cassandraName(), this.partitioner.partition(this.partition, bytes), bytes, j, j + 1);
        if (fetch.hasNext()) {
            return (byte[]) ((KeyValue) fetch.next()).value;
        }
        return null;
    }

    public WindowStoreIterator<byte[]> fetch(Bytes bytes, long j, long j2) {
        long max = Math.max(this.observedStreamTime - this.retentionPeriod, j);
        return Iterators.windowed(new LocalRemoteKvIterator(this.buffer.range(new Stamped<>(bytes, max), new Stamped<>(bytes, j2)), this.schema.fetch(this.name.cassandraName(), this.partitioner.partition(this.partition, bytes), bytes, max, j2), ResponsiveWindowStore::compareKeys));
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    public WindowStoreIterator<byte[]> backwardFetch(Bytes bytes, long j, long j2) {
        long max = Math.max(this.observedStreamTime - this.retentionPeriod, j);
        return Iterators.windowed(new LocalRemoteKvIterator(this.buffer.backRange(new Stamped<>(bytes, max), new Stamped<>(bytes, j2)), this.schema.backFetch(this.name.cassandraName(), this.partitioner.partition(this.partition, bytes), bytes, max, j2), ResponsiveWindowStore::compareKeys));
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long j, long j2) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long j, long j2) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public void flush() {
    }

    public void close() {
    }

    public Position getPosition() {
        return this.position;
    }

    private boolean withinRetention(Stamped<Bytes> stamped) {
        return stamped.stamp > this.observedStreamTime - this.retentionPeriod;
    }

    public static int compareKeys(Stamped<Bytes> stamped, Stamped<Bytes> stamped2) {
        int compareTo = stamped.key.compareTo(stamped2.key);
        return compareTo != 0 ? compareTo : Long.compare(stamped.stamp, stamped2.stamp);
    }
}
