package dev.responsive.kafka.internal.db.inmemory;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import dev.responsive.kafka.api.stores.TtlProvider;
import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteWriter;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.class */
public class InMemoryKVTable implements RemoteKVTable<BoundStatement> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryKVTable.class);
    private int kafkaPartition;
    private final String name;
    private final Optional<TtlResolver<?, ?>> ttlResolver;
    private final ConcurrentNavigableMap<Bytes, Value> store = new ConcurrentSkipListMap();

    /* loaded from: input_file:dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable$InMemoryKVFlushManager.class */
    public class InMemoryKVFlushManager extends KVFlushManager {
        private InMemoryKVFlushManager() {
        }

        @Override // dev.responsive.kafka.internal.db.KVFlushManager
        public RemoteWriteResult<Integer> updateOffset(long j) {
            return RemoteWriteResult.success(Integer.valueOf(InMemoryKVTable.this.kafkaPartition));
        }

        @Override // dev.responsive.kafka.internal.db.FlushManager
        public String tableName() {
            return InMemoryKVTable.this.name;
        }

        @Override // dev.responsive.kafka.internal.db.FlushManager
        public TablePartitioner<Bytes, Integer> partitioner() {
            return TablePartitioner.defaultPartitioner();
        }

        @Override // dev.responsive.kafka.internal.db.FlushManager
        public RemoteWriter<Bytes, Integer> createWriter(final Integer num) {
            return new RemoteWriter<Bytes, Integer>() { // from class: dev.responsive.kafka.internal.db.inmemory.InMemoryKVTable.InMemoryKVFlushManager.1
                @Override // dev.responsive.kafka.internal.db.RemoteWriter
                public void insert(Bytes bytes, byte[] bArr, long j) {
                    InMemoryKVTable.this.insert(num.intValue(), bytes, bArr, j);
                }

                @Override // dev.responsive.kafka.internal.db.RemoteWriter
                public void delete(Bytes bytes) {
                    InMemoryKVTable.this.delete(num.intValue(), bytes);
                }

                @Override // dev.responsive.kafka.internal.db.RemoteWriter
                public CompletionStage<RemoteWriteResult<Integer>> flush() {
                    return CompletableFuture.completedFuture(RemoteWriteResult.success(Integer.valueOf(InMemoryKVTable.this.kafkaPartition)));
                }
            };
        }

        @Override // dev.responsive.kafka.internal.db.FlushManager
        public String failedFlushInfo(long j, Integer num) {
            return "failed flush";
        }

        @Override // dev.responsive.kafka.internal.db.FlushManager
        public String logPrefix() {
            return "inmemory";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable$Value.class */
    public static class Value {
        private final long epochMillis;
        private final byte[] value;

        public Value(long j, byte[] bArr) {
            this.epochMillis = j;
            this.value = bArr;
        }

        public long epochMillis() {
            return this.epochMillis;
        }

        public byte[] value() {
            return this.value;
        }
    }

    public InMemoryKVTable(String str, Optional<TtlResolver<?, ?>> optional) {
        this.name = (String) Objects.requireNonNull(str);
        this.ttlResolver = optional;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KVFlushManager init(int i) {
        LOG.info("init in-memory kv store {} {}", this.name, Integer.valueOf(i));
        this.kafkaPartition = i;
        return new InMemoryKVFlushManager();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public byte[] get(int i, Bytes bytes, long j) {
        checkKafkaPartition(i);
        Value value = (Value) this.store.get(bytes);
        if (value == null) {
            return null;
        }
        if (this.ttlResolver.isPresent()) {
            TtlProvider.TtlDuration resolveTtl = this.ttlResolver.get().resolveTtl(bytes, value.value());
            if (resolveTtl.isFinite()) {
                if (value.epochMillis < j - resolveTtl.toMillis()) {
                    return null;
                }
            }
        }
        return value.value();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> range(int i, Bytes bytes, Bytes bytes2, long j) {
        if (this.ttlResolver.isPresent() && !this.ttlResolver.get().hasDefaultOnly()) {
            throw new UnsupportedOperationException("Row-level ttl is not yet supported for range queries on in-memory tables or TTD");
        }
        checkKafkaPartition(i);
        return iteratorWithTimeFilter(this.store.tailMap((ConcurrentNavigableMap<Bytes, Value>) bytes, true).headMap((ConcurrentNavigableMap<Bytes, Value>) bytes2, true).entrySet().iterator(), this.ttlResolver.isEmpty() ? -1L : j - this.ttlResolver.get().defaultTtl().toMillis());
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> all(int i, long j) {
        if (this.ttlResolver.isPresent() && !this.ttlResolver.get().hasDefaultOnly()) {
            throw new UnsupportedOperationException("Row-level ttl is not yet supported for range queries on in-memory tables or TTD");
        }
        checkKafkaPartition(i);
        return iteratorWithTimeFilter(this.store.entrySet().iterator(), this.ttlResolver.isEmpty() ? -1L : j - this.ttlResolver.get().defaultTtl().toMillis());
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public long approximateNumEntries(int i) {
        checkKafkaPartition(i);
        return this.store.size();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement insert(int i, Bytes bytes, byte[] bArr, long j) {
        checkKafkaPartition(i);
        this.store.put(bytes, new Value(j, bArr));
        return null;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement delete(int i, Bytes bytes) {
        checkKafkaPartition(i);
        this.store.remove(bytes);
        return null;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long fetchOffset(int i) {
        checkKafkaPartition(i);
        return -1L;
    }

    private KeyValueIterator<Bytes, byte[]> iteratorWithTimeFilter(Iterator<Map.Entry<Bytes, Value>> it, long j) {
        return Iterators.kv(Iterators.filter(Iterators.kv(it, entry -> {
            return new KeyValue((Bytes) entry.getKey(), (Value) entry.getValue());
        }), keyValue -> {
            return ((Value) keyValue.value).epochMillis >= j;
        }), keyValue2 -> {
            return new KeyValue((Bytes) keyValue2.key, ((Value) keyValue2.value).value());
        });
    }

    private void checkKafkaPartition(int i) {
        if (this.kafkaPartition != i) {
            throw new IllegalStateException("unexpected partition: " + i + " for " + this.kafkaPartition);
        }
    }
}
