package dev.responsive.kafka.store;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.db.CassandraClient;
import dev.responsive.model.Result;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/* loaded from: input_file:dev/responsive/kafka/store/AtomicWriter.class */
/**
 * Buffers write statements for a single table partition and flushes them through a
 * {@link CassandraClient} as a sequence of unlogged batches.
 *
 * <p>Every batch starts with the statement returned by
 * {@link CassandraClient#ensureEpoch} for this writer's table, partition and epoch, followed by
 * at most {@code batchSize} buffered data statements.
 *
 * <p>This class performs no synchronization of its own; the buffer is a plain
 * {@link ArrayList}.
 *
 * @param <K> the key type handled by the {@link BufferPlugin}
 */
class AtomicWriter<K> {
    private final CassandraClient client;
    private final String tableName;
    private final int partition;
    private final List<BatchableStatement<?>> statements;
    private final BufferPlugin<K> plugin;
    private final int batchSize;
    private final long epoch;

    /**
     * Creates a writer for one partition of one table.
     *
     * @param cassandraClient client used to build and execute statements
     * @param str name of the table written to
     * @param i partition this writer is responsible for
     * @param bufferPlugin plugin that decides which keys are retained and builds the
     *     insert/delete statements
     * @param j epoch passed to the client's {@code ensureEpoch} and {@code setOffset} statements
     * @param i2 maximum number of data statements per batch; must be positive
     * @throws IllegalArgumentException if {@code i2} is zero or negative
     */
    public AtomicWriter(CassandraClient cassandraClient, String str, int i, BufferPlugin<K> bufferPlugin, long j, int i2) {
        // A batch size of zero would make flush() spin forever once anything is buffered,
        // because no iteration of its loop could consume a statement. Fail fast instead.
        if (i2 <= 0) {
            throw new IllegalArgumentException("batchSize must be positive but was " + i2);
        }
        this.client = cassandraClient;
        this.tableName = str;
        this.partition = i;
        this.plugin = bufferPlugin;
        this.epoch = j;
        this.batchSize = i2;
        this.statements = new ArrayList<>(i2);
    }

    /**
     * Buffers every result of {@code iterable}, in iteration order, via {@link #add(Result)}.
     *
     * @param iterable the results to buffer
     */
    public void addAll(Iterable<Result<K>> iterable) {
        iterable.forEach(this::add);
    }

    /**
     * Buffers the statement for a single result.
     *
     * <p>A tombstone always yields a delete statement (the plugin's {@code retain} check is not
     * consulted). A non-tombstone yields an insert statement only when the plugin retains its
     * key; otherwise the result is dropped.
     *
     * @param result the result to buffer
     */
    public void add(Result<K> result) {
        if (result.isTombstone) {
            this.statements.add(this.plugin.deleteData(this.client, this.tableName, this.partition, result.key));
        } else if (this.plugin.retain(result.key)) {
            this.statements.add(this.plugin.insertData(this.client, this.tableName, this.partition, result.key, result.value));
        }
    }

    /**
     * Submits all buffered statements in batches of at most {@code batchSize} data statements
     * and empties the buffer.
     *
     * <p>The batches are chained with {@code thenCompose}, so a batch is only submitted after
     * the previous one has completed normally. At least one batch is always issued: with an
     * empty buffer it contains only the epoch statement.
     *
     * @return a stage that completes with the result of the last batch
     */
    public CompletionStage<AtomicWriteResult> flush() {
        CompletionStage<AtomicWriteResult> chain = CompletableFuture.completedStage(AtomicWriteResult.success(this.partition));
        final Iterator<BatchableStatement<?>> pending = this.statements.iterator();
        do {
            final BatchStatementBuilder batch = newEpochGuardedBatch();
            for (int count = 0; count < this.batchSize && pending.hasNext(); count++) {
                batch.addStatement(pending.next());
            }
            // NOTE(review): the previous batch's AtomicWriteResult is ignored here, so later
            // batches are still submitted after an earlier one was not applied, and only the
            // last result reaches the caller - confirm this is intended.
            chain = chain.thenCompose(previous -> executeAsync(batch.build()));
        } while (pending.hasNext());
        // Every buffered statement has been handed to a batch builder, so the buffer can be
        // reset even though the batches themselves may not have executed yet.
        this.statements.clear();
        return chain;
    }

    /** Creates an idempotent, unlogged batch builder that already holds the epoch statement. */
    private BatchStatementBuilder newEpochGuardedBatch() {
        final BatchStatementBuilder batch = new BatchStatementBuilder(BatchType.UNLOGGED);
        batch.setIdempotence(true);
        batch.addStatement(this.client.ensureEpoch(this.tableName, this.partition, this.epoch));
        return batch;
    }

    /** Executes {@code statement} asynchronously and converts the result set for this partition. */
    private CompletionStage<AtomicWriteResult> executeAsync(Statement<?> statement) {
        return this.client.executeAsync(statement)
                .thenApply(resultSet -> AtomicWriteResult.of(this.partition, resultSet));
    }

    /**
     * Returns the partition this writer was created for.
     *
     * @return the partition number
     */
    public int partition() {
        return this.partition;
    }

    /**
     * Synchronously executes the client's {@code setOffset} statement for this writer's table,
     * partition and epoch.
     *
     * @param j the offset to record
     * @return a success result if the statement reports {@code wasApplied()}, otherwise a
     *     failure result; both carry this writer's partition
     */
    public AtomicWriteResult setOffset(long j) {
        final boolean applied = this.client
                .execute(this.client.setOffset(this.tableName, this.partition, j, this.epoch))
                .wasApplied();
        if (applied) {
            return AtomicWriteResult.success(this.partition);
        }
        return AtomicWriteResult.failure(this.partition);
    }
}
