package dev.responsive.kafka.internal.db;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoException;
import com.mongodb.bulk.WriteConcernError;
import com.mongodb.client.MongoCollection;
import dev.responsive.kafka.internal.db.mongo.MongoWindowTable;
import dev.responsive.kafka.internal.db.mongo.MongoWriter;
import dev.responsive.kafka.internal.db.mongo.WindowDoc;
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.util.function.Function;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/db/MongoWindowFlushManager.class */
/**
 * {@link WindowFlushManager} implementation that flushes windowed data for a single Kafka
 * partition through a {@link MongoWindowTable}.
 *
 * <p>Segment creation/deletion and offset bookkeeping are delegated to the table; writers are
 * created per segment and resolve their target {@link MongoCollection} through the
 * {@code windowsForSegment} function supplied at construction time.
 */
public class MongoWindowFlushManager extends WindowFlushManager {
    private final String logPrefix;
    private final Logger log;
    private final MongoWindowTable table;
    private final Function<Segmenter.SegmentPartition, MongoCollection<WindowDoc>> windowsForSegment;
    private final WindowSegmentPartitioner partitioner;
    private final int kafkaPartition;

    /**
     * Creates a flush manager bound to one Kafka partition of the given table.
     *
     * @param mongoWindowTable the table that all segment and offset operations are delegated to
     * @param function maps a segment to the collection that holds its window documents
     * @param windowSegmentPartitioner partitioner whose segmenter is handed to the superclass and
     *     which is returned from {@link #partitioner()}
     * @param i the Kafka partition this manager is responsible for
     * @param j forwarded unchanged as the last argument of the {@link WindowFlushManager}
     *     constructor; its meaning is defined there
     */
    public MongoWindowFlushManager(MongoWindowTable mongoWindowTable, Function<Segmenter.SegmentPartition, MongoCollection<WindowDoc>> function, WindowSegmentPartitioner windowSegmentPartitioner, int i, long j) {
        super(mongoWindowTable.name(), i, windowSegmentPartitioner.segmenter(), j);
        this.table = mongoWindowTable;
        this.windowsForSegment = function;
        this.partitioner = windowSegmentPartitioner;
        this.kafkaPartition = i;
        // The prefix captures the table's local epoch as it is at construction time.
        this.logPrefix = String.format(
            "%s[%d] window-store {epoch=%d} ",
            mongoWindowTable.name(),
            i,
            mongoWindowTable.localEpoch(i));
        this.log = new LogContext(this.logPrefix).logger(MongoWindowFlushManager.class);
    }

    /** Returns the name of the underlying table. */
    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String tableName() {
        return this.table.name();
    }

    /** Returns the partitioner supplied at construction time. */
    @Override // dev.responsive.kafka.internal.db.FlushManager
    public TablePartitioner<WindowedKey, Segmenter.SegmentPartition> partitioner() {
        return this.partitioner;
    }

    /**
     * Creates a {@link MongoWriter} for the given segment.
     *
     * <p>The collection lookup is wrapped in a supplier, so {@code windowsForSegment} is only
     * applied when the writer invokes that supplier, not when this method is called.
     *
     * @param segmentPartition the segment the writer will target
     * @return a new writer for this manager's table, Kafka partition and the given segment
     */
    @Override // dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriter<WindowedKey, Segmenter.SegmentPartition> createWriter(Segmenter.SegmentPartition segmentPartition) {
        this.log.debug("Creating writer for segment {}", segmentPartition);
        return new MongoWriter(
            this.table,
            this.kafkaPartition,
            segmentPartition,
            () -> this.windowsForSegment.apply(segmentPartition));
    }

    /**
     * Asks the table to create the given segment for this manager's Kafka partition.
     *
     * @param segmentPartition the segment to create
     * @return the result reported by the table
     */
    @Override // dev.responsive.kafka.internal.db.WindowFlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> createSegment(Segmenter.SegmentPartition segmentPartition) {
        return this.table.createSegmentForPartition(this.kafkaPartition, segmentPartition);
    }

    /**
     * Asks the table to delete the given segment for this manager's Kafka partition.
     *
     * @param segmentPartition the segment to delete
     * @return the result reported by the table
     */
    @Override // dev.responsive.kafka.internal.db.WindowFlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> deleteSegment(Segmenter.SegmentPartition segmentPartition) {
        return this.table.deleteSegmentForPartition(this.kafkaPartition, segmentPartition);
    }

    /**
     * Builds a diagnostic string describing a failed flush.
     *
     * <p>The persisted offset and persisted epoch are fetched from the table at call time and
     * reported next to the batch offset and the table's local epoch.
     *
     * @param j the offset of the batch that failed to flush (reported as {@code batchOffset})
     * @param segmentPartition not used by this implementation
     * @return the formatted offset/epoch summary
     */
    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String failedFlushInfo(long j, Segmenter.SegmentPartition segmentPartition) {
        return String.format(
            "<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
            j,
            this.table.fetchOffset(this.kafkaPartition),
            this.table.localEpoch(this.kafkaPartition),
            this.table.fetchEpoch(this.kafkaPartition));
    }

    /**
     * Forwards the two values to {@code MongoWindowTable#setOffsetAndStreamTime} for this
     * manager's Kafka partition.
     *
     * @param j first value passed to {@code setOffsetAndStreamTime} (presumably the offset, going
     *     by the method name; confirm against the table)
     * @param j2 second value passed to {@code setOffsetAndStreamTime} (presumably the stream time)
     * @return a success result when the table call completes; a failure result for the metadata
     *     table partition when the driver reports a {@link MongoBulkWriteException}
     * @throws RuntimeException wrapping any other {@link MongoException} raised by the table call
     */
    @Override // dev.responsive.kafka.internal.db.WindowFlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> updateOffsetAndStreamTime(long j, long j2) {
        try {
            this.table.setOffsetAndStreamTime(this.kafkaPartition, j, j2);
            return RemoteWriteResult.success(null);
        } catch (MongoBulkWriteException e) {
            // MongoBulkWriteException is a subclass of MongoException, so it has to be caught
            // first; otherwise this handler is unreachable and the failure result is never returned.
            this.log.warn("Bulk write operation failed", e);
            WriteConcernError writeConcernError = e.getWriteConcernError();
            if (writeConcernError != null) {
                this.log.warn("Bulk write operation failed due to write concern error {}", writeConcernError);
            } else {
                this.log.warn("Bulk write operation failed due to error(s): {}", e.getWriteErrors());
            }
            return RemoteWriteResult.failure(this.partitioner.metadataTablePartition(this.kafkaPartition));
        } catch (MongoException e) {
            this.log.error("Unexpected exception running the bulk write operation", e);
            throw new RuntimeException("Bulk write operation failed", e);
        }
    }

    /** Returns the log prefix built at construction time. */
    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String logPrefix() {
        return this.logPrefix;
    }
}
