package dev.responsive.kafka.store;

import dev.responsive.db.CassandraClient;
import dev.responsive.db.KeySpec;
import dev.responsive.db.MetadataRow;
import dev.responsive.db.RemoteSchema;
import dev.responsive.db.RemoteWriter;
import dev.responsive.db.WriterFactory;
import dev.responsive.db.partitioning.SubPartitioner;
import dev.responsive.kafka.clients.SharedClients;
import dev.responsive.kafka.config.ResponsiveConfig;
import dev.responsive.model.Result;
import dev.responsive.utils.Iterators;
import dev.responsive.utils.TableName;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/**
 * Buffers writes for a single changelog topic-partition and flushes them to a remote table.
 *
 * <p>Writes and tombstones are collected in a {@link SizeTrackingBuffer}. When one of the
 * configured {@link FlushTriggers} fires, the buffered entries are grouped by sub-partition,
 * written through {@link RemoteWriter}s, the offset is recorded on the first sub-partition, and
 * (if enabled) the changelog is truncated up to that offset. The class also acts as a
 * {@link RecordBatchingStateRestoreCallback}, replaying changelog records into the remote table.
 *
 * <p>NOTE: this source was recovered by decompilation; the decompiler reported that the access
 * modifiers of the class, {@link #from} and {@link #offset()} were widened from package-private.
 *
 * @param <K> key type stored in the buffer
 * @param <S> remote schema used to initialise writers and read metadata
 */
public class CommitBuffer<K, S extends RemoteSchema<K>> implements RecordBatchingStateRestoreCallback {
    /** Batch size handed to remote writers when the constructor is not given an explicit one. */
    public static final int MAX_BATCH_SIZE = 1000;

    private final Logger log;
    private final SizeTrackingBuffer<K> buffer;
    private final CassandraClient client;
    private final String tableName;
    private final int partition;
    private final Admin admin;
    private final TopicPartition changelog;
    private final S remoteSchema;
    private final boolean truncateChangelog;
    private final FlushTriggers flushTriggers;
    private final int maxBatchSize;
    private final SubPartitioner subPartitioner;
    private final Supplier<Instant> clock;
    private final KeySpec<K> keySpec;

    /** Time of the last successful flush; first assigned in {@link #init()}. */
    private Instant lastFlush;
    /** Produces the remote writers; assigned in {@link #init()}. */
    private WriterFactory<K> writerFactory;
    /** Set once a truncation attempt fails with a {@link PolicyViolationException}. */
    private boolean isNotDeleteEnabled = false;

    /**
     * Creates a buffer from the shared clients and configuration.
     *
     * @param sharedClients supplies the Cassandra client and the Kafka admin client
     * @param tableName its {@code cassandraName()} is used as the remote table name
     * @param topicPartition changelog topic-partition backing this buffer
     * @param s remote schema
     * @param keySpec key handling (serialisation, retention, extraction from records)
     * @param z whether the changelog should be truncated after a successful flush
     * @param subPartitioner maps keys of this partition onto sub-partitions
     * @param responsiveConfig source of the flush triggers
     * @return a new, not yet initialised buffer; call {@link #init()} before use
     */
    public static <K, S extends RemoteSchema<K>> CommitBuffer<K, S> from(SharedClients sharedClients, TableName tableName, TopicPartition topicPartition, S s, KeySpec<K> keySpec, boolean z, SubPartitioner subPartitioner, ResponsiveConfig responsiveConfig) {
        return new CommitBuffer<>(
                sharedClients.cassandraClient,
                tableName.cassandraName(),
                topicPartition,
                sharedClients.admin,
                s,
                keySpec,
                z,
                FlushTriggers.fromConfig(responsiveConfig),
                subPartitioner);
    }

    /** Same as the full constructor, using {@link #MAX_BATCH_SIZE} and {@link Instant#now()}. */
    CommitBuffer(CassandraClient cassandraClient, String str, TopicPartition topicPartition, Admin admin, S s, KeySpec<K> keySpec, boolean z, FlushTriggers flushTriggers, SubPartitioner subPartitioner) {
        this(cassandraClient, str, topicPartition, admin, s, keySpec, z, flushTriggers, MAX_BATCH_SIZE, subPartitioner, Instant::now);
    }

