package dev.responsive.kafka.internal.stores;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/ResponsiveWindowStore.class */
/**
 * A {@link WindowStore} of raw bytes that delegates all reads and writes to a
 * {@link WindowOperations} instance created during {@link #init(StateStoreContext, StateStore)}.
 *
 * <p>On top of the delegate this class:
 * <ul>
 *   <li>tracks the highest window start timestamp it has seen ("observed stream time") and
 *       hides everything older than {@code observedStreamTime - retentionPeriod + 1} from
 *       the timestamp-bounded fetch methods;</li>
 *   <li>optionally keeps a single bloom filter over the keys written to the latest window so
 *       that point lookups for keys that were never written can skip the delegate.</li>
 * </ul>
 *
 * <p>The class performs no synchronization of its own.
 */
public class ResponsiveWindowStore implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
    private final Logger log;
    private final ResponsiveWindowParams params;
    private final TableName name;
    private final long retentionPeriod;
    private final Position position = Position.emptyPosition();

    // Value of WindowOperations#initialStreamTime() captured in init().
    private long initialStreamTime;
    // 0 disables the bloom filter entirely (always 0 when duplicates are retained).
    private int numBloomFilterWindows;
    private double fpp;
    private long expectedKeysPerWindow;

    // Bloom filter over the keys of exactly one window, or null if none has been created yet.
    private BloomFilter<byte[]> bloomFilter;
    // Window start timestamp the current bloom filter was created for; only meaningful while
    // bloomFilter != null.
    private long bloomFilterWindowStart = -1;

    private boolean open;
    private WindowOperations windowOperations;
    private StateStoreContext context;
    // Highest window start timestamp seen through put() or restoreBatch(); -1 until the first one.
    private long observedStreamTime = -1;

    /**
     * Creates an un-initialized store; {@link #init(StateStoreContext, StateStore)} must be
     * called before it is used.
     *
     * @param responsiveWindowParams store parameters supplying the name, retention period and
     *                               duplicate-retention setting
     */
    public ResponsiveWindowStore(ResponsiveWindowParams responsiveWindowParams) {
        this.params = responsiveWindowParams;
        this.name = responsiveWindowParams.name();
        this.retentionPeriod = responsiveWindowParams.retentionPeriod();
        this.log = new LogContext(String.format("window-store [%s] ", this.name.kafkaName()))
            .logger(ResponsiveWindowStore.class);
    }

    /** Returns the Kafka-facing name of this store. */
    @Override
    public String name() {
        return this.name.kafkaName();
    }

    /**
     * Legacy initializer; only works when the given context also implements
     * {@link StateStoreContext}, in which case it forwards to
     * {@link #init(StateStoreContext, StateStore)}.
     *
     * @throws UnsupportedOperationException if the context is not a {@link StateStoreContext}
     */
    @Deprecated
    @Override
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (!(processorContext instanceof StateStoreContext)) {
            throw new UnsupportedOperationException(
                "Use ResponsiveWindowStore#init(StateStoreContext, StateStore) instead.");
        }
        init((StateStoreContext) processorContext, stateStore);
    }

    /**
     * Creates the underlying {@link WindowOperations}, reads the bloom filter settings from the
     * application config, marks the store open and registers the restore callback.
     *
     * @param stateStoreContext context of the task that owns this store
     * @param stateStore        the root store to register with the context
     * @throws IllegalStateException   if the owning task is a standby task
     * @throws ProcessorStateException if initialization is interrupted or times out
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        try {
            this.log.info("Initializing state store");
            final Map<String, Object> appConfigs = stateStoreContext.appConfigs();
            final ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(appConfigs);
            this.context = stateStoreContext;

            final Task.TaskType taskType =
                ProcessorContextUtils.asInternalProcessorContext(stateStoreContext).taskType();
            if (taskType == Task.TaskType.STANDBY) {
                this.log.error("Unexpected standby task created");
                throw new IllegalStateException("Store " + name() + " was opened as a standby");
            }

            this.windowOperations = SegmentedOperations.create(
                this.name,
                stateStoreContext,
                this.params,
                appConfigs,
                responsiveConfig,
                windowedKey -> windowedKey.windowStartMs >= minValidTimestamp());

            // The bloom filter is never used when duplicates are retained.
            this.numBloomFilterWindows = this.params.retainDuplicates()
                ? 0
                : responsiveConfig.getInt(ResponsiveConfig.WINDOW_BLOOM_FILTER_COUNT_CONFIG);
            this.expectedKeysPerWindow =
                responsiveConfig.getLong(ResponsiveConfig.WINDOW_BLOOM_FILTER_EXPECTED_KEYS_CONFIG);
            this.fpp = responsiveConfig.getDouble(ResponsiveConfig.WINDOW_BLOOM_FILTER_FPP_CONFIG);
            this.initialStreamTime = this.windowOperations.initialStreamTime();

            this.log.info("Completed initializing state store");
            this.open = true;
            // The explicit cast selects the record-batching callback shape; restoreBatch does not
            // match the (key, value) signature of the plain StateRestoreCallback.
            stateStoreContext.register(
                stateStore, (RecordBatchingStateRestoreCallback) this::restoreBatch);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ProcessorStateException("Failed to initialize store.", e);
        } catch (TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    /**
     * Returns true only if a bloom filter exists, it was created for {@code windowStartMs}, and
     * that window is the latest one observed. Checking the filter's own window prevents a filter
     * built for an older window from answering lookups after stream time advanced without a new
     * filter being rolled (for example through a tombstone or {@link #restoreBatch}).
     */
    private boolean inLatestWindowBloomFilter(long windowStartMs) {
        return hasActiveBloomFilter()
            && windowStartMs == this.bloomFilterWindowStart
            && windowStartMs == this.observedStreamTime;
    }

    private boolean hasActiveBloomFilter() {
        return this.bloomFilter != null;
    }

    /** Always reports {@code false}. */
    @Override
    public boolean persistent() {
        return false;
    }

    /** Returns true between a successful {@code init} and {@link #close()}. */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /**
     * Writes or deletes a single entry and advances the observed stream time.
     *
     * <p>A {@code null} value is a delete, except when duplicates are retained, in which case it
     * is ignored completely (stream time and position are left untouched).
     *
     * @param key           record key
     * @param value         serialized value, or {@code null} to delete
     * @param windowStartMs start timestamp of the window being written
     */
    @Override
    public void put(Bytes key, byte[] value, long windowStartMs) {
        if (value == null && this.params.retainDuplicates()) {
            return;
        }

        if (value == null) {
            this.windowOperations.delete(key, windowStartMs);
        } else {
            this.windowOperations.put(key, value, windowStartMs);
            if (this.numBloomFilterWindows > 0) {
                // A filter may only be started for a window that cannot already contain keys
                // this instance never saw. NOTE(review): this assumes initialStreamTime is the
                // stream time left behind by a previous run, so windows at or before it may
                // hold pre-existing data -- confirm against WindowOperations#initialStreamTime.
                final boolean startsNewLatestWindow = windowStartMs > this.observedStreamTime
                    && windowStartMs > this.initialStreamTime;
                if (startsNewLatestWindow) {
                    createNewBloomFilter(windowStartMs);
                }
                if (startsNewLatestWindow || inLatestWindowBloomFilter(windowStartMs)) {
                    this.bloomFilter.put(key.get());
                }
            }
        }

        this.observedStreamTime = Math.max(this.observedStreamTime, windowStartMs);
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    /**
     * Replaces the current bloom filter with an empty one for {@code windowStartMs}, logging
     * statistics about the filter being discarded (if any).
     */
    private void createNewBloomFilter(long windowStartMs) {
        if (hasActiveBloomFilter()) {
            final double expectedFpp = this.bloomFilter.expectedFpp();
            this.log.info(
                "Rolling new bloom filter for window@{}, previous filter for window@{} had approx {} elements with estimated fpp={}",
                windowStartMs,
                this.bloomFilterWindowStart,
                this.bloomFilter.approximateElementCount(),
                expectedFpp);
            if (expectedFpp > this.fpp) {
                this.log.warn(
                    "Actual fpp was {} which is greater than requested fpp {}. It's likely that the actual number of elements exceeded the expected keys per window {}",
                    expectedFpp,
                    this.fpp,
                    this.expectedKeysPerWindow);
            }
        } else {
            this.log.info(
                "Creating the first bloom filter for window@{} with previous window@{}",
                windowStartMs,
                this.observedStreamTime);
        }
        this.bloomFilter =
            BloomFilter.create(Funnels.byteArrayFunnel(), this.expectedKeysPerWindow, this.fpp);
        this.bloomFilterWindowStart = windowStartMs;
    }

    /**
     * Point lookup of {@code key} in the window starting at {@code windowStartMs}.
     *
     * @return the stored value, or {@code null} if the window is older than the retention
     *         cut-off, the bloom filter rules the key out, or the delegate has no value
     */
    @Override
    public byte[] fetch(Bytes key, long windowStartMs) {
        if (windowStartMs < minValidTimestamp()) {
            return null;
        }
        // Only trust the filter for the exact window it was built for.
        if (inLatestWindowBloomFilter(windowStartMs) && !this.bloomFilter.mightContain(key.get())) {
            return null;
        }
        return this.windowOperations.fetch(key, windowStartMs);
    }

    /**
     * Fetches all windows of {@code key} with start time in {@code [timeFrom, timeTo]}; the
     * lower bound is raised to the retention cut-off when necessary.
     */
    @Override
    public WindowStoreIterator<byte[]> fetch(Bytes key, long timeFrom, long timeTo) {
        final long minValidTimestamp = minValidTimestamp();
        if (timeTo < minValidTimestamp) {
            return Iterators.windowed(Iterators.emptyKv());
        }
        return this.windowOperations.fetch(key, Math.max(minValidTimestamp, timeFrom), timeTo);
    }

    /**
     * Fetches all windows for keys between {@code keyFrom} and {@code keyTo} with start time in
     * {@code [timeFrom, timeTo]}; the lower time bound is raised to the retention cut-off when
     * necessary.
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(
        Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo) {
        final long minValidTimestamp = minValidTimestamp();
        if (timeTo < minValidTimestamp) {
            return Iterators.emptyKv();
        }
        return this.windowOperations.fetch(
            keyFrom, keyTo, Math.max(minValidTimestamp, timeFrom), timeTo);
    }

    /**
     * Fetches every entry with window start time in {@code [timeFrom, timeTo]}; the lower bound
     * is raised to the retention cut-off when necessary.
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long timeFrom, long timeTo) {
        final long minValidTimestamp = minValidTimestamp();
        if (timeTo < minValidTimestamp) {
            return Iterators.emptyKv();
        }
        return this.windowOperations.fetchAll(Math.max(minValidTimestamp, timeFrom), timeTo);
    }

    /** Returns whatever the delegate's {@code all()} yields; no retention bound is applied here. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        return this.windowOperations.all();
    }

    /** Backward counterpart of {@link #fetch(Bytes, long, long)}. */
    @Override
    public WindowStoreIterator<byte[]> backwardFetch(Bytes key, long timeFrom, long timeTo) {
        final long minValidTimestamp = minValidTimestamp();
        if (timeTo < minValidTimestamp) {
            return Iterators.windowed(Iterators.emptyKv());
        }
        return this.windowOperations.backwardFetch(
            key, Math.max(minValidTimestamp, timeFrom), timeTo);
    }

    /** Backward counterpart of {@link #fetch(Bytes, Bytes, long, long)}. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(
        Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo) {
        final long minValidTimestamp = minValidTimestamp();
        if (timeTo < minValidTimestamp) {
            return Iterators.emptyKv();
        }
        return this.windowOperations.backwardFetch(
            keyFrom, keyTo, Math.max(minValidTimestamp, timeFrom), timeTo);
    }

    /** Backward counterpart of {@link #fetchAll(long, long)}. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long timeFrom, long timeTo) {
        final long minValidTimestamp = minValidTimestamp();
        if (timeTo < minValidTimestamp) {
            return Iterators.emptyKv();
        }
        return this.windowOperations.backwardFetchAll(
            Math.max(minValidTimestamp, timeFrom), timeTo);
    }

    /**
     * Returns whatever the delegate's {@code backwardAll()} yields; no retention bound is
     * applied here.
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        return this.windowOperations.backwardAll();
    }

    /** Intentionally a no-op in this class. */
    @Override
    public void flush() {
    }

    /**
     * Marks the store closed and closes the delegate. Safe to call when {@code init} never
     * completed, in which case there is no delegate to close.
     */
    @Override
    public void close() {
        this.open = false;
        if (this.windowOperations != null) {
            this.windowOperations.close();
        }
    }

    /** Returns the position object updated on every effective {@code put}. */
    @Override
    public Position getPosition() {
        return this.position;
    }

    /** Smallest window start timestamp still inside the retention period. */
    private long minValidTimestamp() {
        return (this.observedStreamTime - this.retentionPeriod) + 1;
    }

    /**
     * Hands a batch of changelog records to the delegate and advances the observed stream time
     * to the value it returns, never moving it backwards.
     *
     * @param collection changelog records to restore
     */
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        final long restoredStreamTime =
            this.windowOperations.restoreBatch(collection, this.observedStreamTime);
        this.observedStreamTime = Math.max(this.observedStreamTime, restoredStreamTime);
    }
}
