package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.relation.Relation;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import com.datastax.oss.driver.api.querybuilder.update.Update;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/CassandraKeyValueTable.class */
public class CassandraKeyValueTable implements RemoteKVTable<BoundStatement> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraKeyValueTable.class);
    private static final String FROM_BIND = "fk";
    private static final String TO_BIND = "tk";
    private final String name;
    private final CassandraClient client;
    private final SubPartitioner partitioner;
    private final Optional<TtlResolver<?, ?>> ttlResolver;
    private final PreparedStatement get;
    private final PreparedStatement range;
    private final PreparedStatement all;
    private final PreparedStatement insert;
    private final PreparedStatement delete;
    private final PreparedStatement fetchOffset;
    private final PreparedStatement setOffset;
    private final PreparedStatement fetchEpoch;
    private final PreparedStatement reserveEpoch;
    private final PreparedStatement ensureEpoch;

    public static CassandraKeyValueTable create(RemoteTableSpec remoteTableSpec, CassandraClient cassandraClient) throws InterruptedException, TimeoutException {
        String tableName = remoteTableSpec.tableName();
        Optional<TtlResolver<?, ?>> ttlResolver = remoteTableSpec.ttlResolver();
        LOG.info("Creating data table {} in remote store.", tableName);
        cassandraClient.execute((Statement<?>) remoteTableSpec.applyDefaultOptions(createTable(tableName, ttlResolver)).build());
        cassandraClient.awaitTable(tableName).await(Duration.ofSeconds(60L));
        return new CassandraKeyValueTable(tableName, cassandraClient, (SubPartitioner) remoteTableSpec.partitioner(), ttlResolver, cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))).where((Relation) ColumnName.TIMESTAMP.relation().isGreaterThanOrEqualTo(QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind()))).allowFiltering().build(), QueryOp.READ), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.DATA_VALUE.column(), ColumnName.TIMESTAMP.column()}).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.DATA_KEY.relation().isGreaterThanOrEqualTo(QueryBuilder.bindMarker(FROM_BIND))).where((Relation) ColumnName.DATA_KEY.relation().isLessThanOrEqualTo(QueryBuilder.bindMarker(TO_BIND))).where((Relation) ColumnName.TIMESTAMP.relation().isGreaterThanOrEqualTo(QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind()))).allowFiltering().build(), QueryOp.READ), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.DATA_VALUE.column(), ColumnName.TIMESTAMP.column()}).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.TIMESTAMP.relation().isGreaterThanOrEqualTo(QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind()))).allowFiltering().build(), QueryOp.READ), cassandraClient.prepare(QueryBuilder.insertInto(tableName).value(ColumnName.PARTITION_KEY.column(), QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind())).value(ColumnName.ROW_TYPE.column(), RowType.DATA_ROW.literal()).value(ColumnName.DATA_KEY.column(), QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind())).value(ColumnName.TIMESTAMP.column(), QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind())).value(ColumnName.DATA_VALUE.column(), QueryBuilder.bindMarker(ColumnName.DATA_VALUE.bind())).build(), QueryOp.WRITE), cassandraClient.prepare(QueryBuilder.deleteFrom(tableName).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))).build(), QueryOp.WRITE), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).column(ColumnName.OFFSET.column()).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).build(), QueryOp.READ), cassandraClient.prepare(QueryBuilder.update(tableName).setColumn(ColumnName.OFFSET.column(), QueryBuilder.bindMarker(ColumnName.OFFSET.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).build(), QueryOp.WRITE), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).column(ColumnName.EPOCH.column()).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).build(), QueryOp.READ), cassandraClient.prepare(((Update) QueryBuilder.update(tableName).setColumn(ColumnName.EPOCH.column(), QueryBuilder.bindMarker(ColumnName.EPOCH.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).ifColumn(ColumnName.EPOCH.column()).isLessThan(QueryBuilder.bindMarker(ColumnName.EPOCH.bind()))).build(), QueryOp.WRITE), cassandraClient.prepare(((Update) QueryBuilder.update(tableName).setColumn(ColumnName.EPOCH.column(), QueryBuilder.bindMarker(ColumnName.EPOCH.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).ifColumn(ColumnName.EPOCH.column()).isEqualTo(QueryBuilder.bindMarker(ColumnName.EPOCH.bind()))).build(), QueryOp.WRITE));
    }

    private static CreateTableWithOptions createTable(String str, Optional<TtlResolver<?, ?>> optional) {
        CreateTable withColumn = SchemaBuilder.createTable(str).ifNotExists().withPartitionKey(ColumnName.PARTITION_KEY.column(), DataTypes.INT).withClusteringColumn(ColumnName.ROW_TYPE.column(), DataTypes.TINYINT).withClusteringColumn(ColumnName.DATA_KEY.column(), DataTypes.BLOB).withColumn(ColumnName.DATA_VALUE.column(), DataTypes.BLOB).withColumn(ColumnName.OFFSET.column(), DataTypes.BIGINT).withColumn(ColumnName.EPOCH.column(), DataTypes.BIGINT).withColumn(ColumnName.TIMESTAMP.column(), DataTypes.TIMESTAMP);
        return (optional.isPresent() && optional.get().defaultTtl().isFinite()) ? withColumn.withDefaultTimeToLiveSeconds((int) optional.get().defaultTtl().toSeconds()) : withColumn;
    }

    public CassandraKeyValueTable(String str, CassandraClient cassandraClient, SubPartitioner subPartitioner, Optional<TtlResolver<?, ?>> optional, PreparedStatement preparedStatement, PreparedStatement preparedStatement2, PreparedStatement preparedStatement3, PreparedStatement preparedStatement4, PreparedStatement preparedStatement5, PreparedStatement preparedStatement6, PreparedStatement preparedStatement7, PreparedStatement preparedStatement8, PreparedStatement preparedStatement9, PreparedStatement preparedStatement10) {
        this.name = str;
        this.client = cassandraClient;
        this.partitioner = subPartitioner;
        this.ttlResolver = optional;
        this.get = preparedStatement;
        this.range = preparedStatement2;
        this.all = preparedStatement3;
        this.insert = preparedStatement4;
        this.delete = preparedStatement5;
        this.fetchOffset = preparedStatement6;
        this.setOffset = preparedStatement7;
        this.fetchEpoch = preparedStatement8;
        this.reserveEpoch = preparedStatement9;
        this.ensureEpoch = preparedStatement10;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public CassandraKVFlushManager init(int i) {
        this.partitioner.allTablePartitions(i).forEach(num -> {
            this.client.execute((Statement<?>) QueryBuilder.insertInto(this.name).value(ColumnName.PARTITION_KEY.column(), ColumnName.PARTITION_KEY.literal(num)).value(ColumnName.ROW_TYPE.column(), RowType.METADATA_ROW.literal()).value(ColumnName.DATA_KEY.column(), ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY)).value(ColumnName.TIMESTAMP.column(), ColumnName.TIMESTAMP.literal(-1L)).value(ColumnName.OFFSET.column(), ColumnName.OFFSET.literal(-1L)).value(ColumnName.EPOCH.column(), ColumnName.EPOCH.literal(0L)).ifNotExists().build());
        });
        long fetchEpoch = fetchEpoch(Integer.valueOf(this.partitioner.metadataTablePartition(i).intValue())) + 1;
        Iterator<Integer> it = this.partitioner.allTablePartitions(i).iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (!this.client.execute((Statement<?>) reserveEpoch(Integer.valueOf(intValue), fetchEpoch)).wasApplied()) {
                String format = String.format("Could not initialize commit buffer [%s-%d] - attempted to claim epoch %d, but was fenced by a writer that claimed epoch %d on table partition %s", name(), Integer.valueOf(i), Long.valueOf(fetchEpoch), Long.valueOf(fetchEpoch(Integer.valueOf(intValue))), Integer.valueOf(intValue));
                TaskMigratedException taskMigratedException = new TaskMigratedException(format);
                LOG.warn(format, taskMigratedException);
                throw taskMigratedException;
            }
        }
        int intValue2 = this.partitioner.metadataTablePartition(i).intValue();
        LOG.info("Initialized store {} with epoch {} for subpartitions in range: {{} -> {}}", new Object[]{this.name, Long.valueOf(fetchEpoch), Integer.valueOf(intValue2), Integer.valueOf((intValue2 + this.partitioner.getFactor()) - 1)});
        return new CassandraKVFlushManager(this, this.client, this.partitioner, i, fetchEpoch);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public byte[] get(int i, Bytes bytes, long j) {
        List all = this.client.execute((Statement<?>) this.get.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), this.partitioner.tablePartition(i, bytes).intValue()).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get())).setInstant(ColumnName.TIMESTAMP.bind(), Instant.ofEpochMilli(this.ttlResolver.isEmpty() ? -1L : j - this.ttlResolver.get().defaultTtl().toMillis()))).all();
        if (all.size() > 1) {
            throw new IllegalStateException("Unexpected multiple results for point lookup");
        }
        if (all.isEmpty()) {
            return null;
        }
        return ((ByteBuffer) Objects.requireNonNull(((Row) all.get(0)).getByteBuffer(ColumnName.DATA_VALUE.column()))).array();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> range(int i, Bytes bytes, Bytes bytes2, long j) {
        long millis = this.ttlResolver.isEmpty() ? -1L : j - this.ttlResolver.get().defaultTtl().toMillis();
        LinkedList linkedList = new LinkedList();
        Iterator<Integer> it = this.partitioner.allTablePartitions(i).iterator();
        while (it.hasNext()) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.range.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), it.next().intValue()).setByteBuffer(FROM_BIND, ByteBuffer.wrap(bytes.get())).setByteBuffer(TO_BIND, ByteBuffer.wrap(bytes2.get())).setInstant(ColumnName.TIMESTAMP.bind(), Instant.ofEpochMilli(millis))).iterator(), CassandraKeyValueTable::rows));
        }
        return Iterators.wrapped(linkedList);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> all(int i, long j) {
        long millis = this.ttlResolver.isEmpty() ? -1L : j - this.ttlResolver.get().defaultTtl().toMillis();
        LinkedList linkedList = new LinkedList();
        Iterator<Integer> it = this.partitioner.allTablePartitions(i).iterator();
        while (it.hasNext()) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.all.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), it.next().intValue()).setInstant(ColumnName.TIMESTAMP.bind(), Instant.ofEpochMilli(millis))).iterator(), CassandraKeyValueTable::rows));
        }
        return Iterators.wrapped(linkedList);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement insert(int i, Bytes bytes, byte[] bArr, long j) {
        return this.insert.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), this.partitioner.tablePartition(i, bytes).intValue()).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get())).setInstant(ColumnName.TIMESTAMP.bind(), Instant.ofEpochMilli(j)).setByteBuffer(ColumnName.DATA_VALUE.bind(), ByteBuffer.wrap(bArr));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public BoundStatement delete(int i, Bytes bytes) {
        return this.delete.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), this.partitioner.tablePartition(i, bytes).intValue()).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get()));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long fetchOffset(int i) {
        List all = this.client.execute((Statement<?>) this.fetchOffset.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), this.partitioner.metadataTablePartition(i).intValue())).all();
        if (all.size() > 1) {
            throw new IllegalStateException(String.format("Expected at most one offset row for %s[%s] but got %d", this.name, Integer.valueOf(i), Integer.valueOf(all.size())));
        }
        if (all.isEmpty()) {
            return -1L;
        }
        return ((Row) all.get(0)).getLong(ColumnName.OFFSET.column());
    }

    public BoundStatement setOffset(int i, long j) {
        return this.setOffset.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), this.partitioner.metadataTablePartition(i).intValue()).setLong(ColumnName.OFFSET.bind(), j);
    }

    public long fetchEpoch(Integer num) {
        List all = this.client.execute((Statement<?>) this.fetchEpoch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), num.intValue())).all();
        if (all.size() != 1) {
            throw new IllegalStateException(String.format("Expected exactly one epoch metadata row for %s[%s] but got %d", this.name, num, Integer.valueOf(all.size())));
        }
        return ((Row) all.get(0)).getLong(ColumnName.EPOCH.column());
    }

    public BoundStatement reserveEpoch(Integer num, long j) {
        return this.reserveEpoch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), num.intValue()).setLong(ColumnName.EPOCH.bind(), j);
    }

    public BoundStatement ensureEpoch(Integer num, long j) {
        return this.ensureEpoch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), num.intValue()).setLong(ColumnName.EPOCH.bind(), j);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public long approximateNumEntries(int i) {
        return this.partitioner.allTablePartitions(i).stream().mapToLong(num -> {
            return this.client.count(name(), num.intValue());
        }).sum();
    }

    private static KeyValue<Bytes, byte[]> rows(Row row) {
        return new KeyValue<>(Bytes.wrap(row.getByteBuffer(ColumnName.DATA_KEY.column()).array()), row.getByteBuffer(ColumnName.DATA_VALUE.column()).array());
    }
}