    /**
     * Full constructor.
     *
     * @param cassandraClient client passed to every remote writer
     * @param str remote table name
     * @param topicPartition changelog topic-partition; its partition number identifies this buffer
     * @param admin Kafka admin client used for changelog truncation
     * @param s remote schema
     * @param keySpec key handling
     * @param z whether to truncate the changelog after a successful flush
     * @param flushTriggers record/byte/time thresholds consulted by {@link #flush(long)}
     * @param i batch size used for regular flushes and for chunking restored records
     * @param subPartitioner maps keys of this partition onto sub-partitions
     * @param supplier clock used for the time-based flush trigger
     */
    CommitBuffer(CassandraClient cassandraClient, String str, TopicPartition topicPartition, Admin admin, S s, KeySpec<K> keySpec, boolean z, FlushTriggers flushTriggers, int i, SubPartitioner subPartitioner, Supplier<Instant> supplier) {
        this.client = cassandraClient;
        this.tableName = str;
        this.changelog = topicPartition;
        this.partition = topicPartition.partition();
        this.admin = admin;
        this.remoteSchema = s;
        this.buffer = new SizeTrackingBuffer<>(keySpec);
        this.keySpec = keySpec;
        this.truncateChangelog = z;
        this.flushTriggers = flushTriggers;
        this.maxBatchSize = i;
        this.subPartitioner = subPartitioner;
        this.clock = supplier;
        this.log = new LogContext(String.format("commit-buffer-%s[%d] ", str, this.partition))
                .logger(CommitBuffer.class);
    }

    /**
     * Starts the flush timer and obtains the writer factory from the remote schema.
     * Must be called before any flush, since it assigns {@code lastFlush} and {@code writerFactory}.
     */
    public void init() {
        lastFlush = clock.get();
        writerFactory = remoteSchema.init(tableName, subPartitioner, partition);

        int firstSubPartition = subPartitioner.first(partition);
        int lastSubPartition = (firstSubPartition + subPartitioner.getFactor()) - 1;
        log.info("Initialized store with {} for all subpartitions {} -> {}",
                writerFactory, firstSubPartition, lastSubPartition);
    }

    /**
     * Buffers a value for {@code k}.
     *
     * @param k key
     * @param bArr serialised value
     * @param j timestamp stored with the value
     */
    public void put(K k, byte[] bArr, long j) {
        buffer.put(k, Result.value(k, bArr, j));
    }

    /**
     * Buffers a tombstone (delete marker) for {@code k}.
     *
     * @param k key
     * @param j timestamp stored with the tombstone
     */
    public void tombstone(K k, long j) {
        buffer.put(k, Result.tombstone(k, j));
    }

    /**
     * Looks up the buffered result for {@code k}.
     *
     * @param k key
     * @return the buffered result, or {@code null} when nothing is buffered for the key or the
     *     key spec does not retain it
     */
    public Result<K> get(K k) {
        Result<K> buffered = buffer.getReader().get(k);
        if (buffered != null && keySpec.retain(buffered.key)) {
            return buffered;
        }
        return null;
    }

    /**
     * Iterates over retained buffered entries in {@code subMap(k, k2)} of the buffer.
     *
     * @param k first bound passed to {@code subMap}
     * @param k2 second bound passed to {@code subMap}
     */
    public KeyValueIterator<K, Result<K>> range(K k, K k2) {
        return retainedEntries(buffer.getReader().subMap(k, k2).entrySet().iterator(), key -> true);
    }

    /**
     * Like {@link #range(Object, Object)}, additionally requiring {@code predicate} to accept
     * the key.
     */
    public KeyValueIterator<K, Result<K>> range(K k, K k2, Predicate<K> predicate) {
        return retainedEntries(buffer.getReader().subMap(k, k2).entrySet().iterator(), predicate);
    }

    /**
     * Iterates over retained buffered entries of the descending view of the buffer.
     * The descending view is sliced with {@code k2} as its first bound and {@code k} as its second.
     */
    public KeyValueIterator<K, Result<K>> backRange(K k, K k2) {
        return retainedEntries(
                buffer.getReader().descendingMap().subMap(k2, k).entrySet().iterator(),
                key -> true);
    }

    /** Iterates over every retained buffered entry. */
    public KeyValueIterator<K, Result<K>> all() {
        return retainedEntries(buffer.getReader().entrySet().iterator(), key -> true);
    }

    /** Iterates over every retained buffered entry whose key is accepted by {@code predicate}. */
    public KeyValueIterator<K, Result<K>> all(Predicate<K> predicate) {
        return retainedEntries(buffer.getReader().entrySet().iterator(), predicate);
    }

    /**
     * Iterates, over the descending view of the buffer, every retained entry whose key is
     * accepted by {@code predicate}.
     */
    public KeyValueIterator<K, Result<K>> backAll(Predicate<K> predicate) {
        return retainedEntries(buffer.getReader().descendingMap().entrySet().iterator(), predicate);
    }

