package dev.responsive.kafka.store;

import dev.responsive.db.CassandraClient;
import dev.responsive.db.partitioning.SubPartitioner;
import dev.responsive.kafka.clients.SharedClients;
import dev.responsive.kafka.config.ResponsiveConfig;
import dev.responsive.model.Result;
import dev.responsive.utils.Iterators;
import dev.responsive.utils.StoreUtil;
import dev.responsive.utils.TableName;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/responsive/kafka/store/CommitBuffer.class */
public class CommitBuffer<K> implements RecordBatchingStateRestoreCallback {
    public static final int MAX_BATCH_SIZE = 1000;
    private final Logger log;
    private final SizeTrackingBuffer<K> buffer;
    private final CassandraClient client;
    private final String tableName;
    private final int partition;
    private final Admin admin;
    private final TopicPartition changelog;
    private final BufferPlugin<K> plugin;
    private final boolean truncateChangelog;
    private final FlushTriggers flushTriggers;
    private final int maxBatchSize;
    private final SubPartitioner subPartitioner;
    private final Supplier<Instant> clock;
    private final int maxConcurrentWrites;
    private Instant lastFlush;
    private long epoch;

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K> CommitBuffer<K> from(SharedClients sharedClients, TableName tableName, TopicPartition topicPartition, BufferPlugin<K> bufferPlugin, ResponsiveConfig responsiveConfig) {
        Admin admin = sharedClients.admin;
        CassandraClient cassandraClient = sharedClients.cassandraClient;
        SubPartitioner subPartitioner = responsiveConfig.getSubPartitioner(cassandraClient, admin, tableName, topicPartition.topic());
        return new CommitBuffer<>(cassandraClient, tableName.cassandraName(), topicPartition, admin, bufferPlugin, StoreUtil.shouldTruncateChangelog(topicPartition.topic(), admin, responsiveConfig.originals()), FlushTriggers.fromConfig(responsiveConfig), subPartitioner, responsiveConfig.getInt(ResponsiveConfig.MAX_CONCURRENT_REMOTE_WRITES_CONFIG).intValue());
    }

    CommitBuffer(CassandraClient cassandraClient, String str, TopicPartition topicPartition, Admin admin, BufferPlugin<K> bufferPlugin, boolean z, FlushTriggers flushTriggers, SubPartitioner subPartitioner, int i) {
        this(cassandraClient, str, topicPartition, admin, bufferPlugin, z, flushTriggers, MAX_BATCH_SIZE, subPartitioner, Instant::now, i);
    }

    CommitBuffer(CassandraClient cassandraClient, String str, TopicPartition topicPartition, Admin admin, BufferPlugin<K> bufferPlugin, boolean z, FlushTriggers flushTriggers, int i, SubPartitioner subPartitioner, Supplier<Instant> supplier, int i2) {
        this.client = cassandraClient;
        this.tableName = str;
        this.changelog = topicPartition;
        this.partition = topicPartition.partition();
        this.admin = admin;
        this.plugin = bufferPlugin;
        this.buffer = new SizeTrackingBuffer<>(bufferPlugin);
        this.truncateChangelog = z;
        this.flushTriggers = flushTriggers;
        this.maxBatchSize = i;
        this.subPartitioner = subPartitioner;
        this.clock = supplier;
        this.maxConcurrentWrites = i2;
        this.log = new LogContext(String.format("commit-buffer-%s[%d] ", str, Integer.valueOf(this.partition))).logger(CommitBuffer.class);
    }

    public void init() {
        this.subPartitioner.all(this.partition).forEach(i -> {
            this.client.initializeMetadata(this.tableName, i);
        });
        this.lastFlush = this.clock.get();
        int first = this.subPartitioner.first(this.partition);
        this.epoch = this.client.metadata(this.tableName, first).epoch + 1;
        for (int i2 : this.subPartitioner.all(this.partition).toArray()) {
            if (!this.client.execute(this.client.reserveEpoch(this.tableName, i2, this.epoch)).wasApplied()) {
                String format = String.format("Could not initialize commit buffer - attempted to claim epoch %d, but was fenced by a writer that claimed epoch %d on partition %d", Long.valueOf(this.epoch), Long.valueOf(this.client.metadata(this.tableName, i2).epoch), Integer.valueOf(i2));
                TaskMigratedException taskMigratedException = new TaskMigratedException(format);
                this.log.warn(format, taskMigratedException);
                throw taskMigratedException;
            }
        }
        this.log.info("Initialized store with epoch {} for all subpartitions {} -> {}", new Object[]{Long.valueOf(this.epoch), Integer.valueOf(first), Integer.valueOf((first + this.subPartitioner.getFactor()) - 1)});
    }

