package dev.responsive.kafka.internal.db;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoException;
import com.mongodb.bulk.WriteConcernError;
import com.mongodb.client.MongoCollection;
import dev.responsive.kafka.internal.db.mongo.KVDoc;
import dev.responsive.kafka.internal.db.mongo.MongoKVTable;
import dev.responsive.kafka.internal.db.mongo.MongoWriter;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/db/MongoKVFlushManager.class */
public class MongoKVFlushManager extends KVFlushManager {
    private final String logPrefix;
    private final Logger log;
    private final MongoKVTable table;
    private final MongoCollection<KVDoc> kvDocs;
    private final TablePartitioner<Bytes, Integer> partitioner = TablePartitioner.defaultPartitioner();
    private final int kafkaPartition;

    public MongoKVFlushManager(MongoKVTable mongoKVTable, MongoCollection<KVDoc> mongoCollection, int i) {
        this.table = mongoKVTable;
        this.kvDocs = mongoCollection;
        this.kafkaPartition = i;
        this.logPrefix = String.format("%s[%d] kv-store {epoch=%d} ", mongoKVTable.name(), Integer.valueOf(i), Long.valueOf(mongoKVTable.localEpoch(i)));
        this.log = new LogContext(this.logPrefix).logger(MongoKVFlushManager.class);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String tableName() {
        return this.table.name();
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public TablePartitioner<Bytes, Integer> partitioner() {
        return this.partitioner;
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriter<Bytes, Integer> createWriter(Integer num, long j) {
        return new MongoWriter(this.table, this.kafkaPartition, num, () -> {
            return this.kvDocs;
        });
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String failedFlushInfo(long j, Integer num) {
        return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>", Long.valueOf(j), Long.valueOf(this.table.fetchOffset(this.kafkaPartition)), Long.valueOf(this.table.localEpoch(this.kafkaPartition)), Long.valueOf(this.table.fetchEpoch(this.kafkaPartition)));
    }

    @Override // dev.responsive.kafka.internal.db.KVFlushManager
    public RemoteWriteResult<Integer> updateOffset(long j) {
        try {
            this.table.setOffset(this.kafkaPartition, j);
            return RemoteWriteResult.success(Integer.valueOf(this.kafkaPartition));
        } catch (MongoBulkWriteException e) {
            this.log.warn("Bulk write operation failed", e);
            WriteConcernError writeConcernError = e.getWriteConcernError();
            if (writeConcernError != null) {
                this.log.warn("Bulk write operation failed due to write concern error {}", writeConcernError);
            } else {
                this.log.warn("Bulk write operation failed due to error(s): {}", e.getWriteErrors());
            }
            return RemoteWriteResult.failure(Integer.valueOf(this.kafkaPartition));
        } catch (MongoException e2) {
            this.log.error("Unexpected exception running the bulk write operation", e2);
            throw new RuntimeException("Bulk write operation failed", e2);
        }
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String logPrefix() {
        return this.logPrefix;
    }
}
