package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.KeySpec;
import dev.responsive.kafka.internal.db.MetadataRow;
import dev.responsive.kafka.internal.db.RemoteTable;
import dev.responsive.kafka.internal.db.RemoteWriter;
import dev.responsive.kafka.internal.db.WriterFactory;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.utils.ExceptionSupplier;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.Result;
import dev.responsive.kafka.internal.utils.SharedClients;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/responsive/kafka/internal/stores/CommitBuffer.class */
/**
 * Buffers writes for a single changelog partition in memory and periodically flushes them to
 * a {@link RemoteTable}.
 *
 * <p>Writes and tombstones are accumulated in a {@link SizeTrackingBuffer}. On {@link #flush(long)}
 * the buffer is written out (one {@link RemoteWriter} per sub-partition) if any of the configured
 * {@link FlushTriggers} (record count, buffered bytes, elapsed time) has been reached. After a
 * successful flush the consumed offset is recorded on the first sub-partition and, if enabled,
 * the changelog topic is truncated up to that offset.
 *
 * <p>The class also acts as a {@link RecordBatchingStateRestoreCallback}: restored changelog
 * records are buffered and flushed in chunks of at most {@code maxBatchSize} records.
 *
 * @param <K> the key type of the buffered entries
 * @param <S> the remote table type the buffer flushes to
 */
public class CommitBuffer<K, S extends RemoteTable<K>> implements RecordBatchingStateRestoreCallback, Closeable {
    public static final int MAX_BATCH_SIZE = 1000;

    private final Logger log;
    private final String logPrefix;
    private final SizeTrackingBuffer<K> buffer;
    private final CassandraClient client;
    private final Admin admin;
    private final TopicPartition changelog;
    private final S table;
    private final FlushTriggers flushTriggers;
    private final ExceptionSupplier exceptionSupplier;
    private final int maxBatchSize;
    private final SubPartitioner subPartitioner;
    private final Supplier<Instant> clock;
    private final KeySpec<K> keySpec;
    private final boolean truncateChangelog;

    // Cleared by onDeleteRecords when the broker rejects deleteRecords with a policy violation.
    // NOTE(review): onDeleteRecords is a KafkaFuture completion callback and may run on a thread
    // other than the one calling flush(); confirm whether this flag needs to be volatile.
    private boolean isDeleteEnabled = true;

    // The most recent changelog truncation request; starts out already completed so that the
    // first flush is allowed to issue a request.
    private KafkaFuture<DeletedRecords> deleteRecordsFuture = KafkaFuture.completedFuture(null);

    // Both fields below are assigned in init().
    private Instant lastFlush;
    private WriterFactory<K> writerFactory;

    /**
     * Creates a buffer using the clients in {@code sharedClients} and the flush triggers and
     * exception supplier derived from {@code responsiveConfig}.
     *
     * @param sharedClients     holder of the Cassandra client and Kafka admin client
     * @param topicPartition    the changelog partition this buffer is responsible for
     * @param s                 the remote table to flush to
     * @param keySpec           key handling (serialization, retention filter, record decoding)
     * @param truncateChangelog whether the changelog should be truncated after each flush
     * @param subPartitioner    maps keys of the changelog partition onto remote sub-partitions
     * @param responsiveConfig  configuration to read flush triggers and exception handling from
     * @return a new, not yet initialized buffer; call {@link #init()} before use
     */
    public static <K, S extends RemoteTable<K>> CommitBuffer<K, S> from(SharedClients sharedClients, TopicPartition topicPartition, S s, KeySpec<K> keySpec, boolean truncateChangelog, SubPartitioner subPartitioner, ResponsiveConfig responsiveConfig) {
        return new CommitBuffer<>(
                sharedClients.cassandraClient,
                topicPartition,
                sharedClients.admin,
                s,
                keySpec,
                truncateChangelog,
                FlushTriggers.fromConfig(responsiveConfig),
                ExceptionSupplier.fromConfig(responsiveConfig.originals()),
                subPartitioner);
    }