    /**
     * Shared pipeline behind the range/all iterators: keeps entries whose key is retained by the
     * key spec and accepted by {@code filter}, and exposes them as key/value pairs.
     */
    private KeyValueIterator<K, Result<K>> retainedEntries(Iterator<Map.Entry<K, Result<K>>> entries, Predicate<K> filter) {
        return Iterators.kv(
                Iterators.filter(
                        entries,
                        entry -> keySpec.retain(entry.getKey()) && filter.test(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * @return the offset stored in the remote metadata row of this partition's first
     *     sub-partition
     */
    public long offset() {
        return remoteSchema.metadata(tableName, subPartitioner.first(partition)).offset;
    }

    /** @return number of entries currently buffered */
    int size() {
        return buffer.getReader().size();
    }

    /** @return byte count reported by the size-tracking buffer */
    private long totalBytesBuffered() {
        return buffer.getBytes();
    }

    /**
     * Evaluates the record-count, byte-size and elapsed-time triggers.
     * All three are always evaluated (no short-circuit) so that each one is logged.
     *
     * @return {@code true} if at least one trigger fired
     */
    private boolean shouldFlush() {
        int records = buffer.getReader().size();
        boolean recordsTrigger = records >= flushTriggers.getRecords();
        if (recordsTrigger) {
            log.info("will flush due to records buffered {} over trigger {}",
                    records, flushTriggers.getRecords());
        } else {
            log.debug("records buffered {} not over trigger {}",
                    records, flushTriggers.getRecords());
        }

        long bytes = totalBytesBuffered();
        boolean bytesTrigger = bytes >= flushTriggers.getBytes();
        if (bytesTrigger) {
            log.info("will flush due to bytes buffered {} over bytes trigger {}",
                    bytes, flushTriggers.getBytes());
        } else {
            log.debug("bytes buffered {} not over trigger {}", bytes, flushTriggers.getBytes());
        }

        Instant now = clock.get();
        boolean timeTrigger = lastFlush.plus(flushTriggers.getInterval()).isBefore(now);
        // Log the configured interval as the trigger value; logging the current instant here
        // (as was done previously) made the message misleading.
        if (timeTrigger) {
            log.info("will flush due to time since last flush {} over interval trigger {}",
                    Duration.between(lastFlush, now), flushTriggers.getInterval());
        } else {
            log.debug("time since last flush {} not over trigger {}",
                    Duration.between(lastFlush, now), flushTriggers.getInterval());
        }

        return recordsTrigger || bytesTrigger || timeTrigger;
    }

    /**
     * Flushes the buffer to the remote table if a flush trigger fired and the buffer is not
     * empty. The flush timer is only reset after a flush that was applied.
     *
     * @param j offset to record remotely once the buffered entries are written
     * @throws TaskMigratedException if the remote write was not applied
     */
    public void flush(long j) {
        if (!shouldFlush()) {
            return;
        }
        if (buffer.getReader().isEmpty()) {
            log.info("Ignoring flush() to empty commit buffer");
            return;
        }

        RemoteWriteResult result = flush(j, maxBatchSize);
        if (!result.wasApplied()) {
            throwFencedException(result, j);
        }
        lastFlush = clock.get();
    }

    /**
     * Unconditionally writes every buffered entry to the remote table, then records the offset.
     *
     * <p>On success the buffer is cleared and the changelog is possibly truncated. If either
     * the data writes or the offset write is not applied, that result is returned and the
     * buffer is left untouched.
     *
     * @param j offset to record on the first sub-partition
     * @param i batch size handed to each created writer
     * @return the first unapplied result, or a success result for this partition
     */
    private RemoteWriteResult flush(long j, int i) {
        long startNanos = System.nanoTime();
        log.info("Flushing {} records to remote (offset={}, writer={})",
                buffer.getReader().size(), j, writerFactory);

        // One writer per sub-partition, created lazily as keys are routed.
        Map<Integer, RemoteWriter<K>> writers = new HashMap<>();
        for (Result<K> result : buffer.getReader().values()) {
            final int subPartition = subPartitioner.partition(partition, keySpec.bytes(result.key));
            RemoteWriter<K> writer = writers.computeIfAbsent(
                    subPartition,
                    ignored -> writerFactory.createWriter(client, tableName, subPartition, i));

            if (result.isTombstone) {
                writer.delete(result.key);
            } else if (keySpec.retain(result.key)) {
                writer.insert(result.key, result.value, result.timestamp);
            }
        }

        RemoteWriteResult drained = drain(writers.values());
        if (!drained.wasApplied()) {
            return drained;
        }

        // The offset is written only after all data writes were applied; it goes to the first
        // sub-partition, reusing that sub-partition's writer when one already exists.
        RemoteWriteResult offsetResult = writers
                .computeIfAbsent(
                        subPartitioner.first(partition),
                        first -> writerFactory.createWriter(client, tableName, first, i))
                .setOffset(j);
        if (!offsetResult.wasApplied()) {
            return offsetResult;
        }

        log.info("Flushed {} records to remote in {}ms (offset={}, writer={}, numPartitions={})",
                buffer.getReader().size(),
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
                j,
                writerFactory,
                writers.size());
        buffer.clear();
        maybeTruncateChangelog(j);
        return RemoteWriteResult.success(partition);
    }

    /**
     * Flushes all writers and blocks until every flush has completed.
     *
     * @param collection writers to flush
     * @return a success result for this partition when {@code collection} is empty; otherwise
     *     the combined result, where an earlier unapplied result takes precedence over later ones
     * @throws RuntimeException if waiting is interrupted (the interrupt flag is restored) or a
     *     flush completes exceptionally
     */
    private RemoteWriteResult drain(Collection<RemoteWriter<K>> collection) {
        CompletionStage<RemoteWriteResult> combined =
                CompletableFuture.completedStage(RemoteWriteResult.success(partition));
        for (RemoteWriter<K> writer : collection) {
            combined = combined.thenCombine(
                    writer.flush(),
                    (earlier, later) -> earlier.wasApplied() ? later : earlier);
        }

        try {
            return combined.toCompletableFuture().get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed while flushing partition " + partition + " to remote", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed while flushing partition " + partition + " to remote", e);
        }
    }

    /**
     * Deletes changelog records before {@code j} when truncation is enabled and has not been
     * disabled by an earlier policy violation. Failures other than interruption are logged and
     * otherwise ignored; a {@link PolicyViolationException} cause disables further attempts.
     *
     * @param j records before this offset are deleted
     * @throws ProcessorStateException if interrupted while waiting (the interrupt flag is restored)
     */
    private void maybeTruncateChangelog(long j) {
        if (!truncateChangelog || isNotDeleteEnabled) {
            return;
        }

        try {
            admin.deleteRecords(Map.of(changelog, RecordsToDelete.beforeOffset(j))).all().get();
            log.info("Truncated changelog topic {} before offset {}", changelog, j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while truncating the changelog {}", changelog, e);
            throw new ProcessorStateException("Interrupted while truncating " + changelog, e);
        } catch (ExecutionException e) {
            log.warn("Could not truncate changelog topic-partition {}", changelog, e);
            if (e.getCause() instanceof PolicyViolationException) {
                log.info("Disabling further changelog truncation attempts due to topic configuration being incompatible with deleteRecord requests", e);
                isNotDeleteEnabled = true;
            }
        }
    }

    /**
     * Restores changelog records into the remote table in chunks of at most
     * {@code maxBatchSize} records, flushing each chunk as it fills up.
     *
     * @param collection changelog records to restore
     * @throws TaskMigratedException if a chunk's remote write was not applied
     */
    @Override
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        Collection<ConsumerRecord<byte[], byte[]>> batch = new ArrayList<>(maxBatchSize);
        for (ConsumerRecord<byte[], byte[]> record : collection) {
            batch.add(record);
            if (batch.size() >= maxBatchSize) {
                restoreCassandraBatch(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            restoreCassandraBatch(batch);
        }
    }

    /**
     * Buffers every record of one restore chunk (null value means tombstone) and flushes
     * unconditionally, recording the offset of the chunk's last record.
     */
    private void restoreCassandraBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        long lastOffset = -1;
        for (ConsumerRecord<byte[], byte[]> record : collection) {
            lastOffset = record.offset();
            K key = keySpec.keyFromRecord(record);
            if (record.value() == null) {
                tombstone(key, record.timestamp());
            } else {
                put(key, record.value(), record.timestamp());
            }
        }

        // lastOffset stays negative only when the chunk contained no records.
        if (lastOffset < 0) {
            return;
        }
        RemoteWriteResult result = flush(lastOffset, collection.size());
        if (!result.wasApplied()) {
            throwFencedException(result, lastOffset);
        }
    }

    /**
     * Reads the persisted metadata of the sub-partition whose write was rejected, logs the
     * details, and always throws.
     *
     * @param remoteWriteResult the unapplied write result
     * @param j offset of the batch that was being written
     * @throws TaskMigratedException always
     */
    private void throwFencedException(RemoteWriteResult remoteWriteResult, long j) {
        MetadataRow persisted = remoteSchema.metadata(tableName, remoteWriteResult.getPartition());
        String message = String.format(
                "%s[%d:%d] Fenced while writing batch! Local Epoch: %s, Persisted Epoch: %d, Batch Offset: %d, Persisted Offset: %d",
                tableName,
                partition,
                remoteWriteResult.getPartition(),
                writerFactory,
                persisted.epoch,
                j,
                persisted.offset);
        log.error(message);
        throw new TaskMigratedException(message);
    }
}
