package dev.responsive.db;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.kafka.store.RemoteWriteResult;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/* loaded from: input_file:dev/responsive/db/LwtWriter.class */
public class LwtWriter<K> implements RemoteWriter<K> {
    private final CassandraClient client;
    private final Supplier<BatchableStatement<?>> fencingStatementFactory;
    private final RemoteSchema<K> schema;
    private final String name;
    private final int partition;
    private final int batchSize;
    private final List<BatchableStatement<?>> statements = new ArrayList();

    public LwtWriter(CassandraClient cassandraClient, Supplier<BatchableStatement<?>> supplier, RemoteSchema<K> remoteSchema, String str, int i, int i2) {
        this.client = cassandraClient;
        this.fencingStatementFactory = supplier;
        this.schema = remoteSchema;
        this.name = str;
        this.partition = i;
        this.batchSize = i2;
    }

    @Override // dev.responsive.db.RemoteWriter
    public void insert(K k, byte[] bArr, long j) {
        this.statements.add(this.schema.insert(this.name, this.partition, k, bArr, j));
    }

    @Override // dev.responsive.db.RemoteWriter
    public void delete(K k) {
        this.statements.add(this.schema.delete(this.name, this.partition, k));
    }

    @Override // dev.responsive.db.RemoteWriter
    public CompletionStage<RemoteWriteResult> flush() {
        CompletionStage<RemoteWriteResult> completedStage = CompletableFuture.completedStage(RemoteWriteResult.success(this.partition));
        Iterator<BatchableStatement<?>> it = this.statements.iterator();
        while (it.hasNext()) {
            BatchStatementBuilder batchStatementBuilder = new BatchStatementBuilder(BatchType.UNLOGGED);
            batchStatementBuilder.setIdempotence(true);
            batchStatementBuilder.addStatement(this.fencingStatementFactory.get());
            for (int i = 0; i < this.batchSize && it.hasNext(); i++) {
                batchStatementBuilder.addStatement(it.next());
            }
            completedStage = completedStage.thenCompose(remoteWriteResult -> {
                return executeAsync(batchStatementBuilder.build());
            });
        }
        return completedStage;
    }

    @Override // dev.responsive.db.RemoteWriter
    public RemoteWriteResult setOffset(long j) {
        BatchStatementBuilder batchStatementBuilder = new BatchStatementBuilder(BatchType.UNLOGGED);
        batchStatementBuilder.addStatement(this.fencingStatementFactory.get());
        batchStatementBuilder.addStatement(this.schema.setOffset(this.name, this.partition, j));
        return this.client.execute((Statement<?>) batchStatementBuilder.build()).wasApplied() ? RemoteWriteResult.success(this.partition) : RemoteWriteResult.failure(this.partition);
    }

    @Override // dev.responsive.db.RemoteWriter
    public int partition() {
        return this.partition;
    }

    private CompletionStage<RemoteWriteResult> executeAsync(Statement<?> statement) {
        return this.client.executeAsync(statement).thenApply(asyncResultSet -> {
            return RemoteWriteResult.of(this.partition, asyncResultSet);
        });
    }
}
