package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-0.10.2.0.jar:org/apache/kafka/streams/state/internals/CachingSessionStore.class */
/**
 * A {@link SessionStore} that buffers writes in a {@link ThreadCache} before they reach the
 * wrapped bytes store.
 *
 * <p>Writes go to the cache only; when the cache flushes or evicts dirty entries, each entry is
 * forwarded to the registered {@link CacheFlushListener} (if any) and then written to the wrapped
 * store. Reads merge the cached range with the wrapped store's range.
 *
 * @param <K>   type of the record key
 * @param <AGG> type of the aggregated session value
 */
public class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
    private final SessionStore<Bytes, byte[]> bytesStore;
    private final SessionKeySchema keySchema;
    // Either serde may be null, in which case initInternal() falls back to the context's serde.
    // Always go through `serdes` for (de)serialization; never dereference these directly.
    private final Serde<K> keySerde;
    private final Serde<AGG> aggSerde;
    private InternalProcessorContext context;
    private String cacheName;
    private StateSerdes<K, AGG> serdes;
    private ThreadCache cache;
    private CacheFlushListener<Windowed<K>, AGG> flushListener;

    /**
     * Cache iterator wrapper whose {@code hasNext()} is decided by a {@link HasNextCondition}
     * instead of by the underlying iterator alone.
     */
    public static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
        private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
        private final HasNextCondition hasNextCondition;

        FilteredCacheIterator(ThreadCache.MemoryLRUCacheBytesIterator memoryLRUCacheBytesIterator, HasNextCondition hasNextCondition) {
            this.cacheIterator = memoryLRUCacheBytesIterator;
            this.hasNextCondition = hasNextCondition;
        }

        /** Intentionally a no-op: this wrapper does not close the underlying cache iterator. */
        @Override
        public void close() {
        }

        /**
         * @return the key of the next entry without advancing
         * @throws NoSuchElementException if the condition reports no further entry
         */
        @Override
        public Bytes peekNextKey() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return this.cacheIterator.peekNextKey();
        }

        /** @return whatever the {@link HasNextCondition} reports for the underlying iterator */
        @Override
        public boolean hasNext() {
            return this.hasNextCondition.hasNext(this.cacheIterator);
        }

        /**
         * @return the next cache entry, advancing the underlying iterator
         * @throws NoSuchElementException if the condition reports no further entry
         */
        @Override
        public KeyValue<Bytes, LRUCacheEntry> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return this.cacheIterator.next();
        }

        /** Removal is not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * @return the next cache entry without advancing
         * @throws NoSuchElementException if the condition reports no further entry
         */
        @Override
        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return this.cacheIterator.peekNext();
        }
    }

    /**
     * @param sessionStore the wrapped bytes store that receives flushed entries
     * @param serde        key serde; may be null, in which case the processor context's key serde
     *                     is used once the store is initialized
     * @param serde2       aggregate serde; may be null, in which case the processor context's
     *                     value serde is used once the store is initialized
     */
    public CachingSessionStore(SessionStore<Bytes, byte[]> sessionStore, Serde<K> serde, Serde<AGG> serde2) {
        super(sessionStore);
        this.bytesStore = sessionStore;
        this.keySerde = serde;
        this.aggSerde = serde2;
        this.keySchema = new SessionKeySchema();
    }

    /**
     * Finds sessions for {@code k} using the bounds {@code j} and {@code j2}, merging cached
     * entries with those of the wrapped store.
     *
     * @param k  record key
     * @param j  lower bound handed to the key schema and to the wrapped store
     * @param j2 upper bound handed to the key schema and to the wrapped store
     * @return an iterator over the merged cache and store results
     */
    @Override
    public KeyValueIterator<Windowed<K>, AGG> findSessions(K k, long j, long j2) {
        validateStoreOpen();
        // Use the resolved serdes rather than the raw keySerde field, which may be null when the
        // serde is meant to come from the processor context.
        Bytes rawKey = Bytes.wrap(this.serdes.keySerializer().serialize(name(), k));
        byte[] from = this.keySchema.lowerRange(rawKey, j).get();
        byte[] to = this.keySchema.upperRange(rawKey, j2).get();
        ThreadCache.MemoryLRUCacheBytesIterator cacheRange = this.cache.range(this.cacheName, from, to);
        FilteredCacheIterator filteredCache = new FilteredCacheIterator(cacheRange, this.keySchema.hasNextCondition(rawKey, j, j2));
        return new MergedSortedCacheSessionStoreIterator(filteredCache, this.bytesStore.findSessions(rawKey, j, j2), this.serdes);
    }

    /** Removes a session by caching a null aggregate for it. */
    @Override
    public void remove(Windowed<K> windowed) {
        validateStoreOpen();
        put(windowed, null);
    }

    /**
     * Writes the session to the cache as a dirty entry tagged with the current record's offset,
     * partition and topic, and with the window end as the entry timestamp.
     *
     * @param windowed session key
     * @param agg      aggregate value; {@link #remove(Windowed)} passes null here
     */
    @Override
    public void put(Windowed<K> windowed, AGG agg) {
        validateStoreOpen();
        // Resolved serializer: see the note on the keySerde field.
        Bytes binaryKey = SessionKeySerde.toBinary(windowed, this.serdes.keySerializer());
        LRUCacheEntry entry = new LRUCacheEntry(
                this.serdes.rawValue(agg),
                true,
                this.context.offset(),
                windowed.window().end(),
                this.context.partition(),
                this.context.topic());
        this.cache.put(this.cacheName, binaryKey.get(), entry);
    }

    /** Equivalent to {@code findSessions(k, 0, Long.MAX_VALUE)}. */
    @Override
    public KeyValueIterator<Windowed<K>, AGG> fetch(K k) {
        return findSessions(k, 0L, Long.MAX_VALUE);
    }

    /**
     * Initializes the wrapped store first, then this store's cache wiring.
     *
     * @param processorContext must be an {@link InternalProcessorContext}
     * @param stateStore       root store handed through to the wrapped store
     */
    @Override
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.bytesStore.init(processorContext, stateStore);
        initInternal((InternalProcessorContext) processorContext);
    }

    /**
     * Resolves the serdes (falling back to the context's serdes for any that were not supplied),
     * derives the cache name from the task id and store name, and registers the listener that
     * drains dirty cache entries into the wrapped store.
     */
    @SuppressWarnings("unchecked") // context serdes are untyped; the caller guarantees they match K/AGG
    private void initInternal(final InternalProcessorContext internalProcessorContext) {
        this.context = internalProcessorContext;
        Serde<K> resolvedKeySerde = this.keySerde == null ? (Serde<K>) internalProcessorContext.keySerde() : this.keySerde;
        Serde<AGG> resolvedAggSerde = this.aggSerde == null ? (Serde<AGG>) internalProcessorContext.valueSerde() : this.aggSerde;
        this.serdes = new StateSerdes<>(this.bytesStore.name(), resolvedKeySerde, resolvedAggSerde);
        this.cacheName = internalProcessorContext.taskId() + "-" + this.bytesStore.name();
        this.cache = this.context.getCache();
        this.cache.addDirtyEntryFlushListener(this.cacheName, new ThreadCache.DirtyEntryFlushListener() {
            @Override
            public void apply(List<ThreadCache.DirtyEntry> list) {
                for (ThreadCache.DirtyEntry dirtyEntry : list) {
                    CachingSessionStore.this.putAndMaybeForward(dirtyEntry, internalProcessorContext);
                }
            }
        });
    }

    /**
     * Handles one dirty cache entry: notifies the flush listener (when one is set and either the
     * new or the previously stored value is non-null), then writes the entry to the wrapped store.
     *
     * <p>The processor's record context is temporarily replaced with the one captured in the
     * dirty entry and is always restored afterwards, including when an exception is thrown.
     *
     * @param dirtyEntry               the cache entry being flushed
     * @param internalProcessorContext context whose record context is swapped for the duration
     */
    public void putAndMaybeForward(ThreadCache.DirtyEntry dirtyEntry, InternalProcessorContext internalProcessorContext) {
        Bytes binaryKey = dirtyEntry.key();
        RecordContext previousContext = internalProcessorContext.recordContext();
        internalProcessorContext.setRecordContext(dirtyEntry.recordContext());
        try {
            // Resolved deserializer: see the note on the keySerde field.
            Windowed<K> sessionKey = SessionKeySerde.from(binaryKey.get(), this.serdes.keyDeserializer());
            if (this.flushListener != null) {
                AGG newValue = this.serdes.valueFrom(dirtyEntry.newValue());
                AGG oldValue = fetchPrevious(binaryKey);
                if (newValue != null || oldValue != null) {
                    this.flushListener.apply(sessionKey, newValue, oldValue);
                }
            }
            Bytes rawKey = Bytes.wrap(this.serdes.rawKey(sessionKey.key()));
            this.bytesStore.put(new Windowed<>(rawKey, sessionKey.window()), dirtyEntry.newValue());
        } finally {
            internalProcessorContext.setRecordContext(previousContext);
        }
    }

    /**
     * Returns the deserialized value of the first entry the wrapped store yields for
     * {@code bytes}, or null when it yields none. The store iterator is always closed.
     *
     * <p>NOTE(review): the only caller passes the cache's binary key for the entry (the composite
     * session key, not the raw record key) straight to {@code bytesStore.fetch}. Confirm that the
     * wrapped store expects this form; behaviour is deliberately left unchanged here.
     */
    private AGG fetchPrevious(Bytes bytes) {
        try (KeyValueIterator<Windowed<Bytes>, byte[]> stored = this.bytesStore.fetch(bytes)) {
            if (!stored.hasNext()) {
                return null;
            }
            return this.serdes.valueFrom(stored.next().value);
        }
    }

    /** Flushes this store's cache namespace (draining dirty entries), then the wrapped store. */
    @Override
    public void flush() {
        this.cache.flush(this.cacheName);
        this.bytesStore.flush();
    }

    /** Flushes, closes the wrapped store, then closes this store's cache namespace. */
    @Override
    public void close() {
        flush();
        this.bytesStore.close();
        this.cache.close(this.cacheName);
    }

    /** Registers the listener that is notified when dirty entries are flushed. */
    @Override
    public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> cacheFlushListener) {
        this.flushListener = cacheFlushListener;
    }
}
