package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.utils.WindowedKey;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/db/CassandraWindowFlushManager.class */
public class CassandraWindowFlushManager extends WindowFlushManager {
    private final String logPrefix;
    private final Logger log;
    private final CassandraWindowedTable table;
    private final CassandraClient client;
    private final TablePartitioner<WindowedKey, Segmenter.SegmentPartition> partitioner;
    private final int kafkaPartition;
    private final long epoch;

    public CassandraWindowFlushManager(CassandraWindowedTable cassandraWindowedTable, CassandraClient cassandraClient, WindowSegmentPartitioner windowSegmentPartitioner, int i, long j, long j2) {
        super(cassandraWindowedTable.name(), i, windowSegmentPartitioner.segmenter(), j2);
        this.table = cassandraWindowedTable;
        this.client = cassandraClient;
        this.partitioner = windowSegmentPartitioner;
        this.kafkaPartition = i;
        this.epoch = j;
        this.logPrefix = String.format("%s[%d] window-store {epoch=%s} ", cassandraWindowedTable.name(), Integer.valueOf(i), Long.valueOf(j));
        this.log = new LogContext(this.logPrefix).logger(CassandraWindowFlushManager.class);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String tableName() {
        return this.table.name();
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public TablePartitioner<WindowedKey, Segmenter.SegmentPartition> partitioner() {
        return this.partitioner;
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriter<WindowedKey, Segmenter.SegmentPartition> createWriter(Segmenter.SegmentPartition segmentPartition) {
        return new LwtWriter(this.client, () -> {
            return this.table.ensureEpoch(segmentPartition, this.epoch);
        }, this.table, this.kafkaPartition, segmentPartition);
    }

    @Override // dev.responsive.kafka.internal.db.WindowFlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> createSegment(Segmenter.SegmentPartition segmentPartition) {
        return this.table.createSegment(this.kafkaPartition, this.epoch, segmentPartition);
    }

    @Override // dev.responsive.kafka.internal.db.WindowFlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> deleteSegment(Segmenter.SegmentPartition segmentPartition) {
        return this.table.deleteSegment(this.kafkaPartition, segmentPartition);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String failedFlushInfo(long j, Segmenter.SegmentPartition segmentPartition) {
        return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>", Long.valueOf(j), Long.valueOf(this.table.fetchOffset(this.kafkaPartition)), Long.valueOf(this.epoch), Long.valueOf(this.table.fetchEpoch(segmentPartition)));
    }

    @Override // dev.responsive.kafka.internal.db.WindowFlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> updateOffsetAndStreamTime(long j, long j2) {
        Segmenter.SegmentPartition metadataTablePartition = this.partitioner.metadataTablePartition(this.kafkaPartition);
        BatchStatementBuilder batchStatementBuilder = new BatchStatementBuilder(BatchType.UNLOGGED);
        batchStatementBuilder.addStatement(this.table.ensureEpoch(metadataTablePartition, this.epoch));
        batchStatementBuilder.addStatement(this.table.setOffset(this.kafkaPartition, j));
        batchStatementBuilder.addStatement(this.table.setStreamTime(this.kafkaPartition, this.epoch, j2));
        return !this.client.execute((Statement<?>) batchStatementBuilder.build()).wasApplied() ? RemoteWriteResult.failure(metadataTablePartition) : RemoteWriteResult.success(metadataTablePartition);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public String logPrefix() {
        return this.logPrefix;
    }
}
