package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/* loaded from: input_file:dev/responsive/kafka/internal/db/LwtWriter.class */
/**
 * A {@link RemoteWriter} that buffers the statements produced by a {@link RemoteTable} and, on
 * {@link #flush()}, sends them to the {@link CassandraClient} in {@code UNLOGGED} batches.
 *
 * <p>Every batch starts with the statement obtained from the {@code ensureEpoch} supplier and is
 * followed by at most {@code maxBatchSize} buffered statements.
 *
 * <p>The statement buffer is a plain {@link ArrayList} with no synchronization, so instances are
 * not safe for concurrent use.
 *
 * @param <K> key type accepted by the underlying {@link RemoteTable}
 * @param <P> type of the table partition reported in each {@link RemoteWriteResult}
 */
public class LwtWriter<K, P> implements RemoteWriter<K, P> {
    /** Batch size used by the public constructor (the ensure-epoch statement is not counted). */
    private static final long DEFAULT_MAX_BATCH_SIZE = 1000L;

    private final CassandraClient client;
    private final Supplier<BoundStatement> ensureEpoch;
    private final RemoteTable<K, BoundStatement> table;
    private final int kafkaPartition;
    private final P tablePartition;
    private final long maxBatchSize;
    private final List<BoundStatement> statements;

    /**
     * Creates a writer that uses the default maximum batch size of 1000 statements.
     *
     * @param client         client used to execute the batches
     * @param ensureEpoch    supplies the statement placed first in every batch
     * @param table          table that builds the insert/delete statements
     * @param kafkaPartition partition number passed to the table for every insert/delete
     * @param tablePartition partition reported in the {@link RemoteWriteResult}s of this writer
     */
    public LwtWriter(
            CassandraClient client,
            Supplier<BoundStatement> ensureEpoch,
            RemoteTable<K, BoundStatement> table,
            int kafkaPartition,
            P tablePartition) {
        this(client, ensureEpoch, table, kafkaPartition, tablePartition, DEFAULT_MAX_BATCH_SIZE);
    }

    /**
     * Creates a writer with an explicit maximum batch size.
     *
     * @param client         client used to execute the batches
     * @param ensureEpoch    supplies the statement placed first in every batch
     * @param table          table that builds the insert/delete statements
     * @param kafkaPartition partition number passed to the table for every insert/delete
     * @param tablePartition partition reported in the {@link RemoteWriteResult}s of this writer
     * @param maxBatchSize   maximum number of buffered statements per batch; must be positive
     * @throws IllegalArgumentException if {@code maxBatchSize} is zero or negative
     */
    @VisibleForTesting
    LwtWriter(
            CassandraClient client,
            Supplier<BoundStatement> ensureEpoch,
            RemoteTable<K, BoundStatement> table,
            int kafkaPartition,
            P tablePartition,
            long maxBatchSize) {
        // A non-positive size would make flush() spin forever: its inner loop would never
        // consume a statement, so the outer "while (it.hasNext())" could not terminate.
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException(
                    "maxBatchSize must be positive, got " + maxBatchSize);
        }
        this.client = client;
        this.ensureEpoch = ensureEpoch;
        this.table = table;
        this.kafkaPartition = kafkaPartition;
        this.tablePartition = tablePartition;
        this.maxBatchSize = maxBatchSize;
        this.statements = new ArrayList<>();
    }

    /**
     * Buffers the insert statement built by the table for this writer's Kafka partition.
     * Nothing is sent until {@link #flush()} is called.
     *
     * @param k    key handed to {@code table.insert}
     * @param bArr bytes handed to {@code table.insert} (presumably the serialized value)
     * @param j    long handed unchanged to {@code table.insert}; its meaning is defined there
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public void insert(K k, byte[] bArr, long j) {
        this.statements.add(this.table.insert(this.kafkaPartition, k, bArr, j));
    }

    /**
     * Buffers the delete statement built by the table for this writer's Kafka partition.
     * Nothing is sent until {@link #flush()} is called.
     *
     * @param k key handed to {@code table.delete}
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public void delete(K k) {
        this.statements.add(this.table.delete(this.kafkaPartition, k));
    }

    /**
     * Sends all buffered statements in batches of at most {@code maxBatchSize} statements, each
     * batch preceded by a fresh statement from the {@code ensureEpoch} supplier.
     *
     * <p>Batches are chained with {@code thenCompose}, so a batch is built and submitted only
     * after the stage of the previous batch has completed normally.
     *
     * <p>This method does not clear the buffer; calling it again re-sends the same statements.
     *
     * @return a stage yielding the result of the last batch, or an already-completed
     *         {@code RemoteWriteResult.success(tablePartition)} when nothing was buffered
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public CompletionStage<RemoteWriteResult<P>> flush() {
        CompletionStage<RemoteWriteResult<P>> result =
                CompletableFuture.completedStage(RemoteWriteResult.success(this.tablePartition));

        Iterator<BoundStatement> pending = this.statements.iterator();
        while (pending.hasNext()) {
            BatchStatementBuilder batch = new BatchStatementBuilder(BatchType.UNLOGGED);
            batch.setIdempotence(true);
            batch.addStatement(this.ensureEpoch.get());

            for (int added = 0; added < this.maxBatchSize && pending.hasNext(); added++) {
                batch.addStatement(pending.next());
            }

            // NOTE(review): the previous batch's result is deliberately left unused here, so
            // the returned stage only reflects the last batch. Confirm that an earlier batch
            // reporting a non-success result does not need to short-circuit the chain.
            result = result.thenCompose(previousResult -> executeAsync(batch.build()));
        }
        return result;
    }

    /**
     * Executes the statement through the client and converts the resulting row set into a
     * {@link RemoteWriteResult} for this writer's table partition.
     */
    private CompletionStage<RemoteWriteResult<P>> executeAsync(Statement<?> statement) {
        // NOTE(review): the cast is kept from the original; it is likely redundant if
        // CassandraClient.executeAsync is fully generic -- confirm before removing.
        return (CompletionStage<RemoteWriteResult<P>>) this.client.executeAsync(statement)
                .thenApply(asyncResultSet ->
                        RemoteWriteResult.of(this.tablePartition, asyncResultSet));
    }
}