    public void put(K k, byte[] bArr) {
        this.buffer.put(k, Result.value(k, bArr));
    }

    public void tombstone(K k) {
        this.buffer.put(k, Result.tombstone(k));
    }

    public Result<K> get(K k) {
        Result<K> result = (Result) this.buffer.getReader().get(k);
        if (result == null || !this.plugin.retain(result.key)) {
            return null;
        }
        return result;
    }

    public KeyValueIterator<K, Result<K>> range(K k, K k2) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().subMap(k, k2).entrySet().iterator(), entry -> {
            return this.plugin.retain(entry.getKey());
        }), entry2 -> {
            return new KeyValue(entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> range(K k, K k2, Predicate<K> predicate) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().subMap(k, k2).entrySet().iterator(), entry -> {
            return this.plugin.retain(entry.getKey()) && predicate.test(entry.getKey());
        }), entry2 -> {
            return new KeyValue(entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> backRange(K k, K k2) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().descendingMap().subMap(k2, k).entrySet().iterator(), entry -> {
            return this.plugin.retain(entry.getKey());
        }), entry2 -> {
            return new KeyValue(entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> all() {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().entrySet().iterator(), entry -> {
            return this.plugin.retain(entry.getKey());
        }), entry2 -> {
            return new KeyValue(entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> all(Predicate<K> predicate) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().entrySet().iterator(), entry -> {
            return this.plugin.retain(entry.getKey()) && predicate.test(entry.getKey());
        }), entry2 -> {
            return new KeyValue(entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> backAll(Predicate<K> predicate) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().descendingMap().entrySet().iterator(), entry -> {
            return this.plugin.retain(entry.getKey()) && predicate.test(entry.getKey());
        }), entry2 -> {
            return new KeyValue(entry2.getKey(), (Result) entry2.getValue());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long offset() {
        return this.client.metadata(this.tableName, this.subPartitioner.first(this.partition)).offset;
    }

    int size() {
        return this.buffer.getReader().size();
    }

    private long totalBytesBuffered() {
        return this.buffer.getBytes();
    }

    private boolean shouldFlush() {
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        if (this.buffer.getReader().size() >= this.flushTriggers.getRecords()) {
            this.log.info("will flush due to records buffered {} over trigger {}", Integer.valueOf(this.buffer.getReader().size()), Integer.valueOf(this.flushTriggers.getRecords()));
            z = true;
        } else {
            this.log.debug("records buffered {} not over trigger {}", Integer.valueOf(this.buffer.getReader().size()), Integer.valueOf(this.flushTriggers.getRecords()));
        }
        long j = totalBytesBuffered();
        if (j >= this.flushTriggers.getBytes()) {
            this.log.info("will flush due to bytes buffered {} over bytes trigger {}", Long.valueOf(j), Long.valueOf(this.flushTriggers.getBytes()));
            z2 = true;
        } else {
            this.log.debug("bytes buffered {} not over trigger {}", Long.valueOf(j), Long.valueOf(this.flushTriggers.getBytes()));
        }
        Instant instant = this.clock.get();
        if (this.lastFlush.plus((TemporalAmount) this.flushTriggers.getInterval()).isBefore(instant)) {
            this.log.info("will flush due to time since last flush {} over interval trigger {}", Duration.between(this.lastFlush, instant), instant);
            z3 = true;
        } else {
            this.log.debug("time since last flush {} not over trigger {}", Duration.between(this.lastFlush, instant), instant);
        }
        return z || z2 || z3;
    }

    public void flush(long j) {
        if (shouldFlush()) {
            if (this.buffer.getReader().isEmpty()) {
                this.log.info("Ignoring flush() to empty commit buffer");
                return;
            }
            AtomicWriteResult flush = flush(j, this.maxBatchSize);
            if (!flush.wasApplied()) {
                throwFencedException(flush, j);
            }
            this.lastFlush = this.clock.get();
        }
    }

    private AtomicWriteResult flush(long j, int i) {
        this.log.info("Flushing {} records to remote (offset={}, epoch={})", new Object[]{Integer.valueOf(this.buffer.getReader().size()), Long.valueOf(j), Long.valueOf(this.epoch)});
        HashMap hashMap = new HashMap();
        this.subPartitioner.all(this.partition).forEach(i2 -> {
            hashMap.put(Integer.valueOf(i2), new AtomicWriter(this.client, this.tableName, i2, this.plugin, this.epoch, i));
        });
        this.buffer.getReader().values().forEach(result -> {
            ((AtomicWriter) hashMap.get(Integer.valueOf(this.subPartitioner.partition(this.partition, this.plugin.bytes(result.key))))).add(result);
        });
        AtomicWriteResult drain = drain(hashMap.values(), (v0) -> {
            return v0.flush();
        });
        if (!drain.wasApplied()) {
            return drain;
        }
        AtomicWriteResult offset = ((AtomicWriter) hashMap.get(Integer.valueOf(this.subPartitioner.first(this.partition)))).setOffset(j);
        if (!offset.wasApplied()) {
            return offset;
        }
        this.log.info("Completed flushing {} records to remote (offset={}, epoch={})", new Object[]{Integer.valueOf(this.buffer.getReader().size()), Long.valueOf(j), Long.valueOf(this.epoch)});
        this.buffer.clear();
        maybeTruncateChangelog(j);
        return AtomicWriteResult.success(this.partition);
    }

    private AtomicWriteResult drain(Collection<AtomicWriter<K>> collection, Function<AtomicWriter<K>, CompletionStage<AtomicWriteResult>> function) {
        return drain(collection, function, (v0) -> {
            return v0.partition();
        }, this.partition, this.maxConcurrentWrites);
    }

    static <I> AtomicWriteResult drain(Collection<I> collection, Function<I, CompletionStage<AtomicWriteResult>> function, Function<I, Integer> function2, int i, int i2) {
        HashMap hashMap = new HashMap();
        AtomicReference atomicReference = new AtomicReference(AtomicWriteResult.success(i));
        try {
            for (I i3 : collection) {
                if (hashMap.size() >= i2) {
                    CompletableFuture.anyOf((CompletableFuture[]) hashMap.values().toArray(i4 -> {
                        return new CompletableFuture[i4];
                    })).get();
                    hashMap.values().removeIf((v0) -> {
                        return v0.isDone();
                    });
                }
                int intValue = function2.apply(i3).intValue();
                hashMap.put(Integer.valueOf(intValue), function.apply(i3).thenApply(atomicWriteResult -> {
                    return setIfNotApplied(atomicReference, atomicWriteResult);
                }).toCompletableFuture());
            }
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                ((CompletableFuture) it.next()).get();
            }
            return (AtomicWriteResult) atomicReference.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new ProcessorStateException("Failed while flushing partition " + i + " to remote", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static AtomicWriteResult setIfNotApplied(AtomicReference<AtomicWriteResult> atomicReference, AtomicWriteResult atomicWriteResult) {
        if (!atomicWriteResult.wasApplied()) {
            atomicReference.set(atomicWriteResult);
        }
        return atomicWriteResult;
    }

    private void maybeTruncateChangelog(long j) {
        if (this.truncateChangelog) {
            try {
                this.admin.deleteRecords(Map.of(this.changelog, RecordsToDelete.beforeOffset(j))).all().get();
                this.log.info("Truncated changelog topic {} before offset {}", this.changelog, Long.valueOf(j));
            } catch (InterruptedException e) {
                throw new ProcessorStateException("Interrupted while truncating " + this.changelog, e);
            } catch (ExecutionException e2) {
                this.log.warn("Could not truncate changelog topic-partition {}.", this.changelog, e2);
            }
        }
    }

    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        ArrayList arrayList = new ArrayList(this.maxBatchSize);
        Iterator<ConsumerRecord<byte[], byte[]>> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next());
            if (arrayList.size() >= this.maxBatchSize) {
                restoreCassandraBatch(arrayList);
                arrayList.clear();
            }
        }
        if (arrayList.size() > 0) {
            restoreCassandraBatch(arrayList);
        }
    }

    private void restoreCassandraBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        long j = -1;
        for (ConsumerRecord<byte[], byte[]> consumerRecord : collection) {
            j = consumerRecord.offset();
            if (consumerRecord.value() == null) {
                tombstone(this.plugin.keyFromRecord(consumerRecord));
            } else {
                put(this.plugin.keyFromRecord(consumerRecord), (byte[]) consumerRecord.value());
            }
        }
        if (j >= 0) {
            AtomicWriteResult flush = flush(j, collection.size());
            if (flush.wasApplied()) {
                return;
            }
            throwFencedException(flush, j);
        }
    }

    private void throwFencedException(AtomicWriteResult atomicWriteResult, long j) {
        CassandraClient.MetadataRow metadata = this.client.metadata(this.tableName, atomicWriteResult.getPartition());
        String format = String.format("%s[%d:%d] Fenced while writing batch! Local Epoch: %d, Persisted Epoch: %d, Batch Offset: %d, Persisted Offset: %d", this.tableName, Integer.valueOf(this.partition), Integer.valueOf(atomicWriteResult.getPartition()), Long.valueOf(this.epoch), Long.valueOf(metadata.epoch), Long.valueOf(j), Long.valueOf(metadata.offset));
        this.log.error(format);
        throw new TaskMigratedException(format);
    }
}
