package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import org.apache.kafka.common.utils.Bytes;

/* loaded from: input_file:dev/responsive/kafka/internal/db/CassandraKVFlushManager.class */
/**
 * {@link KVFlushManager} implementation that flushes to a {@link CassandraKeyValueTable}
 * through a {@link CassandraClient}.
 *
 * <p>Each instance is bound to one Kafka partition and one epoch, both supplied at
 * construction time and used when building the statements issued by this class.
 */
public class CassandraKVFlushManager extends KVFlushManager {
    private final CassandraKeyValueTable table;
    private final CassandraClient client;
    private final TablePartitioner<Bytes, Integer> partitioner;
    private final int kafkaPartition;
    private final long epoch;
    private final String logPrefix;

    /**
     * Creates a flush manager for the given table and Kafka partition.
     *
     * @param table          the key-value table that statements are built against
     * @param client         the client used to execute statements
     * @param partitioner    maps keys / Kafka partitions to table partitions
     * @param kafkaPartition the Kafka partition this manager is responsible for
     * @param epoch          the epoch passed to {@code ensureEpoch} and reported in diagnostics
     */
    public CassandraKVFlushManager(
            CassandraKeyValueTable table,
            CassandraClient client,
            TablePartitioner<Bytes, Integer> partitioner,
            int kafkaPartition,
            long epoch) {
        this.table = table;
        this.client = client;
        this.partitioner = partitioner;
        this.kafkaPartition = kafkaPartition;
        this.epoch = epoch;
        this.logPrefix = String.format(
                "%s[%d] kv-store {epoch=%d} ", table.name(), kafkaPartition, epoch);
    }

    /** Returns the name of the underlying table. */
    @Override
    public String tableName() {
        return table.name();
    }

    /** Returns the partitioner supplied at construction time. */
    @Override
    public TablePartitioner<Bytes, Integer> partitioner() {
        return partitioner;
    }

    /**
     * Builds an {@link LwtWriter} for the given table partition.
     *
     * <p>The writer is handed a supplier that produces the table's {@code ensureEpoch}
     * statement for this partition and this manager's epoch each time it is invoked.
     *
     * @param tablePartition the table partition the writer targets
     * @return a new writer bound to this manager's client, table and Kafka partition
     */
    @Override
    public RemoteWriter<Bytes, Integer> createWriter(Integer tablePartition) {
        return new LwtWriter(
                client,
                () -> table.ensureEpoch(tablePartition, epoch),
                table,
                kafkaPartition,
                tablePartition);
    }

    /**
     * Describes the state relevant to a failed flush: the offset of the batch versus the
     * offset fetched from the table, and the local epoch versus the epoch fetched from
     * the table for the given partition.
     *
     * @param batchOffset    the offset of the batch that failed to flush
     * @param tablePartition the table partition whose epoch is looked up
     * @return a formatted diagnostic string
     */
    @Override
    public String failedFlushInfo(long batchOffset, Integer tablePartition) {
        // The offset is fetched before the epoch, in the same order as the format arguments.
        final long persistedOffset = table.fetchOffset(kafkaPartition);
        final long persistedEpoch = table.fetchEpoch(tablePartition);
        return String.format(
                "<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
                batchOffset, persistedOffset, epoch, persistedEpoch);
    }

    /**
     * Writes the consumed offset for this manager's Kafka partition.
     *
     * <p>The {@code ensureEpoch} statement for the metadata table partition and the
     * {@code setOffset} statement are sent together in a single unlogged batch. The
     * outcome reflects whether the client reports the batch as applied.
     *
     * @param consumedOffset the offset to persist
     * @return a success or failure result carrying the metadata table partition
     */
    @Override
    public RemoteWriteResult<Integer> updateOffset(long consumedOffset) {
        // Unbox eagerly, then re-box once so the same value is passed everywhere below.
        final int metadataPartition = partitioner.metadataTablePartition(kafkaPartition);
        final Integer tablePartition = metadataPartition;

        final BatchStatementBuilder batch = new BatchStatementBuilder(BatchType.UNLOGGED);
        batch.addStatement(table.ensureEpoch(tablePartition, epoch));
        batch.addStatement(table.setOffset(kafkaPartition, consumedOffset));

        final Statement<?> statement = batch.build();
        final boolean applied = client.execute(statement).wasApplied();

        if (applied) {
            return RemoteWriteResult.success(tablePartition);
        } else {
            return RemoteWriteResult.failure(tablePartition);
        }
    }

    /** Returns the log prefix computed at construction time. */
    @Override
    public String logPrefix() {
        return logPrefix;
    }
}
