package dev.responsive.kafka.internal.db;

import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.utils.Constants;
import dev.responsive.kafka.internal.utils.Result;
import java.lang.Comparable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/db/BatchFlusher.class */
public class BatchFlusher<K extends Comparable<K>, P> {
    private final Logger log;
    private final KeySpec<K> keySpec;
    private final int kafkaPartition;
    private final FlushManager<K, P> flushManager;

    /* loaded from: input_file:dev/responsive/kafka/internal/db/BatchFlusher$FlushResult.class */
    public static class FlushResult<K, P> {
        private final RemoteWriteResult<P> result;
        private final int numTablePartitionsFlushed;
        private final FlushManager<K, P> flushManager;

        static <K, P> FlushResult<K, P> failed(RemoteWriteResult<P> remoteWriteResult, FlushManager<K, P> flushManager) {
            return new FlushResult<>(remoteWriteResult, 0, flushManager);
        }

        static <K, P> FlushResult<K, P> success(RemoteWriteResult<P> remoteWriteResult, int i) {
            return new FlushResult<>(remoteWriteResult, i, null);
        }

        private FlushResult(RemoteWriteResult<P> remoteWriteResult, int i, FlushManager<K, P> flushManager) {
            this.result = remoteWriteResult;
            this.numTablePartitionsFlushed = i;
            this.flushManager = flushManager;
        }

        public RemoteWriteResult<P> result() {
            return this.result;
        }

        public int numTablePartitionsFlushed() {
            return this.numTablePartitionsFlushed;
        }

        public String failedFlushInfo(long j, P p) {
            return this.flushManager.failedFlushInfo(j, p);
        }
    }

    public BatchFlusher(KeySpec<K> keySpec, int i, FlushManager<K, P> flushManager) {
        this.keySpec = keySpec;
        this.kafkaPartition = i;
        this.flushManager = flushManager;
        this.log = new LogContext(flushManager.logPrefix()).logger(BatchFlusher.class);
    }

    public FlushResult<K, P> flushWriteBatch(Map<K, Result<K>> map, long j) {
        BatchWriters batchWriters = new BatchWriters(this.flushManager, this.kafkaPartition);
        prepareBatch(batchWriters, map, this.keySpec);
        RemoteWriteResult<P> preFlush = this.flushManager.preFlush();
        if (!preFlush.wasApplied()) {
            this.log.warn("Failed on pre-flush callback for write batch (consumedOffset={})", Long.valueOf(j));
            return FlushResult.failed(preFlush, this.flushManager);
        }
        try {
            RemoteWriteResult remoteWriteResult = (RemoteWriteResult) flushBatch(batchWriters).get(10L, Constants.BLOCKING_TIMEOUT_UNIT);
            if (!remoteWriteResult.wasApplied()) {
                this.log.warn("Failed to flush write batch (consumedOffset={}) on table partition {}", Long.valueOf(j), remoteWriteResult.tablePartition());
                return FlushResult.failed(remoteWriteResult, this.flushManager);
            }
            RemoteWriteResult<P> postFlush = this.flushManager.postFlush(j);
            if (postFlush.wasApplied()) {
                return FlushResult.success(postFlush, batchWriters.numTablePartitionsInBatch());
            }
            this.log.warn("Failed on post-flush callback for write batch (consumedOffset={})", Long.valueOf(j));
            return FlushResult.failed(postFlush, this.flushManager);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            this.log.error("Unexpected exception while flushing to remote", e);
            throw new RuntimeException("Failed while flushing batch for kafka partition " + this.kafkaPartition + " to remote", e);
        }
    }

    private static <K extends Comparable<K>, P> void prepareBatch(BatchWriters<K, P> batchWriters, Map<K, Result<K>> map, KeySpec<K> keySpec) {
        for (Result<K> result : map.values()) {
            RemoteWriter<K, P> findOrAddWriter = batchWriters.findOrAddWriter(result.key);
            if (result.isTombstone) {
                findOrAddWriter.delete(result.key);
            } else if (keySpec.retain(result.key)) {
                findOrAddWriter.insert(result.key, result.value, result.timestamp);
            }
        }
    }

    private static <K, P> CompletableFuture<RemoteWriteResult<P>> flushBatch(BatchWriters<K, P> batchWriters) {
        CompletionStage completedStage = CompletableFuture.completedStage(RemoteWriteResult.success(null));
        Iterator<RemoteWriter<K, P>> it = batchWriters.allWriters().iterator();
        while (it.hasNext()) {
            completedStage = completedStage.thenCombine(it.next().flush(), (remoteWriteResult, remoteWriteResult2) -> {
                return !remoteWriteResult.wasApplied() ? remoteWriteResult : remoteWriteResult2;
            });
        }
        return completedStage.toCompletableFuture();
    }
}
