package dev.responsive.kafka.store;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.db.CassandraClient;
import dev.responsive.db.FencingToken;
import dev.responsive.db.KeySpec;
import dev.responsive.db.RemoteSchema;
import dev.responsive.model.Result;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/* loaded from: input_file:dev/responsive/kafka/store/AtomicWriter.class */
/**
 * Buffers write statements for one {@code name}/{@code partition} pair and sends them to
 * Cassandra in unlogged batches of at most {@code batchSize} buffered statements. Every batch
 * is first handed to {@link FencingToken#addFencingStatement} so the token can contribute its
 * own statement.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the statement buffer; it is
 * presumably used by one thread at a time — confirm against callers.
 *
 * @param <K> key type of the buffered {@link Result}s
 */
class AtomicWriter<K> {
    private final CassandraClient client;
    private final String name;
    private final int partition;
    private final List<BatchableStatement<?>> statements;
    private final RemoteSchema<K> schema;
    private final KeySpec<K> keySpec;
    private final FencingToken fencingToken;
    private final int batchSize;

    /**
     * Creates a writer with an empty statement buffer.
     *
     * @param cassandraClient client used to execute the batches
     * @param str name passed to the schema when building statements
     * @param i partition this writer is responsible for
     * @param remoteSchema schema used to build insert, delete and offset statements
     * @param keySpec decides which non-tombstone keys are retained
     * @param fencingToken adds a fencing statement to every batch
     * @param i2 maximum number of buffered statements per batch; must be positive
     * @throws IllegalArgumentException if {@code i2} is not positive
     */
    public AtomicWriter(CassandraClient cassandraClient, String str, int i, RemoteSchema<K> remoteSchema, KeySpec<K> keySpec, FencingToken fencingToken, int i2) {
        // A batch size of zero would make flush() spin forever: its inner loop could never
        // consume a buffered statement, so the outer loop's hasNext() would stay true.
        if (i2 <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got " + i2);
        }
        this.client = cassandraClient;
        this.name = str;
        this.partition = i;
        this.schema = remoteSchema;
        this.keySpec = keySpec;
        this.fencingToken = fencingToken;
        this.batchSize = i2;
        this.statements = new ArrayList<>(i2);
    }

    /**
     * Buffers every result in {@code iterable} by delegating to {@link #add(Result)}.
     *
     * @param iterable results to buffer
     */
    public void addAll(Iterable<Result<K>> iterable) {
        iterable.forEach(this::add);
    }

    /**
     * Buffers the statement for a single result. Tombstones are always buffered as deletes;
     * any other result is buffered as an insert only if {@code keySpec.retain} accepts its key.
     *
     * @param result result to buffer
     */
    public void add(Result<K> result) {
        if (!result.isTombstone && !this.keySpec.retain(result.key)) {
            return;
        }
        final BatchableStatement<?> statement = result.isTombstone
                ? this.schema.delete(this.name, this.partition, result.key)
                : this.schema.insert(this.name, this.partition, result.key, result.value);
        this.statements.add(statement);
    }

    /**
     * Splits the buffered statements into batches, chains their asynchronous execution one
     * after another, and clears the buffer.
     *
     * <p>The loop body runs before the emptiness check, so at least one batch is always
     * issued, even when nothing is buffered; that batch holds only what the fencing token adds.
     *
     * @return a stage that completes with the result of the last batch executed
     */
    public CompletionStage<AtomicWriteResult> flush() {
        CompletionStage<AtomicWriteResult> chain =
                CompletableFuture.completedStage(AtomicWriteResult.success(this.partition));
        final Iterator<BatchableStatement<?>> pending = this.statements.iterator();
        do {
            final BatchStatementBuilder builder = new BatchStatementBuilder(BatchType.UNLOGGED);
            builder.setIdempotence(true);
            this.fencingToken.addFencingStatement(builder, this.partition);
            for (int added = 0; added < this.batchSize && pending.hasNext(); added++) {
                builder.addStatement(pending.next());
            }
            // NOTE(review): the previous batch's result is ignored, so a later batch is still
            // executed when an earlier one completed normally but was not applied — confirm
            // this is intended.
            chain = chain.thenCompose(previous -> executeAsync(builder.build()));
        } while (pending.hasNext());
        // Safe to clear now: every buffered statement has already been handed to a builder.
        this.statements.clear();
        return chain;
    }

    /**
     * Executes {@code statement} asynchronously and converts the driver result with
     * {@code AtomicWriteResult.of} for this writer's partition.
     */
    private CompletionStage<AtomicWriteResult> executeAsync(Statement<?> statement) {
        return this.client.executeAsync(statement)
                .thenApply(resultSet -> AtomicWriteResult.of(this.partition, resultSet));
    }

    /**
     * @return the partition this writer was created for
     */
    public int partition() {
        return this.partition;
    }

    /**
     * Synchronously executes one unlogged batch holding the fencing statement and the schema's
     * set-offset statement.
     *
     * @param j offset passed to {@code schema.setOffset}
     * @return a success result if the executed batch reports {@code wasApplied()}, otherwise a
     *     failure result; both carry this writer's partition
     */
    public AtomicWriteResult setOffset(long j) {
        final BatchStatementBuilder builder = new BatchStatementBuilder(BatchType.UNLOGGED);
        this.fencingToken.addFencingStatement(builder, this.partition);
        builder.addStatement(this.schema.setOffset(this.name, this.fencingToken, this.partition, j));
        // Typed as Statement<?> so the same execute overload is selected as before.
        final Statement<?> batch = builder.build();
        if (this.client.execute(batch).wasApplied()) {
            return AtomicWriteResult.success(this.partition);
        }
        return AtomicWriteResult.failure(this.partition);
    }
}
