package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.3.0.jar:org/apache/kafka/streams/state/internals/InMemorySessionStore.class */
/**
 * In-memory {@link SessionStore} over raw {@link Bytes} keys and {@code byte[]} aggregates.
 *
 * <p>Sessions are held in a three-level sorted index: session end time, then record key, then
 * session start time, mapping to the serialized aggregate. A session whose end time is at or
 * before {@code observedStreamTime - retentionPeriod} is considered expired. Expired segments are
 * purged at the start of {@code put} and of every fetch/find call, except that nothing at or after
 * the end time reported by an open iterator ({@link InMemorySessionStoreIterator#minTime()}) is
 * purged.
 */
public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);

    private final String name;
    private final String metricScope;
    private Sensor expiredRecordSensor;
    private final long retentionPeriod;

    /** Largest session end time seen by {@link #put}; -1 until the first put. */
    private long observedStreamTime = -1;

    /** end time -> key -> start time -> aggregate. */
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> endTimeMap =
        new ConcurrentSkipListMap<>();

    /** Iterators handed out by this store that have not been closed yet. */
    private final Set<InMemorySessionStoreIterator> openIterators = ConcurrentHashMap.newKeySet();

    private volatile boolean open = false;

    /** Hook an iterator invokes from {@code close()} so the store can stop tracking it. */
    public interface ClosingCallback {
        void deregisterIterator(InMemorySessionStoreIterator inMemorySessionStoreIterator);
    }

    /**
     * Iterator over the sessions in a key range, walking the store's index in
     * (end time, key, start time) order. Only sessions whose start time is at most
     * {@code latestSessionStartTime} are returned; {@code Long.MAX_VALUE} disables that bound.
     */
    public static class InMemorySessionStoreIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> {
        private final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator;
        private Iterator<Map.Entry<Bytes, ConcurrentNavigableMap<Long, byte[]>>> keyIterator;
        // null means "exhausted or closed"; hasNext() relies on this.
        private Iterator<Map.Entry<Long, byte[]>> recordIterator;
        private KeyValue<Windowed<Bytes>, byte[]> next;
        private Bytes currentKey;
        private long currentEndTime;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final long latestSessionStartTime;
        private final ClosingCallback callback;

        /**
         * @param keyFrom                lower key bound, inclusive
         * @param keyTo                  upper key bound, inclusive
         * @param latestSessionStartTime inclusive upper bound on session start time
         * @param endTimeIterator        iterator over the end-time segments to scan
         * @param callback               invoked with this iterator when it is closed
         */
        InMemorySessionStoreIterator(Bytes keyFrom,
                                     Bytes keyTo,
                                     long latestSessionStartTime,
                                     Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator,
                                     ClosingCallback callback) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.latestSessionStartTime = latestSessionStartTime;
            this.endTimeIterator = endTimeIterator;
            this.callback = callback;
            // Position on the first matching record (or mark the iterator exhausted).
            setAllIterators();
        }

        @Override
        public boolean hasNext() {
            if (next != null) {
                return true;
            }
            if (recordIterator == null) {
                return false;
            }
            next = getNext();
            return next != null;
        }

        /**
         * @return the key of the element the next call to {@link #next()} would return
         * @throws NoSuchElementException if the iterator is exhausted
         */
        @Override
        public Windowed<Bytes> peekNextKey() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return next.key;
        }

        /**
         * @throws NoSuchElementException if the iterator is exhausted
         */
        @Override
        public KeyValue<Windowed<Bytes>, byte[]> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            final KeyValue<Windowed<Bytes>, byte[]> result = next;
            next = null;
            return result;
        }

        /** Marks the iterator exhausted and deregisters it through the closing callback. */
        @Override
        public void close() {
            next = null;
            recordIterator = null;
            callback.deregisterIterator(this);
        }

        /**
         * @return the end time of the segment this iterator last positioned on (0 if it never
         *         positioned on one); the store will not purge segments at or after this time
         */
        Long minTime() {
            return currentEndTime;
        }

        /** Builds the next session entry, advancing to the next key/segment if needed; null when done. */
        private KeyValue<Windowed<Bytes>, byte[]> getNext() {
            if (!recordIterator.hasNext()) {
                getNextIterators();
            }
            if (recordIterator == null) {
                return null;
            }
            final Map.Entry<Long, byte[]> record = recordIterator.next();
            final SessionWindow window = new SessionWindow(record.getKey(), currentEndTime);
            return new KeyValue<>(new Windowed<>(currentKey, window), record.getValue());
        }

        /**
         * Advances through end-time segments until one contains a matching record; sets
         * {@code recordIterator} to null if none is left.
         */
        private void setAllIterators() {
            while (endTimeIterator.hasNext()) {
                final Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> segment = endTimeIterator.next();
                currentEndTime = segment.getKey();
                keyIterator = segment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
                if (setInnerIterators()) {
                    return;
                }
            }
            recordIterator = null;
        }

        /**
         * Advances through the keys of the current segment until one has a record within the
         * start-time bound.
         *
         * @return true if {@code recordIterator} now has at least one element
         */
        private boolean setInnerIterators() {
            while (keyIterator.hasNext()) {
                final Map.Entry<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyEntry = keyIterator.next();
                currentKey = keyEntry.getKey();
                final ConcurrentNavigableMap<Long, byte[]> sessionsByStart = keyEntry.getValue();
                if (latestSessionStartTime == Long.MAX_VALUE) {
                    recordIterator = sessionsByStart.entrySet().iterator();
                } else {
                    recordIterator = sessionsByStart.headMap(latestSessionStartTime, true).entrySet().iterator();
                }
                if (recordIterator.hasNext()) {
                    return true;
                }
            }
            return false;
        }

        /** Moves to the next key in the current segment, falling back to the next segment. */
        private void getNextIterators() {
            if (!setInnerIterators()) {
                setAllIterators();
            }
        }
    }

    /**
     * @param str  store name
     * @param j    retention period, in the same time unit as session end timestamps
     * @param str2 metric scope used to build the metrics group name and tag key
     */
    public InMemorySessionStore(String str, long j, String str2) {
        this.name = str;
        this.retentionPeriod = j;
        this.metricScope = str2;
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Creates the expired-record sensor, registers a restore callback that replays records through
     * {@link #put} (only when {@code stateStore} is non-null), and marks the store open.
     */
    @Override
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        final StreamsMetricsImpl metrics = ((InternalProcessorContext) processorContext).metrics();
        final String taskId = processorContext.taskId().toString();
        expiredRecordSensor = metrics.storeLevelSensor(
            taskId,
            name(),
            StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP,
            Sensor.RecordingLevel.INFO,
            new Sensor[0]);
        StreamsMetricsImpl.addInvocationRateAndCount(
            expiredRecordSensor,
            "stream-" + metricScope + "-metrics",
            metrics.tagMap("task-id", taskId, metricScope + "-id", name()),
            StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP);
        if (stateStore != null) {
            processorContext.register(stateStore, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value));
        }
        open = true;
    }

    /**
     * Stores the aggregate for a session, or removes the session when the aggregate is null.
     * Sessions that are already expired relative to the observed stream time are dropped and
     * counted on the expired-record sensor.
     */
    @Override
    public void put(Windowed<Bytes> windowed, byte[] bArr) {
        removeExpiredSegments();

        final long windowEndTimestamp = windowed.window().end();
        observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp);

        if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
            expiredRecordSensor.record();
            LOG.debug("Skipping record for expired segment.");
            return;
        }
        if (bArr == null) {
            remove(windowed);
            return;
        }
        // Use the maps returned by computeIfAbsent directly rather than looking them up again:
        // a second get() could observe null if the entry were removed in between.
        endTimeMap
            .computeIfAbsent(windowEndTimestamp, endTime -> new ConcurrentSkipListMap<>())
            .computeIfAbsent(windowed.key(), key -> new ConcurrentSkipListMap<>())
            .put(windowed.window().start(), bArr);
    }

    /**
     * Removes a session if present, pruning index levels that become empty. Removing a session
     * that is not stored is a no-op.
     */
    @Override
    public void remove(Windowed<Bytes> windowed) {
        final long windowEndTimestamp = windowed.window().end();

        final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(windowEndTimestamp);
        if (keyMap == null) {
            return;
        }
        final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(windowed.key());
        if (startTimeMap == null) {
            return;
        }

        startTimeMap.remove(windowed.window().start());
        if (startTimeMap.isEmpty()) {
            keyMap.remove(windowed.key());
            if (keyMap.isEmpty()) {
                endTimeMap.remove(windowEndTimestamp);
            }
        }
    }

    /**
     * @param bytes record key; must not be null
     * @param j     session start time
     * @param j2    session end time
     * @return the stored aggregate, or null if the session is expired or not present
     */
    @Override
    public byte[] fetchSession(Bytes bytes, long j, long j2) {
        removeExpiredSegments();
        Objects.requireNonNull(bytes, "key cannot be null");

        if (j2 <= observedStreamTime - retentionPeriod) {
            return null;
        }
        final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(j2);
        if (keyMap == null) {
            return null;
        }
        final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(bytes);
        if (startTimeMap == null) {
            return null;
        }
        return startTimeMap.get(j);
    }

    /**
     * Finds sessions for a single key whose end time is at least {@code j} and whose start time
     * is at most {@code j2}.
     */
    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, long j, long j2) {
        Objects.requireNonNull(bytes, "key cannot be null");
        removeExpiredSegments();
        return registerNewIterator(bytes, bytes, j2, endTimeMap.tailMap(j, true).entrySet().iterator());
    }

    /**
     * Finds sessions for keys in {@code [bytes, bytes2]} whose end time is at least {@code j} and
     * whose start time is at most {@code j2}. An inverted key range yields an empty iterator.
     */
    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, Bytes bytes2, long j, long j2) {
        Objects.requireNonNull(bytes, "from key cannot be null");
        Objects.requireNonNull(bytes2, "to key cannot be null");
        removeExpiredSegments();

        if (bytes.compareTo(bytes2) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        return registerNewIterator(bytes, bytes2, j2, endTimeMap.tailMap(j, true).entrySet().iterator());
    }

    /** Returns all non-expired sessions for the given key. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        removeExpiredSegments();
        return registerNewIterator(bytes, bytes, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
    }

    /** Returns all non-expired sessions for keys in {@code [bytes, bytes2]}. */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2) {
        Objects.requireNonNull(bytes, "from key cannot be null");
        Objects.requireNonNull(bytes2, "to key cannot be null");
        removeExpiredSegments();
        return registerNewIterator(bytes, bytes2, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    /** No-op: this store keeps everything in memory. */
    @Override
    public void flush() {
    }

    /** Closes any iterators still open (with a warning), drops all data and marks the store closed. */
    @Override
    public void close() {
        if (openIterators.size() != 0) {
            LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
            // close() deregisters the iterator from openIterators; the concurrent key set
            // tolerates removal while it is being iterated.
            for (final InMemorySessionStoreIterator iterator : openIterators) {
                iterator.close();
            }
        }
        endTimeMap.clear();
        openIterators.clear();
        open = false;
    }

    /**
     * Drops every end-time segment that is past retention, but never a segment at or after the
     * end time reported by an open iterator.
     */
    private void removeExpiredSegments() {
        long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1);
        for (final InMemorySessionStoreIterator iterator : openIterators) {
            minLiveTime = Math.min(minLiveTime, iterator.minTime());
        }
        endTimeMap.headMap(minLiveTime, false).clear();
    }

    /** Creates an iterator that deregisters itself from {@code openIterators} on close, and tracks it. */
    private InMemorySessionStoreIterator registerNewIterator(Bytes bytes,
                                                             Bytes bytes2,
                                                             long j,
                                                             Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> it) {
        final InMemorySessionStoreIterator iterator =
            new InMemorySessionStoreIterator(bytes, bytes2, j, it, openIterators::remove);
        openIterators.add(iterator);
        return iterator;
    }
}
