package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.relation.Relation;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import dev.responsive.kafka.api.stores.TtlProvider;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.CheckReturnValue;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/CassandraFactTable.class */
/**
 * {@link RemoteKVTable} implementation that stores "fact" rows in Cassandra.
 *
 * <p>Two tables are used:
 * <ul>
 *   <li>the data table, partitioned by (row type, data key) with a timestamp and a value column;</li>
 *   <li>a metadata table named {@code <name>_md}, partitioned by (row type, partition key), holding
 *       one offset per partition.</li>
 * </ul>
 *
 * <p>Data rows are addressed by key only: the partition argument of {@link #insert}, {@link #delete}
 * and {@link #get} is not part of any data-table statement. Only point lookups are supported;
 * {@link #range}, {@link #all} and {@link #approximateNumEntries} throw
 * {@link UnsupportedOperationException}.
 */
public class CassandraFactTable implements RemoteKVTable<BoundStatement> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraFactTable.class);

    /** Lower timestamp bound used when no TTL filtering is wanted. */
    private static final long NO_MIN_TIMESTAMP_MS = -1L;

    private final String name;
    private final CassandraClient client;
    private final Optional<TtlResolver<?, ?>> ttlResolver;
    private final PreparedStatement get;
    private final PreparedStatement getWithTimestamp;
    private final PreparedStatement insert;
    private final PreparedStatement insertWithTtl;
    private final PreparedStatement delete;
    private final PreparedStatement fetchOffset;
    private final PreparedStatement setOffset;

    /**
     * Assembles a table from statements that have already been prepared. {@link #create} builds the
     * statements and calls this constructor.
     *
     * @param str                name of the data table
     * @param cassandraClient    client used to execute statements
     * @param optional           optional TTL resolver; empty means rows never expire on read
     * @param preparedStatement  point lookup with a lower timestamp bound ({@code get})
     * @param preparedStatement2 point lookup that also returns the row timestamp ({@code getWithTimestamp})
     * @param preparedStatement3 insert using the table's default TTL ({@code insert})
     * @param preparedStatement4 insert with an explicit per-row TTL ({@code insertWithTtl})
     * @param preparedStatement5 delete by data key ({@code delete})
     * @param preparedStatement6 offset lookup in the metadata table ({@code fetchOffset})
     * @param preparedStatement7 offset update in the metadata table ({@code setOffset})
     */
    public CassandraFactTable(String str, CassandraClient cassandraClient, Optional<TtlResolver<?, ?>> optional, PreparedStatement preparedStatement, PreparedStatement preparedStatement2, PreparedStatement preparedStatement3, PreparedStatement preparedStatement4, PreparedStatement preparedStatement5, PreparedStatement preparedStatement6, PreparedStatement preparedStatement7) {
        this.name = str;
        this.client = cassandraClient;
        this.ttlResolver = optional;
        this.get = preparedStatement;
        this.getWithTimestamp = preparedStatement2;
        this.insert = preparedStatement3;
        this.insertWithTtl = preparedStatement4;
        this.delete = preparedStatement5;
        this.fetchOffset = preparedStatement6;
        this.setOffset = preparedStatement7;
    }

    /**
     * Creates the data and metadata tables if they do not exist yet, prepares every statement the
     * table needs, and returns the ready-to-use instance.
     *
     * @param remoteTableSpec supplies the table name, the optional TTL resolver and default table options
     * @param cassandraClient client used to run the DDL and prepare the statements
     * @return a table bound to {@code cassandraClient}
     */
    public static CassandraFactTable create(RemoteTableSpec remoteTableSpec, CassandraClient cassandraClient) {
        final String tableName = remoteTableSpec.tableName();
        final Optional<TtlResolver<?, ?>> ttlResolver = remoteTableSpec.ttlResolver();
        final String metadataTableName = metadataTable(tableName);
        LOG.info("Creating fact data table {} in remote store.", tableName);

        final CreateTableWithOptions dataTable =
            remoteTableSpec.applyDefaultOptions(createTable(tableName, ttlResolver));
        final CreateTable offsetsTable = SchemaBuilder.createTable(metadataTableName)
            .ifNotExists()
            .withPartitionKey(ColumnName.ROW_TYPE.column(), DataTypes.TINYINT)
            .withPartitionKey(ColumnName.PARTITION_KEY.column(), DataTypes.INT)
            .withColumn(ColumnName.OFFSET.column(), DataTypes.BIGINT);

        // Data table first, then metadata table.
        final Statement<?> createDataTable = dataTable.build();
        final Statement<?> createOffsetsTable = offsetsTable.build();
        cassandraClient.execute(createDataTable);
        cassandraClient.execute(createOffsetsTable);

        // Statements are prepared in the same order as the constructor parameters.
        final PreparedStatement get = cassandraClient.prepare(
            QueryBuilder.selectFrom(tableName)
                .columns(ColumnName.DATA_VALUE.column())
                .where(isDataRow())
                .where(matchesBoundDataKey())
                .where((Relation) ColumnName.TIMESTAMP.relation()
                    .isGreaterThanOrEqualTo(QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind())))
                // timestamp is a regular column, so restricting on it requires ALLOW FILTERING
                .allowFiltering()
                .build(),
            QueryOp.READ);

        final PreparedStatement getWithTimestamp = cassandraClient.prepare(
            QueryBuilder.selectFrom(tableName)
                .columns(ColumnName.DATA_VALUE.column(), ColumnName.TIMESTAMP.column())
                .where(isDataRow())
                .where(matchesBoundDataKey())
                .allowFiltering()
                .build(),
            QueryOp.READ);

        final PreparedStatement insert = cassandraClient.prepare(
            QueryBuilder.insertInto(tableName)
                .value(ColumnName.ROW_TYPE.column(), RowType.DATA_ROW.literal())
                .value(ColumnName.DATA_KEY.column(), QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))
                .value(ColumnName.TIMESTAMP.column(), QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind()))
                .value(ColumnName.DATA_VALUE.column(), QueryBuilder.bindMarker(ColumnName.DATA_VALUE.bind()))
                .build(),
            QueryOp.WRITE);

        final PreparedStatement insertWithTtl = cassandraClient.prepare(
            QueryBuilder.insertInto(tableName)
                .value(ColumnName.ROW_TYPE.column(), RowType.DATA_ROW.literal())
                .value(ColumnName.DATA_KEY.column(), QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))
                .value(ColumnName.TIMESTAMP.column(), QueryBuilder.bindMarker(ColumnName.TIMESTAMP.bind()))
                .value(ColumnName.DATA_VALUE.column(), QueryBuilder.bindMarker(ColumnName.DATA_VALUE.bind()))
                .usingTtl(QueryBuilder.bindMarker(ColumnName.TTL_SECONDS.bind()))
                .build(),
            QueryOp.WRITE);

        final PreparedStatement delete = cassandraClient.prepare(
            QueryBuilder.deleteFrom(tableName)
                .where(isDataRow())
                .where(matchesBoundDataKey())
                .build(),
            QueryOp.WRITE);

        final PreparedStatement fetchOffset = cassandraClient.prepare(
            QueryBuilder.selectFrom(metadataTableName)
                .column(ColumnName.OFFSET.column())
                .where(isMetadataRow())
                .where(matchesBoundPartitionKey())
                .build(),
            QueryOp.READ);

        final PreparedStatement setOffset = cassandraClient.prepare(
            QueryBuilder.update(metadataTableName)
                .setColumn(ColumnName.OFFSET.column(), QueryBuilder.bindMarker(ColumnName.OFFSET.bind()))
                .where(isMetadataRow())
                .where(matchesBoundPartitionKey())
                .build(),
            QueryOp.WRITE);

        return new CassandraFactTable(
            tableName,
            cassandraClient,
            ttlResolver,
            get,
            getWithTimestamp,
            insert,
            insertWithTtl,
            delete,
            fetchOffset,
            setOffset);
    }

    /**
     * Builds the {@code CREATE TABLE IF NOT EXISTS} definition of the data table. When a TTL resolver
     * is present and its default TTL is finite, that TTL (in seconds) becomes the table's default
     * time-to-live.
     */
    private static CreateTableWithOptions createTable(String str, Optional<TtlResolver<?, ?>> optional) {
        final CreateTable table = SchemaBuilder.createTable(str)
            .ifNotExists()
            .withPartitionKey(ColumnName.ROW_TYPE.column(), DataTypes.TINYINT)
            .withPartitionKey(ColumnName.DATA_KEY.column(), DataTypes.BLOB)
            .withColumn(ColumnName.TIMESTAMP.column(), DataTypes.TIMESTAMP)
            .withColumn(ColumnName.DATA_VALUE.column(), DataTypes.BLOB);

        if (optional.isPresent() && optional.get().defaultTtl().isFinite()) {
            return table.withDefaultTimeToLiveSeconds((int) optional.get().defaultTtl().toSeconds());
        }
        return table;
    }

    /** Relation restricting a data-table statement to data rows. */
    private static Relation isDataRow() {
        return (Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal());
    }

    /** Relation restricting a metadata-table statement to metadata rows. */
    private static Relation isMetadataRow() {
        return (Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal());
    }

    /** Relation {@code data key = <bind marker>}. */
    private static Relation matchesBoundDataKey() {
        return (Relation) ColumnName.DATA_KEY.relation()
            .isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()));
    }

    /** Relation {@code partition key = <bind marker>}. */
    private static Relation matchesBoundPartitionKey() {
        return (Relation) ColumnName.PARTITION_KEY.relation()
            .isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()));
    }

    /** @return the name of the data table */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    /**
     * Seeds the metadata row for partition {@code i} with offset {@code -1} unless a row already
     * exists (the insert is conditional), then returns a flush manager for that partition.
     *
     * @param i partition whose metadata row is initialised
     */
    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public CassandraFactFlushManager init(int i) {
        final Statement<?> seedOffset = QueryBuilder.insertInto(metadataTable(this.name))
            .value(ColumnName.ROW_TYPE.column(), RowType.METADATA_ROW.literal())
            .value(ColumnName.PARTITION_KEY.column(), ColumnName.PARTITION_KEY.literal(Integer.valueOf(i)))
            .value(ColumnName.OFFSET.column(), ColumnName.OFFSET.literal(-1L))
            .ifNotExists()
            .build();
        this.client.execute(seedOffset);
        return new CassandraFactFlushManager(this, this.client, i);
    }

    /**
     * Reads the stored offset for partition {@code i} from the metadata table.
     *
     * @param i partition to look up
     * @return the stored offset, or {@code -1} when no metadata row exists
     * @throws IllegalStateException if more than one metadata row is returned
     */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long fetchOffset(int i) {
        final List<Row> rows =
            fetchAll(this.fetchOffset.bind().setInt(ColumnName.PARTITION_KEY.bind(), i));
        if (rows.size() > 1) {
            throw new IllegalStateException(String.format(
                "Expected at most one offset row for %s[%s] but got %d", this.name, i, rows.size()));
        }
        if (rows.isEmpty()) {
            return -1L;
        }
        final long offset = rows.get(0).getLong(ColumnName.OFFSET.column());
        LOG.info("Got offset for {}[{}]: {}", this.name, i, offset);
        return offset;
    }

    /**
     * Builds (but does not execute) the statement that stores offset {@code j} for partition
     * {@code i} in the metadata table.
     *
     * @param i partition whose offset is updated
     * @param j offset to store
     * @return the bound update statement
     */
    public BoundStatement setOffset(int i, long j) {
        LOG.info("Setting offset in metadata table {} for {}[{}] to {}",
            metadataTable(this.name), this.name, i, j);
        return this.setOffset.bind()
            .setInt(ColumnName.PARTITION_KEY.bind(), i)
            .setLong(ColumnName.OFFSET.bind(), j);
    }

    /**
     * Not supported on fact tables.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public long approximateNumEntries(int i) {
        throw new UnsupportedOperationException("approximateNumEntries is not supported on fact tables");
    }

    /**
     * Builds (but does not execute) a delete for the row stored under {@code bytes}.
     *
     * @param i     partition; not used by the statement
     * @param bytes key of the row to delete
     * @return the bound delete statement
     */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    @CheckReturnValue
    public BoundStatement delete(int i, Bytes bytes) {
        return this.delete.bind()
            .setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get()));
    }

    /**
     * Builds (but does not execute) an insert for the given key/value.
     *
     * <p>When a TTL resolver is configured and it computes a TTL for this row that differs from the
     * resolver's default, the statement carries an explicit TTL: the computed TTL in seconds, or
     * {@code 0} when the computed TTL is not finite. Otherwise the plain insert is used and the
     * table's default time-to-live (if any) applies.
     *
     * @param i     partition; not used by the statement
     * @param bytes row key
     * @param bArr  row value
     * @param j     row timestamp in epoch milliseconds
     * @return the bound insert statement
     */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    @CheckReturnValue
    public BoundStatement insert(int i, Bytes bytes, byte[] bArr, long j) {
        if (this.ttlResolver.isPresent()) {
            final TtlResolver<?, ?> resolver = this.ttlResolver.get();
            final Optional<TtlProvider.TtlDuration> rowTtl = resolver.computeTtl(bytes, bArr);
            if (rowTtl.isPresent() && !rowTtl.get().equals(resolver.defaultTtl())) {
                final int ttlSeconds = rowTtl.get().isFinite() ? (int) rowTtl.get().toSeconds() : 0;
                return bindRow(this.insertWithTtl, bytes, bArr, j)
                    .setInt(ColumnName.TTL_SECONDS.bind(), ttlSeconds);
            }
        }
        return bindRow(this.insert, bytes, bArr, j);
    }

    /** Binds the key, value and timestamp markers shared by both insert statements. */
    private static BoundStatement bindRow(PreparedStatement statement, Bytes key, byte[] value, long timestampMs) {
        return statement.bind()
            .setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
            .setByteBuffer(ColumnName.DATA_VALUE.bind(), ByteBuffer.wrap(value))
            .setInstant(ColumnName.TIMESTAMP.bind(), Instant.ofEpochMilli(timestampMs));
    }

    /**
     * Looks up the value stored under {@code bytes}, hiding rows that have outlived their TTL.
     *
     * <ul>
     *   <li>No TTL resolver: plain lookup.</li>
     *   <li>Resolver needs the value to compute the TTL: the row is fetched with its timestamp and
     *       checked afterwards.</li>
     *   <li>Otherwise the TTL is resolved from the key alone; a finite TTL is turned into a lower
     *       timestamp bound on the query, a non-finite one falls back to the plain lookup.</li>
     * </ul>
     *
     * @param i     partition; not used by the lookup
     * @param bytes key to look up
     * @param j     reference time in epoch milliseconds against which the TTL is evaluated
     * @return the stored value, or {@code null} if there is no live row
     * @throws IllegalStateException if more than one row is returned for the key
     */
    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public byte[] get(int i, Bytes bytes, long j) {
        if (this.ttlResolver.isEmpty()) {
            return simpleGet(bytes);
        }
        final TtlResolver<?, ?> resolver = this.ttlResolver.get();
        if (resolver.needsValueToComputeTtl()) {
            return postFilterGet(bytes, j);
        }
        final TtlProvider.TtlDuration ttl = resolver.resolveTtl(bytes, null);
        if (ttl.isFinite()) {
            return preFilterGet(bytes, j - ttl.toMillis());
        }
        return simpleGet(bytes);
    }

    /** Lookup without TTL filtering: the timestamp bound of -1 admits every non-negative timestamp. */
    private byte[] simpleGet(Bytes key) {
        return preFilterGet(key, NO_MIN_TIMESTAMP_MS);
    }

    /** Lookup that lets the query itself drop rows whose timestamp is below {@code minValidTimestampMs}. */
    private byte[] preFilterGet(Bytes key, long minValidTimestampMs) {
        final Row row = singleRowOrNull(fetchAll(this.get.bind()
            .setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
            .setInstant(ColumnName.TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTimestampMs))));
        return row == null ? null : getValueFromRow(row);
    }

    /**
     * Lookup for resolvers that need the value to determine the TTL: fetches value and timestamp,
     * resolves the TTL, and returns {@code null} when the row is older than {@code nowMs - ttl}.
     */
    private byte[] postFilterGet(Bytes key, long nowMs) {
        final Row row = singleRowOrNull(fetchAll(this.getWithTimestamp.bind()
            .setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(key.get()))));
        if (row == null) {
            return null;
        }
        final byte[] value = getValueFromRow(row);
        final TtlProvider.TtlDuration ttl = this.ttlResolver.get().resolveTtl(key, value);
        if (ttl.isFinite()) {
            final long minValidTimestampMs = nowMs - ttl.toMillis();
            if (row.getInstant(ColumnName.TIMESTAMP.column()).toEpochMilli() < minValidTimestampMs) {
                return null;
            }
        }
        return value;
    }

    /** Executes {@code statement} and materialises every returned row. */
    private List<Row> fetchAll(Statement<?> statement) {
        return this.client.execute(statement).all();
    }

    /**
     * Returns the only row of a point lookup, or {@code null} when nothing matched.
     *
     * @throws IllegalStateException if the lookup produced more than one row
     */
    private static Row singleRowOrNull(List<Row> rows) {
        if (rows.size() > 1) {
            throw new IllegalStateException("Received multiple results for the same key");
        }
        return rows.isEmpty() ? null : rows.get(0);
    }

    /**
     * Extracts the value column as a byte array.
     *
     * <p>{@link ByteBuffer#array()} exposes the whole backing array and is unavailable for direct or
     * read-only buffers, so it is only returned as-is when the buffer spans exactly that array.
     * In every other case the readable bytes are copied out without moving the buffer's position.
     *
     * @throws NullPointerException if the value column is null
     */
    private byte[] getValueFromRow(Row row) {
        final ByteBuffer value = Objects.requireNonNull(row.getByteBuffer(ColumnName.DATA_VALUE.column()));
        if (value.hasArray()
            && value.arrayOffset() == 0
            && value.position() == 0
            && value.remaining() == value.array().length) {
            return value.array();
        }
        final byte[] copy = new byte[value.remaining()];
        value.duplicate().get(copy);
        return copy;
    }

    /**
     * Not supported on fact tables.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> range(int i, Bytes bytes, Bytes bytes2, long j) {
        throw new UnsupportedOperationException("range scans are not supported on fact tables.");
    }

    /**
     * Not supported on fact tables.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> all(int i, long j) {
        throw new UnsupportedOperationException("all is not supported on fact tables");
    }

    /** Name of the metadata table that accompanies data table {@code str}. */
    private static String metadataTable(String str) {
        return str + "_md";
    }
}
