package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteWriter;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager.class */
class RS3KVFlushManager extends KVFlushManager {
    private static final Logger LOG = LoggerFactory.getLogger(RS3KVFlushManager.class);
    private final UUID storeId;
    private final RS3Client rs3Client;
    private final LssId lssId;
    private final RS3KVTable table;
    private final HashMap<Integer, Optional<Long>> writtenOffsets;
    private final int kafkaPartition;
    private final PssPartitioner pssPartitioner;
    private final HashMap<Integer, RS3KVWriter> writers = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager$NoopWriter.class */
    public static class NoopWriter implements RemoteWriter<Bytes, Integer> {
        private final int kafkaPartition;

        public NoopWriter(int i) {
            this.kafkaPartition = i;
        }

        @Override // dev.responsive.kafka.internal.db.RemoteWriter
        public void insert(Bytes bytes, byte[] bArr, long j) {
        }

        @Override // dev.responsive.kafka.internal.db.RemoteWriter
        public void delete(Bytes bytes) {
        }

        @Override // dev.responsive.kafka.internal.db.RemoteWriter
        public CompletionStage<RemoteWriteResult<Integer>> flush() {
            return CompletableFuture.completedStage(RemoteWriteResult.success(Integer.valueOf(this.kafkaPartition)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager$RS3KVWriter.class */
    public static class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
        private final StreamSender<WalEntry> streamSender;
        private final CompletionStage<Optional<Long>> resultFuture;
        private RS3KVTable table;
        private final int pssId;
        private final LssId lssId;
        private final long endOffset;
        private final int kafkaPartition;

        private RS3KVWriter(UUID uuid, RS3Client rS3Client, RS3KVTable rS3KVTable, int i, LssId lssId, long j, Optional<Long> optional, int i2) {
            this.table = (RS3KVTable) Objects.requireNonNull(rS3KVTable);
            this.pssId = i;
            this.lssId = lssId;
            this.endOffset = j;
            this.kafkaPartition = i2;
            StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync = rS3Client.writeWalSegmentAsync(uuid, lssId, i, optional, j);
            this.streamSender = writeWalSegmentAsync.sender();
            this.resultFuture = writeWalSegmentAsync.receiver();
        }

        long endOffset() {
            return this.endOffset;
        }

        @Override // dev.responsive.kafka.internal.db.RemoteWriter
        public void insert(Bytes bytes, byte[] bArr, long j) {
            this.streamSender.sendNext(this.table.insert(this.kafkaPartition, bytes, bArr, j));
        }

        @Override // dev.responsive.kafka.internal.db.RemoteWriter
        public void delete(Bytes bytes) {
            this.streamSender.sendNext(this.table.delete(this.kafkaPartition, bytes));
        }

        @Override // dev.responsive.kafka.internal.db.RemoteWriter
        public CompletionStage<RemoteWriteResult<Integer>> flush() {
            this.streamSender.finish();
            return this.resultFuture.thenApply(optional -> {
                RS3KVFlushManager.LOG.debug("last flushed offset for pss/lss {}/{} is {}", new Object[]{Integer.valueOf(this.pssId), this.lssId, optional});
                return RemoteWriteResult.success(Integer.valueOf(this.kafkaPartition));
            });
        }
    }

    public RS3KVFlushManager(UUID uuid, RS3Client rS3Client, LssId lssId, RS3KVTable rS3KVTable, HashMap<Integer, Optional<Long>> hashMap, int i, PssPartitioner pssPartitioner) {
        this.storeId = (UUID) Objects.requireNonNull(uuid);
        this.rs3Client = (RS3Client) Objects.requireNonNull(rS3Client);
        this.lssId = (LssId) Objects.requireNonNull(lssId);
        this.table = (RS3KVTable) Objects.requireNonNull(rS3KVTable);
        this.writtenOffsets = (HashMap) Objects.requireNonNull(hashMap);
        this.kafkaPartition = i;
        this.pssPartitioner = (PssPartitioner) Objects.requireNonNull(pssPartitioner);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String tableName() {
        return this.table.name();
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public TablePartitioner<Bytes, Integer> partitioner() {
        return new PssTablePartitioner(this.pssPartitioner);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriter<Bytes, Integer> createWriter(Integer num, long j) {
        if (this.writers.containsKey(num)) {
            throw new IllegalStateException("already created writer for pss " + num);
        }
        Optional<Long> optional = this.writtenOffsets.get(num);
        if (optional.isPresent() && optional.get().longValue() >= j) {
            return new NoopWriter(this.kafkaPartition);
        }
        RS3KVWriter rS3KVWriter = new RS3KVWriter(this.storeId, this.rs3Client, this.table, num.intValue(), this.lssId, j, this.writtenOffsets.get(num), this.kafkaPartition);
        this.writers.put(num, rS3KVWriter);
        return rS3KVWriter;
    }

    @Override // dev.responsive.kafka.internal.db.KVFlushManager, dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriteResult<Integer> postFlush(long j) {
        for (Map.Entry<Integer, RS3KVWriter> entry : this.writers.entrySet()) {
            this.writtenOffsets.put(entry.getKey(), Optional.of(Long.valueOf(entry.getValue().endOffset())));
        }
        this.writers.clear();
        return super.postFlush(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<Long> writtenOffset(int i) {
        return this.writtenOffsets.get(Integer.valueOf(i));
    }

    @Override // dev.responsive.kafka.internal.db.KVFlushManager
    public RemoteWriteResult<Integer> updateOffset(long j) {
        return RemoteWriteResult.success(null);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String failedFlushInfo(long j, Integer num) {
        return "";
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String logPrefix() {
        return tableName() + ".rs3.flushmanager";
    }
}