    /**
     * Creates a buffer with the default {@link #MAX_BATCH_SIZE} and the system clock.
     */
    CommitBuffer(CassandraClient cassandraClient, TopicPartition topicPartition, Admin admin, S s, KeySpec<K> keySpec, boolean truncateChangelog, FlushTriggers flushTriggers, ExceptionSupplier exceptionSupplier, SubPartitioner subPartitioner) {
        this(cassandraClient, topicPartition, admin, s, keySpec, truncateChangelog, flushTriggers, exceptionSupplier, MAX_BATCH_SIZE, subPartitioner, Instant::now);
    }

    /**
     * Creates a buffer with an explicit batch size and clock.
     *
     * <p>Changelog truncation is only honoured when the changelog topic name ends with
     * {@code "changelog"}; for any other topic it is forced off and, if it had been requested,
     * a warning is logged.
     */
    CommitBuffer(CassandraClient cassandraClient, TopicPartition topicPartition, Admin admin, S s, KeySpec<K> keySpec, boolean truncateChangelog, FlushTriggers flushTriggers, ExceptionSupplier exceptionSupplier, int maxBatchSize, SubPartitioner subPartitioner, Supplier<Instant> clock) {
        this.client = cassandraClient;
        this.changelog = topicPartition;
        this.admin = admin;
        this.table = s;
        this.buffer = new SizeTrackingBuffer<>(keySpec);
        this.keySpec = keySpec;
        this.flushTriggers = flushTriggers;
        this.exceptionSupplier = exceptionSupplier;
        this.maxBatchSize = maxBatchSize;
        this.subPartitioner = subPartitioner;
        this.clock = clock;
        this.logPrefix = String.format("commit-buffer [%s-%d] ", s.name(), topicPartition.partition());
        this.log = new LogContext(this.logPrefix).logger(CommitBuffer.class);

        final boolean sourceTopicChangelog = hasSourceTopicChangelog(topicPartition.topic());
        if (sourceTopicChangelog && truncateChangelog) {
            this.log.warn("Changelog truncation is not compatible with the source-topic changelog optimization, and will not be enabled for the topic {}", topicPartition.topic());
        }
        this.truncateChangelog = truncateChangelog && !sourceTopicChangelog;
    }

    /**
     * Returns {@code true} when the topic name does not end with {@code "changelog"}, which this
     * class treats as the changelog being a source topic.
     */
    private static boolean hasSourceTopicChangelog(String topicName) {
        return !topicName.endsWith("changelog");
    }

    /**
     * Initializes the remote table for this partition, obtaining the writer factory used by
     * subsequent flushes, and starts the flush-interval timer.
     */
    public void init() {
        this.lastFlush = this.clock.get();
        this.writerFactory = this.table.init(this.subPartitioner, this.changelog.partition());

        final int firstSubPartition = this.subPartitioner.first(this.changelog.partition());
        final int lastSubPartition = (firstSubPartition + this.subPartitioner.getFactor()) - 1;
        this.log.info("Initialized store with {} for subpartitions in range: {{} -> {}}", this.writerFactory, firstSubPartition, lastSubPartition);
    }

    /**
     * Buffers a value for {@code key}.
     *
     * @param key       the key to write
     * @param value     the serialized value
     * @param timestamp the timestamp to associate with the write
     */
    public void put(K key, byte[] value, long timestamp) {
        this.buffer.put(key, Result.value(key, value, timestamp));
    }

    /**
     * Buffers a tombstone (delete marker) for {@code key}.
     *
     * @param key       the key to delete
     * @param timestamp the timestamp to associate with the delete
     */
    public void tombstone(K key, long timestamp) {
        this.buffer.put(key, Result.tombstone(key, timestamp));
    }

    /**
     * Looks up the buffered entry for {@code key}.
     *
     * @return the buffered result, or {@code null} if nothing is buffered for the key or the
     *     key spec does not retain it
     */
    public Result<K> get(K key) {
        final Result<K> buffered = this.buffer.getReader().get(key);
        if (buffered != null && this.keySpec.retain(buffered.key)) {
            return buffered;
        }
        return null;
    }

