package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.relation.Relation;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import com.datastax.oss.driver.api.querybuilder.update.Update;
import com.datastax.oss.driver.internal.querybuilder.schema.compaction.DefaultLeveledCompactionStrategy;
import dev.responsive.kafka.internal.db.partitioning.SegmentPartitioner;
import dev.responsive.kafka.internal.db.spec.CassandraTableSpec;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import javax.annotation.CheckReturnValue;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/CassandraWindowedTable.class */
public class CassandraWindowedTable implements RemoteWindowedTable<BoundStatement> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowedTable.class);
    private static final String KEY_FROM_BIND = "kf";
    private static final String KEY_TO_BIND = "kt";
    private static final String WINDOW_FROM_BIND = "wf";
    private static final String WINDOW_TO_BIND = "wt";
    private final String name;
    private final CassandraClient client;
    private final SegmentPartitioner partitioner;
    private final PreparedStatement createSegment;
    private final PreparedStatement expireSegment;
    private final PreparedStatement insert;
    private final PreparedStatement delete;
    private final PreparedStatement fetchSingle;
    private final PreparedStatement fetch;
    private final PreparedStatement fetchRange;
    private final PreparedStatement fetchAll;
    private final PreparedStatement backFetch;
    private final PreparedStatement backFetchRange;
    private final PreparedStatement backFetchAll;
    private final PreparedStatement fetchOffset;
    private final PreparedStatement setOffset;
    private final PreparedStatement fetchStreamTime;
    private final PreparedStatement setStreamTime;
    private final PreparedStatement fetchEpoch;
    private final PreparedStatement reserveEpoch;
    private final PreparedStatement ensureEpoch;

    public static CassandraWindowedTable create(CassandraTableSpec cassandraTableSpec, CassandraClient cassandraClient) throws InterruptedException, TimeoutException {
        String tableName = cassandraTableSpec.tableName();
        LOG.info("Creating windowed data table {} in remote store.", tableName);
        cassandraClient.execute((Statement<?>) cassandraTableSpec.applyOptions(createTable(tableName)).build());
        cassandraClient.awaitTable(tableName).await(Duration.ofSeconds(60L));
        PreparedStatement prepare = cassandraClient.prepare(QueryBuilder.insertInto(tableName).value(ColumnName.PARTITION_KEY.column(), QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind())).value(ColumnName.SEGMENT_ID.column(), QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind())).value(ColumnName.ROW_TYPE.column(), RowType.METADATA_ROW.literal()).value(ColumnName.DATA_KEY.column(), ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY)).value(ColumnName.WINDOW_START.column(), ColumnName.WINDOW_START.literal(-1L)).value(ColumnName.EPOCH.column(), QueryBuilder.bindMarker(ColumnName.EPOCH.bind())).ifNotExists().build());
        PreparedStatement prepare2 = cassandraClient.prepare(QueryBuilder.deleteFrom(tableName).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).build());
        PreparedStatement prepare3 = cassandraClient.prepare(QueryBuilder.insertInto(tableName).value(ColumnName.PARTITION_KEY.column(), QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind())).value(ColumnName.SEGMENT_ID.column(), QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind())).value(ColumnName.ROW_TYPE.column(), RowType.DATA_ROW.literal()).value(ColumnName.DATA_KEY.column(), QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind())).value(ColumnName.WINDOW_START.column(), QueryBuilder.bindMarker(ColumnName.WINDOW_START.bind())).value(ColumnName.DATA_VALUE.column(), QueryBuilder.bindMarker(ColumnName.DATA_VALUE.bind())).build());
        PreparedStatement prepare4 = cassandraClient.prepare(QueryBuilder.deleteFrom(tableName).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.WINDOW_START.bind()))).build());
        PreparedStatement prepare5 = cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.WINDOW_START.bind()))).build());
        PreparedStatement prepare6 = cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))).where((Relation) ColumnName.WINDOW_START.relation().isGreaterThanOrEqualTo(QueryBuilder.bindMarker(WINDOW_FROM_BIND))).where((Relation) ColumnName.WINDOW_START.relation().isLessThan(QueryBuilder.bindMarker(WINDOW_TO_BIND))).build());
        PreparedStatement prepare7 = cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isGreaterThan(QueryBuilder.bindMarker(KEY_FROM_BIND))).where((Relation) ColumnName.DATA_KEY.relation().isLessThan(QueryBuilder.bindMarker(KEY_TO_BIND))).build());
        return new CassandraWindowedTable(tableName, cassandraClient, (SegmentPartitioner) cassandraTableSpec.partitioner(), prepare, prepare2, prepare3, prepare4, prepare5, prepare6, cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).build()), prepare7, cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.DATA_KEY.bind()))).where((Relation) ColumnName.WINDOW_START.relation().isGreaterThanOrEqualTo(QueryBuilder.bindMarker(WINDOW_FROM_BIND))).where((Relation) ColumnName.WINDOW_START.relation().isLessThan(QueryBuilder.bindMarker(WINDOW_TO_BIND))).orderBy(ColumnName.DATA_KEY.column(), ClusteringOrder.DESC).orderBy(ColumnName.WINDOW_START.column(), ClusteringOrder.DESC).build()), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).orderBy(ColumnName.DATA_KEY.column(), ClusteringOrder.DESC).orderBy(ColumnName.WINDOW_START.column(), ClusteringOrder.DESC).build()), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).columns(new String[]{ColumnName.DATA_KEY.column(), ColumnName.WINDOW_START.column(), ColumnName.DATA_VALUE.column()}).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isGreaterThan(QueryBuilder.bindMarker(KEY_FROM_BIND))).where((Relation) ColumnName.DATA_KEY.relation().isLessThan(QueryBuilder.bindMarker(KEY_TO_BIND))).orderBy(ColumnName.DATA_KEY.column(), ClusteringOrder.DESC).orderBy(ColumnName.WINDOW_START.column(), ClusteringOrder.DESC).build()), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).column(ColumnName.OFFSET.column()).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).build()), cassandraClient.prepare(QueryBuilder.update(tableName).setColumn(ColumnName.OFFSET.column(), QueryBuilder.bindMarker(ColumnName.OFFSET.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).build()), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).column(ColumnName.STREAM_TIME.column()).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).build()), cassandraClient.prepare(QueryBuilder.update(tableName).setColumn(ColumnName.STREAM_TIME.column(), QueryBuilder.bindMarker(ColumnName.STREAM_TIME.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).build()), cassandraClient.prepare(QueryBuilder.selectFrom(tableName).column(ColumnName.EPOCH.column()).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).build()), cassandraClient.prepare(((Update) QueryBuilder.update(tableName).setColumn(ColumnName.EPOCH.column(), QueryBuilder.bindMarker(ColumnName.EPOCH.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).ifColumn(ColumnName.EPOCH.column()).isLessThan(QueryBuilder.bindMarker(ColumnName.EPOCH.bind()))).build()), cassandraClient.prepare(((Update) QueryBuilder.update(tableName).setColumn(ColumnName.EPOCH.column(), QueryBuilder.bindMarker(ColumnName.EPOCH.bind())).where((Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()))).where((Relation) ColumnName.SEGMENT_ID.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.SEGMENT_ID.bind()))).where((Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal())).where((Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY))).where((Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(-1L))).ifColumn(ColumnName.EPOCH.column()).isEqualTo(QueryBuilder.bindMarker(ColumnName.EPOCH.bind()))).build()));
    }

    private static CreateTableWithOptions createTable(String str) {
        return SchemaBuilder.createTable(str).ifNotExists().withPartitionKey(ColumnName.PARTITION_KEY.column(), DataTypes.INT).withPartitionKey(ColumnName.SEGMENT_ID.column(), DataTypes.BIGINT).withClusteringColumn(ColumnName.ROW_TYPE.column(), DataTypes.TINYINT).withClusteringColumn(ColumnName.DATA_KEY.column(), DataTypes.BLOB).withClusteringColumn(ColumnName.WINDOW_START.column(), DataTypes.TIMESTAMP).withColumn(ColumnName.DATA_VALUE.column(), DataTypes.BLOB).withColumn(ColumnName.OFFSET.column(), DataTypes.BIGINT).withColumn(ColumnName.EPOCH.column(), DataTypes.BIGINT).withColumn(ColumnName.STREAM_TIME.column(), DataTypes.BIGINT).withCompaction(new DefaultLeveledCompactionStrategy());
    }

    public CassandraWindowedTable(String str, CassandraClient cassandraClient, SegmentPartitioner segmentPartitioner, PreparedStatement preparedStatement, PreparedStatement preparedStatement2, PreparedStatement preparedStatement3, PreparedStatement preparedStatement4, PreparedStatement preparedStatement5, PreparedStatement preparedStatement6, PreparedStatement preparedStatement7, PreparedStatement preparedStatement8, PreparedStatement preparedStatement9, PreparedStatement preparedStatement10, PreparedStatement preparedStatement11, PreparedStatement preparedStatement12, PreparedStatement preparedStatement13, PreparedStatement preparedStatement14, PreparedStatement preparedStatement15, PreparedStatement preparedStatement16, PreparedStatement preparedStatement17, PreparedStatement preparedStatement18) {
        this.name = str;
        this.client = cassandraClient;
        this.partitioner = segmentPartitioner;
        this.createSegment = preparedStatement;
        this.expireSegment = preparedStatement2;
        this.insert = preparedStatement3;
        this.delete = preparedStatement4;
        this.fetchSingle = preparedStatement5;
        this.fetch = preparedStatement6;
        this.fetchAll = preparedStatement7;
        this.fetchRange = preparedStatement8;
        this.backFetch = preparedStatement9;
        this.backFetchAll = preparedStatement10;
        this.backFetchRange = preparedStatement11;
        this.fetchOffset = preparedStatement12;
        this.setOffset = preparedStatement13;
        this.fetchStreamTime = preparedStatement14;
        this.setStreamTime = preparedStatement15;
        this.fetchEpoch = preparedStatement16;
        this.reserveEpoch = preparedStatement17;
        this.ensureEpoch = preparedStatement18;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public CassandraWindowFlushManager init(int i) {
        SegmentPartitioner.SegmentPartition metadataTablePartition = this.partitioner.metadataTablePartition(i);
        if (this.client.execute((Statement<?>) QueryBuilder.insertInto(this.name).value(ColumnName.PARTITION_KEY.column(), ColumnName.PARTITION_KEY.literal(Integer.valueOf(metadataTablePartition.tablePartition))).value(ColumnName.SEGMENT_ID.column(), ColumnName.SEGMENT_ID.literal(Long.valueOf(metadataTablePartition.segmentId))).value(ColumnName.ROW_TYPE.column(), RowType.METADATA_ROW.literal()).value(ColumnName.DATA_KEY.column(), ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY)).value(ColumnName.WINDOW_START.column(), ColumnName.WINDOW_START.literal(-1L)).value(ColumnName.OFFSET.column(), ColumnName.OFFSET.literal(-1L)).value(ColumnName.EPOCH.column(), ColumnName.EPOCH.literal(0L)).value(ColumnName.STREAM_TIME.column(), ColumnName.STREAM_TIME.literal(-1L)).ifNotExists().build()).wasApplied()) {
            LOG.info("Created new metadata segment for kafka partition {}", Integer.valueOf(i));
        }
        long fetchEpoch = fetchEpoch(metadataTablePartition) + 1;
        if (!this.client.execute((Statement<?>) reserveEpoch(metadataTablePartition, fetchEpoch)).wasApplied()) {
            handleEpochFencing(i, metadataTablePartition, fetchEpoch);
        }
        long fetchStreamTime = fetchStreamTime(i);
        LOG.info("Initialized stream-time to {} with epoch {} for kafka partition {}", new Object[]{Long.valueOf(fetchStreamTime), Long.valueOf(fetchEpoch), Integer.valueOf(i)});
        List<SegmentPartitioner.SegmentPartition> activeSegments = this.partitioner.activeSegments(i, fetchStreamTime);
        if (activeSegments.isEmpty()) {
            LOG.info("Skipping reservation of epoch {} for kafka partition {} due to no active segments", Long.valueOf(fetchEpoch), Integer.valueOf(i));
        } else {
            long j = activeSegments.get(0).segmentId;
            long j2 = j;
            for (SegmentPartitioner.SegmentPartition segmentPartition : activeSegments) {
                if (!this.client.execute((Statement<?>) reserveEpoch(segmentPartition, fetchEpoch)).wasApplied()) {
                    handleEpochFencing(i, segmentPartition, fetchEpoch);
                }
                j2 = segmentPartition.segmentId;
            }
            LOG.info("Reserved epoch {} for kafka partition {} across active segments in range {} - {}", new Object[]{Long.valueOf(fetchEpoch), Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2)});
        }
        return new CassandraWindowFlushManager(this, this.client, this.partitioner, i, fetchEpoch, fetchStreamTime);
    }

    private void handleEpochFencing(int i, SegmentPartitioner.SegmentPartition segmentPartition, long j) {
        String format = String.format("Could not initialize commit buffer [%s-%d] - attempted to claim epoch %d, but was fenced by a writer that claimed epoch %d on table partition %s", name(), Integer.valueOf(i), Long.valueOf(j), Long.valueOf(fetchEpoch(segmentPartition)), segmentPartition);
        TaskMigratedException taskMigratedException = new TaskMigratedException(format);
        LOG.warn(format, taskMigratedException);
        throw taskMigratedException;
    }

    public RemoteWriteResult<SegmentPartitioner.SegmentPartition> createSegment(int i, long j, SegmentPartitioner.SegmentPartition segmentPartition) {
        return (this.client.execute((Statement<?>) createSegment(segmentPartition, j)).wasApplied() || this.client.execute((Statement<?>) reserveEpoch(segmentPartition, j)).wasApplied()) ? RemoteWriteResult.success(segmentPartition) : RemoteWriteResult.failure(segmentPartition);
    }

    private BoundStatement createSegment(SegmentPartitioner.SegmentPartition segmentPartition, long j) {
        return this.createSegment.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setLong(ColumnName.EPOCH.bind(), j).setIdempotent(true);
    }

    public RemoteWriteResult<SegmentPartitioner.SegmentPartition> deleteSegment(int i, SegmentPartitioner.SegmentPartition segmentPartition) {
        return !this.client.execute((Statement<?>) expireSegment(segmentPartition)).wasApplied() ? RemoteWriteResult.failure(segmentPartition) : RemoteWriteResult.success(segmentPartition);
    }

    private BoundStatement expireSegment(SegmentPartitioner.SegmentPartition segmentPartition) {
        return this.expireSegment.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setIdempotent(true);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long fetchOffset(int i) {
        SegmentPartitioner.SegmentPartition metadataTablePartition = this.partitioner.metadataTablePartition(i);
        List all = this.client.execute((Statement<?>) this.fetchOffset.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), metadataTablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), metadataTablePartition.segmentId)).all();
        if (all.size() != 1) {
            throw new IllegalStateException(String.format("Expected exactly one offset row for %s[%s] but got %d", this.name, Integer.valueOf(i), Integer.valueOf(all.size())));
        }
        return ((Row) all.get(0)).getLong(ColumnName.OFFSET.column());
    }

    public BoundStatement setOffset(int i, long j) {
        LOG.debug("{}[{}] Updating offset to {}", new Object[]{this.name, Integer.valueOf(i), Long.valueOf(j)});
        SegmentPartitioner.SegmentPartition metadataTablePartition = this.partitioner.metadataTablePartition(i);
        return this.setOffset.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), metadataTablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), metadataTablePartition.segmentId).setLong(ColumnName.OFFSET.bind(), j);
    }

    public long fetchStreamTime(int i) {
        SegmentPartitioner.SegmentPartition metadataTablePartition = this.partitioner.metadataTablePartition(i);
        List all = this.client.execute((Statement<?>) this.fetchStreamTime.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), metadataTablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), metadataTablePartition.segmentId)).all();
        if (all.size() != 1) {
            throw new IllegalStateException(String.format("Expected exactly one stream-time row for %s[%s] but got %d", this.name, Integer.valueOf(i), Integer.valueOf(all.size())));
        }
        return ((Row) all.get(0)).getLong(ColumnName.STREAM_TIME.column());
    }

    public BoundStatement setStreamTime(int i, long j, long j2) {
        LOG.debug("{}[{}] Updating stream time to {} with epoch {}", new Object[]{this.name, Integer.valueOf(i), Long.valueOf(j2), Long.valueOf(j)});
        SegmentPartitioner.SegmentPartition metadataTablePartition = this.partitioner.metadataTablePartition(i);
        return this.setStreamTime.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), metadataTablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), metadataTablePartition.segmentId).setLong(ColumnName.STREAM_TIME.bind(), j2);
    }

    public long fetchEpoch(SegmentPartitioner.SegmentPartition segmentPartition) {
        List all = this.client.execute((Statement<?>) this.fetchEpoch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId)).all();
        if (all.size() != 1) {
            throw new IllegalStateException(String.format("Expected exactly one epoch metadata row for %s[%s] but got %d", this.name, segmentPartition, Integer.valueOf(all.size())));
        }
        return ((Row) all.get(0)).getLong(ColumnName.EPOCH.column());
    }

    public BoundStatement reserveEpoch(SegmentPartitioner.SegmentPartition segmentPartition, long j) {
        return this.reserveEpoch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setLong(ColumnName.EPOCH.bind(), j);
    }

    public BoundStatement ensureEpoch(SegmentPartitioner.SegmentPartition segmentPartition, long j) {
        return this.ensureEpoch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setLong(ColumnName.EPOCH.bind(), j);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    @CheckReturnValue
    public BoundStatement insert(int i, WindowedKey windowedKey, byte[] bArr, long j) {
        SegmentPartitioner.SegmentPartition tablePartition = this.partitioner.tablePartition(i, windowedKey);
        return this.insert.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), tablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), tablePartition.segmentId).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(windowedKey.key.get())).setInstant(ColumnName.WINDOW_START.bind(), Instant.ofEpochMilli(windowedKey.windowStartMs)).setByteBuffer(ColumnName.DATA_VALUE.bind(), ByteBuffer.wrap(bArr));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    @CheckReturnValue
    public BoundStatement delete(int i, WindowedKey windowedKey) {
        SegmentPartitioner.SegmentPartition tablePartition = this.partitioner.tablePartition(i, windowedKey);
        return this.delete.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), tablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), tablePartition.segmentId).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(windowedKey.key.get())).setInstant(ColumnName.WINDOW_START.bind(), Instant.ofEpochMilli(windowedKey.windowStartMs));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public byte[] fetch(int i, Bytes bytes, long j) {
        SegmentPartitioner.SegmentPartition tablePartition = this.partitioner.tablePartition(i, new WindowedKey(bytes, j));
        List all = this.client.execute((Statement<?>) this.fetchSingle.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), tablePartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), tablePartition.segmentId).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get())).setInstant(ColumnName.WINDOW_START.bind(), Instant.ofEpochMilli(j))).all();
        if (all.size() > 1) {
            throw new IllegalStateException("Unexpected multiple results for point lookup");
        }
        if (all.isEmpty()) {
            return null;
        }
        return ((ByteBuffer) Objects.requireNonNull(((Row) all.get(0)).getByteBuffer(ColumnName.DATA_VALUE.column()))).array();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> fetch(int i, Bytes bytes, long j, long j2) {
        LinkedList linkedList = new LinkedList();
        for (SegmentPartitioner.SegmentPartition segmentPartition : this.partitioner.range(i, j, j2)) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.fetch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get())).setInstant(WINDOW_FROM_BIND, Instant.ofEpochMilli(j)).setInstant(WINDOW_TO_BIND, Instant.ofEpochMilli(j2))).iterator(), CassandraWindowedTable::windowRows));
        }
        return Iterators.wrapped(linkedList);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> backFetch(int i, Bytes bytes, long j, long j2) {
        LinkedList linkedList = new LinkedList();
        for (SegmentPartitioner.SegmentPartition segmentPartition : this.partitioner.reverseRange(i, j, j2)) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.backFetch.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setByteBuffer(ColumnName.DATA_KEY.bind(), ByteBuffer.wrap(bytes.get())).setInstant(WINDOW_FROM_BIND, Instant.ofEpochMilli(j)).setInstant(WINDOW_TO_BIND, Instant.ofEpochMilli(j2))).iterator(), CassandraWindowedTable::windowRows));
        }
        return Iterators.wrapped(linkedList);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> fetchRange(int i, Bytes bytes, Bytes bytes2, long j, long j2) {
        LinkedList linkedList = new LinkedList();
        for (SegmentPartitioner.SegmentPartition segmentPartition : this.partitioner.range(i, j, j2)) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.fetchRange.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setByteBuffer(KEY_FROM_BIND, ByteBuffer.wrap(bytes.get())).setByteBuffer(KEY_TO_BIND, ByteBuffer.wrap(bytes2.get()))).iterator(), CassandraWindowedTable::windowRows));
        }
        return Iterators.filterKv(Iterators.wrapped(linkedList), windowedKey -> {
            return windowedKey.windowStartMs >= j && windowedKey.windowStartMs < j2;
        });
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> backFetchRange(int i, Bytes bytes, Bytes bytes2, long j, long j2) {
        LinkedList linkedList = new LinkedList();
        for (SegmentPartitioner.SegmentPartition segmentPartition : this.partitioner.reverseRange(i, j, j2)) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.backFetchRange.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setByteBuffer(KEY_FROM_BIND, ByteBuffer.wrap(bytes.get())).setByteBuffer(KEY_TO_BIND, ByteBuffer.wrap(bytes2.get()))).iterator(), CassandraWindowedTable::windowRows));
        }
        return Iterators.filterKv(Iterators.wrapped(linkedList), windowedKey -> {
            return windowedKey.windowStartMs >= j && windowedKey.windowStartMs < j2;
        });
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> fetchAll(int i, long j, long j2) {
        LinkedList linkedList = new LinkedList();
        for (SegmentPartitioner.SegmentPartition segmentPartition : this.partitioner.range(i, j, j2)) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.fetchAll.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setInstant(KEY_FROM_BIND, Instant.ofEpochMilli(j)).setInstant(KEY_TO_BIND, Instant.ofEpochMilli(j2))).iterator(), CassandraWindowedTable::windowRows));
        }
        return Iterators.filterKv(Iterators.wrapped(linkedList), windowedKey -> {
            return windowedKey.windowStartMs >= j && windowedKey.windowStartMs < j2;
        });
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> backFetchAll(int i, long j, long j2) {
        LinkedList linkedList = new LinkedList();
        for (SegmentPartitioner.SegmentPartition segmentPartition : this.partitioner.reverseRange(i, j, j2)) {
            linkedList.add(Iterators.kv(this.client.execute((Statement<?>) this.backFetchAll.bind(new Object[0]).setInt(ColumnName.PARTITION_KEY.bind(), segmentPartition.tablePartition).setLong(ColumnName.SEGMENT_ID.bind(), segmentPartition.segmentId).setInstant(KEY_FROM_BIND, Instant.ofEpochMilli(j)).setInstant(KEY_TO_BIND, Instant.ofEpochMilli(j2))).iterator(), CassandraWindowedTable::windowRows));
        }
        return Iterators.filterKv(Iterators.wrapped(linkedList), windowedKey -> {
            return windowedKey.windowStartMs >= j && windowedKey.windowStartMs < j2;
        });
    }

    private static KeyValue<WindowedKey, byte[]> windowRows(Row row) {
        return new KeyValue<>(new WindowedKey(Bytes.wrap(row.getByteBuffer(ColumnName.DATA_KEY.column()).array()), row.getInstant(ColumnName.WINDOW_START.column()).toEpochMilli()), row.getByteBuffer(ColumnName.DATA_VALUE.column()).array());
    }
}
