package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/rs3/RS3KVTable.class */
/**
 * {@link RemoteKVTable} implementation that reads from and writes to an RS3 store
 * through an {@link RS3Client}.
 *
 * <p>Writes are not sent directly by this class: {@link #insert} and {@link #delete}
 * only build {@link WalEntry} values, and the {@link RS3KVFlushManager} created by
 * {@link #init(int)} is handed this table together with the recorded written offsets.
 * Point reads go through {@link RS3Client#get} against the PSS selected by the
 * {@link PssPartitioner}. Range and full scans are not supported.
 *
 * <p>{@link #init(int)} must be called exactly once before {@link #get} is used.
 */
public class RS3KVTable implements RemoteKVTable<WalEntry> {
    private static final Logger LOG = LoggerFactory.getLogger(RS3KVTable.class);

    /** Sentinel used when no written offset has been recorded for a PSS. */
    private static final long NO_WRITTEN_OFFSET = -1L;

    private final String name;
    private final UUID storeId;
    private final RS3Client rs3Client;
    private final PssPartitioner pssPartitioner;

    // Both are assigned by init(); null until then.
    private LssId lssId;
    private RS3KVFlushManager flushManager;

    // Smallest written offset across the PSSs of this LSS, computed by init().
    private long fetchOffset = NO_WRITTEN_OFFSET;

    /**
     * Creates an uninitialized table.
     *
     * @param name           the table name returned by {@link #name()}
     * @param storeId        id of the RS3 store passed to every client call
     * @param rs3Client      client used for offset lookups and point reads
     * @param pssPartitioner maps an LSS (and a key) to PSS ids
     * @throws NullPointerException if any argument is {@code null}
     */
    public RS3KVTable(
        final String name,
        final UUID storeId,
        final RS3Client rs3Client,
        final PssPartitioner pssPartitioner
    ) {
        this.name = Objects.requireNonNull(name, "name");
        this.storeId = Objects.requireNonNull(storeId, "storeId");
        this.rs3Client = Objects.requireNonNull(rs3Client, "rs3Client");
        this.pssPartitioner = Objects.requireNonNull(pssPartitioner, "pssPartitioner");
    }

    /**
     * Initializes the table: derives the {@link LssId} from {@code partition}, looks up
     * the current written offset of every PSS belonging to that LSS, records the minimum
     * of those offsets as the fetch offset, and creates the flush manager.
     *
     * @param partition number used to build the {@link LssId}
     *                  (presumably the Kafka partition; confirm against the interface)
     * @return the flush manager created for this table
     * @throws IllegalStateException if this table has already been initialized
     */
    @Override
    public KVFlushManager init(final int partition) {
        if (flushManager != null) {
            LOG.error("already initialized for store {}:{}", name, partition);
            throw new IllegalStateException(
                String.format("already initialized for store %s:%d", name, partition));
        }

        lssId = new LssId(partition);

        // Declared as HashMap (not Map) because the instance is handed to the
        // RS3KVFlushManager constructor, whose parameter type is not visible here.
        final HashMap<Integer, Optional<Long>> writtenOffsets = new HashMap<>();
        for (final int pss : pssPartitioner.pssForLss(lssId)) {
            writtenOffsets.put(
                pss,
                rs3Client.getCurrentOffsets(storeId, lssId, pss).writtenOffset());
        }

        // A PSS without a written offset counts as NO_WRITTEN_OFFSET, so a single
        // such PSS (or an LSS with no PSSs at all) pulls the fetch offset down to it.
        fetchOffset = writtenOffsets.values().stream()
            .map(offset -> offset.orElse(NO_WRITTEN_OFFSET))
            .min(Long::compare)
            .orElse(NO_WRITTEN_OFFSET);

        final String recordedOffsets = writtenOffsets.entrySet().stream()
            .map(entry -> String.format(
                "%s -> %s",
                entry.getKey(),
                entry.getValue().map(Object::toString).orElse("none")))
            .collect(Collectors.joining(","));
        LOG.info(
            "restore rs3 kv table from offset {} for {}. recorded written offsets: {}",
            fetchOffset,
            partition,
            recordedOffsets);

        flushManager = new RS3KVFlushManager(
            storeId, rs3Client, lssId, this, writtenOffsets, partition, pssPartitioner);
        return flushManager;
    }

    /**
     * Looks up a single key in the PSS the partitioner selects for it, passing the
     * flush manager's written offset for that PSS to the client.
     *
     * @param partition not used by this implementation
     * @param key       the key to look up
     * @param unused    not used by this implementation
     * @return the stored value, or {@code null} if the client returned no value
     * @throws IllegalStateException if {@link #init(int)} has not been called
     */
    @Override
    public byte[] get(final int partition, final Bytes key, final long unused) {
        if (flushManager == null) {
            // Previously surfaced as a bare NullPointerException.
            throw new IllegalStateException(
                String.format("get called before init for store %s", name));
        }
        final byte[] rawKey = key.get();
        final int pss = pssPartitioner.pss(rawKey, lssId);
        return rs3Client
            .get(storeId, lssId, pss, flushManager.writtenOffset(pss), rawKey)
            .orElse(null);
    }

    /**
     * Range scans are not supported by this table.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(
        final int partition,
        final Bytes from,
        final Bytes to,
        final long unused
    ) {
        throw new UnsupportedOperationException();
    }

    /**
     * Full scans are not supported by this table.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> all(final int partition, final long unused) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not implemented for RS3; logs a warning on every call.
     *
     * @param partition not used by this implementation
     * @return always {@code -1}
     */
    @Override
    public long approximateNumEntries(final int partition) {
        LOG.warn("approximateNumEntries not implemented for RS3");
        return -1L;
    }

    /**
     * @return the table name supplied at construction
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * Builds the WAL entry for an upsert; nothing is sent to RS3 by this method.
     *
     * @param partition not used by this implementation
     * @param key       the key to write
     * @param value     the value to write
     * @param unused    not used by this implementation
     * @return a {@link Put} carrying the raw key bytes and {@code value}
     */
    @Override
    public WalEntry insert(
        final int partition,
        final Bytes key,
        final byte[] value,
        final long unused
    ) {
        return new Put(key.get(), value);
    }

    /**
     * Builds the WAL entry for a delete; nothing is sent to RS3 by this method.
     *
     * @param partition not used by this implementation
     * @param key       the key to delete
     * @return a {@link Put} with a {@code null} value (presumably treated as a
     *         tombstone downstream; confirm against {@code Put}'s consumers)
     */
    @Override
    public WalEntry delete(final int partition, final Bytes key) {
        return new Put(key.get(), null);
    }

    /**
     * Returns the offset computed by {@link #init(int)}: the minimum written offset
     * across the PSSs of this LSS, or {@code -1} if any PSS had none recorded or
     * {@code init} has not run yet.
     *
     * @param partition not used by this implementation
     */
    @Override
    public long fetchOffset(final int partition) {
        return fetchOffset;
    }
}