    /**
     * Iterates over buffered entries in the sub-map between {@code from} and {@code to},
     * skipping keys the key spec does not retain.
     */
    public KeyValueIterator<K, Result<K>> range(K from, K to) {
        return Iterators.kv(
                Iterators.filter(
                        this.buffer.getReader().subMap(from, to).entrySet().iterator(),
                        entry -> this.keySpec.retain(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * Like {@link #range(Object, Object)}, additionally skipping keys rejected by
     * {@code predicate}.
     */
    public KeyValueIterator<K, Result<K>> range(K from, K to, Predicate<K> predicate) {
        return Iterators.kv(
                Iterators.filter(
                        this.buffer.getReader().subMap(from, to).entrySet().iterator(),
                        entry -> this.keySpec.retain(entry.getKey()) && predicate.test(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * Iterates over buffered entries in descending key order, taking the sub-map of the
     * descending view that runs from {@code to} down to {@code from}, and skipping keys the key
     * spec does not retain.
     */
    public KeyValueIterator<K, Result<K>> backRange(K from, K to) {
        return Iterators.kv(
                Iterators.filter(
                        // bounds are swapped because they are applied to the descending view
                        this.buffer.getReader().descendingMap().subMap(to, from).entrySet().iterator(),
                        entry -> this.keySpec.retain(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * Iterates over all buffered entries, skipping keys the key spec does not retain.
     */
    public KeyValueIterator<K, Result<K>> all() {
        return Iterators.kv(
                Iterators.filter(
                        this.buffer.getReader().entrySet().iterator(),
                        entry -> this.keySpec.retain(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * Like {@link #all()}, additionally skipping keys rejected by {@code predicate}.
     */
    public KeyValueIterator<K, Result<K>> all(Predicate<K> predicate) {
        return Iterators.kv(
                Iterators.filter(
                        this.buffer.getReader().entrySet().iterator(),
                        entry -> this.keySpec.retain(entry.getKey()) && predicate.test(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * Like {@link #all(Predicate)} but iterating in descending key order.
     */
    public KeyValueIterator<K, Result<K>> backAll(Predicate<K> predicate) {
        return Iterators.kv(
                Iterators.filter(
                        this.buffer.getReader().descendingMap().entrySet().iterator(),
                        entry -> this.keySpec.retain(entry.getKey()) && predicate.test(entry.getKey())),
                entry -> new KeyValue<>(entry.getKey(), entry.getValue()));
    }

    /**
     * Returns the offset stored in the remote table's metadata for the first sub-partition of
     * this changelog partition.
     */
    public long offset() {
        final int firstSubPartition = this.subPartitioner.first(this.changelog.partition());
        return this.table.metadata(firstSubPartition).offset;
    }

    private long totalBytesBuffered() {
        return this.buffer.getBytes();
    }

    /**
     * Checks the record-count, byte-size and time-interval triggers.
     *
     * <p>All three triggers are always evaluated (no short-circuiting) so that each one emits
     * its debug log line on every call.
     *
     * @return {@code true} if at least one trigger has been reached
     */
    private boolean triggerFlush() {
        final int recordsBuffered = this.buffer.getReader().size();
        final boolean recordsTrigger = recordsBuffered >= this.flushTriggers.getRecords();
        if (recordsTrigger) {
            this.log.debug("Will flush due to records buffered {} over trigger {}", recordsBuffered, this.flushTriggers.getRecords());
        } else {
            this.log.debug("Records buffered since last flush {} not over trigger {}", recordsBuffered, this.flushTriggers.getRecords());
        }

        final long bytesBuffered = totalBytesBuffered();
        final boolean bytesTrigger = bytesBuffered >= this.flushTriggers.getBytes();
        if (bytesTrigger) {
            this.log.debug("Will flush due to bytes buffered {} over bytes trigger {}", bytesBuffered, this.flushTriggers.getBytes());
        } else {
            this.log.debug("Bytes buffered since last flush {} not over trigger {}", bytesBuffered, this.flushTriggers.getBytes());
        }

        final Instant now = this.clock.get();
        final Duration sinceLastFlush = Duration.between(this.lastFlush, now);
        final boolean timeTrigger = this.lastFlush.plus(this.flushTriggers.getInterval()).isBefore(now);
        // Log the configured interval as the trigger (previously the current instant was logged).
        if (timeTrigger) {
            this.log.debug("Will flush as time since last flush {} over interval trigger {}", sinceLastFlush, this.flushTriggers.getInterval());
        } else {
            this.log.debug("Time since last flush {} not over trigger {}", sinceLastFlush, this.flushTriggers.getInterval());
        }

        return recordsTrigger || bytesTrigger || timeTrigger;
    }

    /**
     * Flushes the buffer to the remote table if a flush trigger has been reached and the buffer
     * is not empty. The flush timer is only reset when data was actually written.
     *
     * @param consumedOffset the changelog offset to record alongside the flushed data
     */
    public void flush(long consumedOffset) {
        if (!triggerFlush()) {
            return;
        }
        if (this.buffer.getReader().isEmpty()) {
            this.log.debug("Ignoring flush() of empty commit buffer");
            return;
        }
        doFlush(consumedOffset, this.maxBatchSize);
        this.lastFlush = this.clock.get();
    }

    /**
     * Writes every buffered entry to its sub-partition, records {@code consumedOffset} on the
     * first sub-partition, clears the buffer and possibly truncates the changelog.
     *
     * @param consumedOffset the changelog offset to persist once all writes were applied
     * @param batchSize      the batch size handed to each created writer
     */
    private void doFlush(long consumedOffset, int batchSize) {
        final long startNs = System.nanoTime();
        this.log.info("Flushing {} records with batchSize={} to remote (offset={}, writer={})", this.buffer.getReader().size(), batchSize, consumedOffset, this.writerFactory);

        // One writer per sub-partition, created lazily on first use.
        final Map<Integer, RemoteWriter<K>> writers = new HashMap<>();
        for (Result<K> result : this.buffer.getReader().values()) {
            final int subPartition = this.subPartitioner.partition(this.changelog.partition(), this.keySpec.bytes(result.key));
            final RemoteWriter<K> writer = writers.computeIfAbsent(
                    subPartition,
                    p -> this.writerFactory.createWriter(this.client, p, batchSize));
            if (result.isTombstone) {
                writer.delete(result.key);
            } else if (this.keySpec.retain(result.key)) {
                writer.insert(result.key, result.value, result.timestamp);
            }
        }

        final RemoteWriteResult flushResult = drain(writers.values());
        if (!flushResult.wasApplied()) {
            throwFencedException(flushResult, consumedOffset);
        }

        // The offset is always stored on the first sub-partition, even if no data went there.
        final int offsetSubPartition = this.subPartitioner.first(this.changelog.partition());
        final RemoteWriteResult offsetResult = writers
                .computeIfAbsent(offsetSubPartition, p -> this.writerFactory.createWriter(this.client, p, batchSize))
                .setOffset(consumedOffset);
        if (!offsetResult.wasApplied()) {
            throwFencedException(offsetResult, consumedOffset);
        }

        this.log.debug("Flushed {} records to remote in {}ms (offset={}, writer={}, numSubPartitions={})", this.buffer.getReader().size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs), consumedOffset, this.writerFactory, writers.size());
        this.buffer.clear();
        maybeTruncateChangelog(consumedOffset);
    }

    /**
     * Flushes all writers and waits for them to complete.
     *
     * @return the first result (in combination order) that was not applied, otherwise the
     *     result of the last writer, or a success result if there were no writers
     * @throws RuntimeException if waiting is interrupted or any flush completes exceptionally
     */
    private RemoteWriteResult drain(Collection<RemoteWriter<K>> writers) {
        CompletionStage<RemoteWriteResult> combined =
                CompletableFuture.completedStage(RemoteWriteResult.success(this.changelog.partition()));
        for (RemoteWriter<K> writer : writers) {
            combined = combined.thenCombine(
                    writer.flush(),
                    (soFar, next) -> !soFar.wasApplied() ? soFar : next);
        }

        try {
            return combined.toCompletableFuture().get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
            }
            this.log.error("Unexpected exception while flushing to remote", e);
            throw new RuntimeException(this.logPrefix + "Failed while flushing to remote", e);
        }
    }

    /**
     * Issues a deleteRecords request for everything before {@code offset}, unless truncation is
     * disabled or a previous request is still in flight.
     */
    private void maybeTruncateChangelog(long offset) {
        if (!this.truncateChangelog || !this.isDeleteEnabled) {
            return;
        }
        if (!this.deleteRecordsFuture.isDone()) {
            this.log.debug("Still waiting on previous changelog truncation attempt to complete");
            return;
        }

        this.log.debug("Issuing new delete request to truncate {} up to offset {}", this.changelog, offset);
        this.deleteRecordsFuture = this.admin
                .deleteRecords(Map.of(this.changelog, RecordsToDelete.beforeOffset(offset)))
                .lowWatermarks()
                .get(this.changelog)
                .whenComplete(this::onDeleteRecords);
    }

    /**
     * Completion callback for a changelog truncation request.
     *
     * <p>A {@link PolicyViolationException} permanently disables further truncation attempts;
     * any other failure is only logged, so the next flush will try again.
     */
    private void onDeleteRecords(DeletedRecords deletedRecords, Throwable error) {
        if (error == null) {
            this.log.info("Truncated changelog {} up to offset {}", this.changelog, deletedRecords.lowWatermark());
        } else if (error instanceof PolicyViolationException) {
            this.log.warn("Disabling further changelog truncation attempts due to topic configuration being incompatible with deleteRecords requests", error);
            this.isDeleteEnabled = false;
        } else if (error instanceof CancellationException) {
            this.log.info("Delete records request for changelog {} was cancelled", this.changelog);
        } else {
            this.log.warn("Truncation of changelog {} failed and will be retried after the next flush", this.changelog, error);
        }
    }

    /**
     * Restores changelog records by splitting them into chunks of at most {@code maxBatchSize}
     * records and buffering and flushing each chunk in turn.
     */
    @Override
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> records) {
        final Collection<ConsumerRecord<byte[], byte[]>> batch = new ArrayList<>(this.maxBatchSize);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            batch.add(record);
            if (batch.size() >= this.maxBatchSize) {
                restoreCassandraBatch(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            restoreCassandraBatch(batch);
        }
    }

    /**
     * Buffers every record of {@code batch} (a {@code null} value becomes a tombstone) and then
     * unconditionally flushes the buffer with the offset of the last record. Nothing is flushed
     * for an empty batch.
     */
    private void restoreCassandraBatch(Collection<ConsumerRecord<byte[], byte[]>> batch) {
        long lastOffset = -1;
        for (ConsumerRecord<byte[], byte[]> record : batch) {
            lastOffset = record.offset();
            final K key = this.keySpec.keyFromRecord(record);
            if (record.value() == null) {
                tombstone(key, record.timestamp());
            } else {
                put(key, record.value(), record.timestamp());
            }
        }
        if (lastOffset >= 0) {
            doFlush(lastOffset, batch.size());
        }
    }

    /**
     * Logs and throws the commit-fenced exception for a write that was not applied, including
     * the persisted epoch and offset read back from the table's metadata. This method never
     * returns normally.
     */
    private void throwFencedException(RemoteWriteResult result, long batchOffset) {
        final MetadataRow persisted = this.table.metadata(result.getPartition());
        final String message = String.format(
                "[%d] Fenced while writing batch! Local Epoch: %s, Persisted Epoch: %d, Batch Offset: %d, Persisted Offset: %d",
                result.getPartition(),
                this.writerFactory,
                persisted.epoch,
                batchOffset,
                persisted.offset);
        this.log.warn(message);
        throw this.exceptionSupplier.commitFencedException(this.logPrefix + message);
    }

    /**
     * Cancels any changelog truncation request that is still in flight.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.deleteRecordsFuture.cancel(true);
    }
}
