package org.apache.hw_v4_0_0.hedwig.server.persistence;

import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.protoextensions.MessageIdUtils;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.common.UnexpectedError;
import org.apache.hw_v4_0_0.hedwig.server.persistence.ScanCallback;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache.class */
public class ReadAheadCache implements PersistenceManager, Runnable {
    /** Logger used by the cache and by its nested request and callback types. */
    static Logger logger = LoggerFactory.getLogger(ReadAheadCache.class);

    /** Persistence manager that persist, scan and seq-id calls are forwarded to. */
    protected PersistenceManagerWithRangeScan realPersistenceManager;

    /** Configuration consulted for the read-ahead count, the read-ahead size limit and the maximum cache size. */
    protected ServerConfiguration cfg;

    /** Cache entries keyed by (topic, seq-id); an entry is either a stub or holds a message. */
    protected Map<CacheKey, CacheValue> cache = new HashMap<CacheKey, CacheValue>();

    /** Requests waiting to be executed by {@link #run()}. */
    protected BlockingQueue<CacheRequest> requestQueue = new LinkedBlockingQueue<CacheRequest>();

    /** Time of addition mapped to the keys cached at that time; sorted so the oldest bucket comes first. */
    protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap<Long, Set<CacheKey>>();

    /** Per-topic sorted set of the seq-ids whose messages are currently cached. */
    protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId = new HashMap<ByteString, SortedSet<Long>>();

    /** Running total of the body sizes of the messages held in {@link #cache}. */
    protected long presentCacheSize = 0L;

    /** Single callback instance handed to the real persistence manager by {@link #persistMessage}. */
    protected PersistCallback persistCallbackInstance = new PersistCallback();

    /** Shared exception instance reported when a scan returns a seq-id other than the expected one. */
    protected NoSuchSeqIdException noSuchSeqIdExceptionInstance = new NoSuchSeqIdException();

    /** Shared exception instance passed along when cache entries are dropped by the cache itself. */
    protected ReadAheadException readAheadExceptionInstance = new ReadAheadException();

    /** Loop flag for {@link #run()}; cleared by {@code ShutdownCacheRequest}. */
    protected boolean keepRunning = true;

    /** Thread that executes {@link #run()}; started by {@link #start()}. */
    protected Thread cacheThread = new Thread(this, "CacheThread");

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$CacheRequest.class */
    /**
     * A unit of work placed on the cache's request queue. The loop in
     * {@code ReadAheadCache.run()} takes requests off the queue one at a time and
     * invokes {@link #performRequest()} on each.
     */
    public interface CacheRequest {
        /** Carries out the work represented by this request. */
        void performRequest();
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$DeliveredUntil.class */
    /**
     * Request that drops every cached message of a topic whose seq-id is less than or
     * equal to a given seq-id (the point every subscriber has moved past, per the debug
     * message emitted below).
     */
    protected class DeliveredUntil implements CacheRequest {
        /** Topic whose cached messages are to be trimmed. */
        ByteString topic;
        /** Highest seq-id (inclusive) that may be dropped. */
        Long seqId;

        /**
         * @param byteString topic to trim
         * @param l          inclusive upper bound of the seq-ids to drop
         */
        public DeliveredUntil(ByteString byteString, Long l) {
            topic = byteString;
            seqId = l;
        }

        /**
         * Removes all cached messages of {@code topic} with seq-id {@code <= seqId} and
         * discards the topic's seq-id index once it becomes empty.
         */
        @Override
        public void performRequest() {
            SortedSet<Long> cachedSeqIds = orderedIndexOnSeqId.get(topic);
            if (cachedSeqIds == null) {
                return;
            }

            // headSet() excludes its bound, so use seqId + 1 to include seqId itself.
            SortedSet<Long> alreadyDelivered = cachedSeqIds.headSet(Long.valueOf(seqId.longValue() + 1));
            for (Iterator<Long> cursor = alreadyDelivered.iterator(); cursor.hasNext();) {
                CacheKey staleKey = new CacheKey(topic, cursor.next().longValue());
                if (logger.isDebugEnabled()) {
                    logger.debug("Removing seq-id: " + staleKey.getSeqId() + " topic: " + staleKey.getTopic().toStringUtf8() + " from cache because every subscriber has moved past");
                }
                // The seq-id index is updated through cursor.remove() below, so only the
                // time index is maintained by removeMessageFromCache.
                removeMessageFromCache(staleKey, readAheadExceptionInstance, true, false);
                cursor.remove();
            }

            if (cachedSeqIds.isEmpty()) {
                orderedIndexOnSeqId.remove(topic);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$ExceptionOnCacheKey.class */
    /**
     * Request that removes a single cache entry and hands the given exception to
     * {@code removeMessageFromCache}, maintaining both indexes.
     */
    public class ExceptionOnCacheKey implements CacheRequest {
        /** Key of the entry to remove. */
        CacheKey cacheKey;
        /** Exception forwarded to {@code removeMessageFromCache}. */
        Exception exception;

        /**
         * @param cacheKey key of the entry to remove
         * @param exc      exception to forward when the entry is removed
         */
        public ExceptionOnCacheKey(CacheKey cacheKey, Exception exc) {
            this.exception = exc;
            this.cacheKey = cacheKey;
        }

        /** Removes the entry, asking for both the time index and the seq-id index to be updated. */
        @Override
        public void performRequest() {
            final boolean maintainTimeIndex = true;
            final boolean maintainSeqIdIndex = true;
            removeMessageFromCache(cacheKey, exception, maintainTimeIndex, maintainSeqIdIndex);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$HashSetCacheKeyFactory.class */
    /**
     * Factory producing empty {@code HashSet<CacheKey>} instances; passed to
     * {@code MapMethods.addToMultiMap} when the time index is updated.
     */
    public static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
        /** Shared instance. */
        protected static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();

        /** Not meant to be instantiated from outside; use {@link #instance}. */
        protected HashSetCacheKeyFactory() {
        }

        /** @return a new, empty hash set of cache keys */
        @Override
        public Set<CacheKey> newInstance() {
            Set<CacheKey> fresh = new HashSet<CacheKey>();
            return fresh;
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$NoSuchSeqIdException.class */
    /**
     * Exception with the fixed message "No such seq-id". The enclosing cache keeps one
     * shared instance and reports it for installed stubs when a read-ahead scan returns
     * a seq-id other than the expected one.
     */
    protected static class NoSuchSeqIdException extends Exception {
        /** Creates the exception with its fixed message. */
        public NoSuchSeqIdException() {
            super("No such seq-id");
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$PersistCallback.class */
    /**
     * Callback installed on persist requests forwarded to the real persistence manager.
     * The context object it receives is the caller's original {@code PersistRequest};
     * results are relayed to that request's own callback, and successfully persisted
     * messages are additionally queued for insertion into the cache.
     */
    public class PersistCallback implements Callback<Long> {
        public PersistCallback() {
        }

        /**
         * Relays a persist failure to the original request's callback.
         *
         * @param obj             the original {@code PersistRequest}
         * @param pubSubException the failure reported by the real persistence manager
         */
        @Override
        public void operationFailed(Object obj, PubSubException pubSubException) {
            PersistRequest original = (PersistRequest) obj;
            original.getCallback().operationFailed(original.getCtx(), pubSubException);
        }

        /**
         * Relays success to the original request's callback first, then enqueues the
         * message (with the assigned local seq-id merged in) for addition to the cache.
         *
         * @param obj the original {@code PersistRequest}
         * @param l   seq-id assigned to the persisted message
         */
        @Override
        public void operationFinished(Object obj, Long l) {
            PersistRequest original = (PersistRequest) obj;
            original.getCallback().operationFinished(original.getCtx(), l);

            long assignedSeqId = l.longValue();
            PubSubProtocol.Message withSeqId = MessageIdUtils.mergeLocalSeqId(original.getMessage(), assignedSeqId);
            CacheKey persistedKey = new CacheKey(original.getTopic(), assignedSeqId);
            enqueueWithoutFailure(new ScanResponse(persistedKey, withSeqId));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$ReadAheadException.class */
    /**
     * Exception with the fixed message "Readahead failed". The enclosing cache keeps one
     * shared instance and passes it along when it drops entries on its own initiative
     * (eviction, delivery trimming, or a scan that ended before filling all stubs).
     */
    public static class ReadAheadException extends Exception {
        /** Creates the exception with its fixed message. */
        public ReadAheadException() {
            super("Readahead failed");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$ReadAheadScanCallback.class */
    /**
     * Scan callback used for read-ahead range scans. Every scanned message is queued for
     * insertion into the cache; the callback also tracks which of the stubs installed
     * for this scan have been matched, and queues removal of the rest when the scan goes
     * wrong or ends early.
     */
    public class ReadAheadScanCallback implements ScanCallback {
        /** Stubs installed for this scan, in the order the messages are expected to arrive. */
        Queue<CacheKey> installedStubs;
        /** Topic being scanned. */
        ByteString topic;

        /**
         * @param queue      stubs installed for this scan, in expected arrival order
         * @param byteString topic being scanned
         */
        public ReadAheadScanCallback(Queue<CacheKey> queue, ByteString byteString) {
            installedStubs = queue;
            topic = byteString;
        }

        /**
         * Queues the scanned message for caching, then checks it against the next
         * expected stub. A match consumes that stub; a mismatch logs a warning and
         * queues removal of all remaining stubs with the shared no-such-seq-id exception.
         */
        @Override
        public void messageScanned(Object obj, PubSubProtocol.Message message) {
            CacheKey scannedKey = new CacheKey(topic, message.getMsgId().getLocalComponent());
            enqueueWithoutFailure(new ScanResponse(scannedKey, message));

            CacheKey expected = installedStubs.peek();
            if (expected == null) {
                // No stub outstanding; the message is still cached via the request above.
                return;
            }
            if (expected.equals(scannedKey)) {
                installedStubs.poll();
                return;
            }

            logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: " + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expected.seqId + " topic: " + expected.topic.toStringUtf8() + " installedStubs: " + installedStubs);
            enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance);
        }

        /** Queues removal of all remaining stubs, forwarding the scan's exception. */
        @Override
        public void scanFailed(Object obj, Exception exc) {
            enqueueDeleteOfRemainingStubs(exc);
        }

        /**
         * Leaves remaining stubs in place when the scan simply ran out of messages;
         * for any other finish reason queues their removal with the shared read-ahead
         * exception.
         */
        @Override
        public void scanFinished(Object obj, ScanCallback.ReasonForFinish reasonForFinish) {
            if (reasonForFinish == ScanCallback.ReasonForFinish.NO_MORE_MESSAGES) {
                return;
            }
            enqueueDeleteOfRemainingStubs(readAheadExceptionInstance);
        }

        /** Drains {@code installedStubs}, queueing an {@code ExceptionOnCacheKey} for each key. */
        private void enqueueDeleteOfRemainingStubs(Exception exc) {
            for (CacheKey stub = installedStubs.poll(); stub != null; stub = installedStubs.poll()) {
                enqueueWithoutFailure(new ExceptionOnCacheKey(stub, exc));
            }
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$ScanRequestWrapper.class */
    /**
     * Request that serves a single-message scan from the cache, triggering read-ahead
     * from the real persistence manager when needed.
     */
    protected class ScanRequestWrapper implements CacheRequest {
        /** The caller's scan request. */
        ScanRequest request;

        /** @param scanRequest the scan request to serve */
        public ScanRequestWrapper(ScanRequest scanRequest) {
            request = scanRequest;
        }

        /**
         * Runs read-ahead for the request, registers the request's callback on the cache
         * entry for its start seq-id, and finally issues the range scan if one was
         * produced.
         */
        @Override
        public void performRequest() {
            // Must run first: it installs a stub for the start seq-id when none is cached,
            // which is why the lookup below is dereferenced without a null check.
            RangeScanRequest readAhead = doReadAhead(request);

            CacheKey startKey = new CacheKey(request.getTopic(), request.getStartSeqId());
            CacheValue entry = cache.get(startKey);
            entry.addCallback(request.getCallback(), request.getCtx());

            if (readAhead == null) {
                return;
            }
            realPersistenceManager.scanMessages(readAhead);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$ScanResponse.class */
    /** Request that inserts a message into the cache under a given key. */
    public class ScanResponse implements CacheRequest {
        /** Key to store the message under. */
        CacheKey cacheKey;
        /** Message to store. */
        PubSubProtocol.Message message;

        /**
         * @param cacheKey key to store the message under
         * @param message  message to store
         */
        public ScanResponse(CacheKey cacheKey, PubSubProtocol.Message message) {
            this.message = message;
            this.cacheKey = cacheKey;
        }

        /** Adds the message to the cache, stamped with the current wall-clock time in milliseconds. */
        @Override
        public void performRequest() {
            long now = System.currentTimeMillis();
            addMessageToCache(cacheKey, message, now);
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$ShutdownCacheRequest.class */
    /** Request that tells the request-processing loop in {@code run()} to stop. */
    protected class ShutdownCacheRequest implements CacheRequest {
        protected ShutdownCacheRequest() {
        }

        /** Clears the enclosing cache's {@code keepRunning} flag. */
        @Override
        public void performRequest() {
            keepRunning = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/persistence/ReadAheadCache$TreeSetLongFactory.class */
    /**
     * Factory producing empty {@code TreeSet<Long>} instances; passed to
     * {@code MapMethods.addToMultiMap} when the per-topic seq-id index is updated.
     */
    public static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
        /** Shared instance. */
        protected static TreeSetLongFactory instance = new TreeSetLongFactory();

        /** Not meant to be instantiated from outside; use {@link #instance}. */
        protected TreeSetLongFactory() {
        }

        /** @return a new, empty sorted set of seq-ids */
        @Override
        public SortedSet<Long> newInstance() {
            SortedSet<Long> fresh = new TreeSet<Long>();
            return fresh;
        }
    }

    /**
     * Creates a cache in front of the given persistence manager. The cache thread is
     * created by the field initializers but is not started here; call {@link #start()}.
     *
     * @param persistenceManagerWithRangeScan persistence manager that calls are forwarded to
     * @param serverConfiguration             configuration read for read-ahead and cache-size settings
     */
    public ReadAheadCache(PersistenceManagerWithRangeScan persistenceManagerWithRangeScan, ServerConfiguration serverConfiguration) {
        this.realPersistenceManager = persistenceManagerWithRangeScan;
        this.cfg = serverConfiguration;
    }

    /**
     * Starts the cache thread, which begins processing queued requests in {@link #run()}.
     *
     * @return this cache, so the call can be chained onto construction
     */
    public ReadAheadCache start() {
        this.cacheThread.start();
        return this;
    }

    /**
     * Forwards directly to the real persistence manager; the cache is not consulted.
     *
     * @return whatever the real persistence manager returns for the same arguments
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager
    public long getSeqIdAfterSkipping(ByteString byteString, long j, int i) {
        return this.realPersistenceManager.getSeqIdAfterSkipping(byteString, j, i);
    }

    /**
     * Forwards directly to the real persistence manager; the cache is not consulted.
     *
     * @param byteString topic to query
     * @return the seq-id reported by the real persistence manager
     * @throws PubSubException.ServerNotResponsibleForTopicException propagated from the real persistence manager
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager
    public PubSubProtocol.MessageSeqId getCurrentSeqIdForTopic(ByteString byteString) throws PubSubException.ServerNotResponsibleForTopicException {
        return this.realPersistenceManager.getCurrentSeqIdForTopic(byteString);
    }

    /**
     * Persists a message through the real persistence manager. The caller's request is
     * wrapped in a new request that carries the same topic and message but uses the
     * shared {@code PersistCallback} as callback and the original request as context,
     * so the outcome can be relayed and the persisted message cached.
     *
     * @param persistRequest the caller's persist request
     */
    @Override
    public void persistMessage(PersistRequest persistRequest) {
        PersistRequest interceptingRequest = new PersistRequest(
                persistRequest.getTopic(),
                persistRequest.getMessage(),
                persistCallbackInstance,
                persistRequest);
        realPersistenceManager.persistMessage(interceptingRequest);
    }

    /**
     * Adds a request to the request queue without blocking.
     *
     * @param cacheRequest request to enqueue
     * @throws UnexpectedError if the queue rejects the request
     */
    protected void enqueueWithoutFailure(CacheRequest cacheRequest) {
        boolean accepted = requestQueue.offer(cacheRequest);
        if (accepted) {
            return;
        }
        throw new UnexpectedError("Could not enqueue object: " + cacheRequest.toString() + " to cache request queue. Exiting.");
    }

    /**
     * Queues the scan request for processing on the cache thread; the request's callback
     * is registered on the matching cache entry when the queued request is performed.
     *
     * @param scanRequest scan request to serve
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager
    public void scanSingleMessage(ScanRequest scanRequest) {
        enqueueWithoutFailure(new ScanRequestWrapper(scanRequest));
    }

    /**
     * Queues a {@code DeliveredUntil} request, which removes cached messages of the
     * topic with seq-id up to and including {@code l} when it is performed.
     *
     * @param byteString topic
     * @param l          inclusive seq-id up to which cached messages may be dropped
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager
    public void deliveredUntil(ByteString byteString, Long l) {
        enqueueWithoutFailure(new DeliveredUntil(byteString, l));
    }

    /**
     * Forwards directly to the real persistence manager; the cache itself is not modified.
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager
    public void consumedUntil(ByteString byteString, Long l) {
        this.realPersistenceManager.consumedUntil(byteString, l);
    }

    /**
     * Request-processing loop of the cache thread: blocks for the next queued request
     * and performs it, until {@code keepRunning} is cleared. If the thread is
     * interrupted while waiting, the interrupt flag is restored and the loop exits.
     */
    @Override
    public void run() {
        while (keepRunning) {
            CacheRequest next;
            try {
                next = requestQueue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            next.performRequest();
        }
    }

    /**
     * Requests shutdown of the processing loop by queueing a {@code ShutdownCacheRequest}.
     * The request is placed behind any requests already queued; this method does not
     * interrupt or wait for the cache thread.
     */
    public void stop() {
        enqueueWithoutFailure(new ShutdownCacheRequest());
    }

    /**
     * Decides what, if anything, should be read ahead for a scan request.
     *
     * First tries to read ahead starting at the request's own seq-id, using the
     * configured read-ahead count (clamped to at least 1). If nothing was installed
     * there because the start key is already in the cache, tries again with half the
     * count, starting from the seq-id obtained by skipping half the count ahead.
     *
     * @param scanRequest the scan request being served
     * @return the range scan to issue, or {@code null} if no read-ahead is needed
     */
    protected RangeScanRequest doReadAhead(ScanRequest scanRequest) {
        final ByteString topic = scanRequest.getTopic();
        final long startSeqId = scanRequest.getStartSeqId();
        final int readAheadCount = Math.max(1, cfg.getReadAheadCount());

        RangeScanRequest readAhead = doReadAheadStartingFrom(topic, startSeqId, readAheadCount);
        if (readAhead != null) {
            return readAhead;
        }

        final int halfCount = readAheadCount / 2;
        final long laterSeqId = realPersistenceManager.getSeqIdAfterSkipping(topic, startSeqId, halfCount);
        return doReadAheadStartingFrom(topic, laterSeqId, halfCount);
    }

    /**
     * Installs stub cache entries for up to {@code i} consecutive seq-ids of a topic,
     * starting at {@code j}, and builds the range scan that should fill them.
     *
     * Installation stops at the first seq-id that already has a cache entry (stub or
     * message). The returned scan starts at the first installed stub's seq-id and covers
     * as many messages as stubs were installed; its callback receives the stubs in
     * installation order so it can match scanned messages against them.
     *
     * @param byteString topic to read ahead on
     * @param j          seq-id of the first message to read ahead
     * @param i          maximum number of messages to read ahead
     * @return the range scan to issue, or {@code null} if no stub was installed
     */
    protected RangeScanRequest doReadAheadStartingFrom(ByteString byteString, long j, int i) {
        // The scan must begin at the FIRST stub. The loop below advances its cursor past
        // every installed stub, so the starting point is remembered separately.
        final long startSeqId = j;
        Queue<CacheKey> installedStubs = new LinkedList<CacheKey>();

        long nextSeqId = startSeqId;
        int stubCount = 0;
        while (stubCount < i) {
            CacheKey cacheKey = new CacheKey(byteString, nextSeqId);
            if (this.cache.containsKey(cacheKey)) {
                // An entry (even a stub) already exists for this seq-id; stop here.
                break;
            }
            this.cache.put(cacheKey, new CacheValue());
            if (logger.isDebugEnabled()) {
                logger.debug("Adding stub for seq-id: " + nextSeqId + " topic: " + byteString.toStringUtf8());
            }
            installedStubs.add(cacheKey);
            nextSeqId = this.realPersistenceManager.getSeqIdAfterSkipping(byteString, nextSeqId, 1);
            stubCount++;
        }

        if (stubCount == 0) {
            return null;
        }
        return new RangeScanRequest(byteString, startSeqId, stubCount, this.cfg.getReadAheadSizeBytes(), new ReadAheadScanCallback(installedStubs, byteString), null);
    }

    /**
     * Stores a message in the cache under the given key, filling an existing stub or
     * creating a new entry, and updates the size counter and both indexes. Afterwards
     * evicts the oldest entries if the cache has grown past its configured maximum.
     *
     * If the key already maps to an entry that holds a message, the call is a no-op:
     * that entry has already been counted in {@code presentCacheSize} and recorded in
     * both indexes, and removal only undoes that bookkeeping once, so counting it again
     * would inflate the size counter permanently and leave a stale time-index entry.
     *
     * @param cacheKey key (topic, seq-id) to store the message under
     * @param message  message to store
     * @param j        time of addition, used as the key in the time index
     */
    protected void addMessageToCache(CacheKey cacheKey, PubSubProtocol.Message message, long j) {
        if (logger.isDebugEnabled()) {
            logger.debug("Adding msg (topic: " + cacheKey.getTopic().toStringUtf8() + ", seq-id: " + message.getMsgId().getLocalComponent() + ") to readahead cache");
        }

        CacheValue cacheValue = this.cache.get(cacheKey);
        if (cacheValue == null) {
            cacheValue = new CacheValue();
            this.cache.put(cacheKey, cacheValue);
        } else if (!cacheValue.isStub()) {
            // Duplicate delivery of an already-cached message (e.g. from both a persist
            // callback and a read-ahead scan); bookkeeping was done the first time.
            return;
        }

        this.presentCacheSize += message.getBody().size();
        MapMethods.addToMultiMap(this.timeIndexOfAddition, Long.valueOf(j), cacheKey, HashSetCacheKeyFactory.instance);
        MapMethods.addToMultiMap(this.orderedIndexOnSeqId, cacheKey.getTopic(), Long.valueOf(cacheKey.getSeqId()), TreeSetLongFactory.instance);

        // Fill the entry last; judging by its name this also fires callbacks waiting on the stub.
        cacheValue.setMessageAndInvokeCallbacks(message, j);
        collectOldCacheEntries();
    }

    /**
     * Removes the entry for a key from the cache, if present.
     *
     * A stub entry has the given exception reported through
     * {@code setErrorAndInvokeCallbacks} and nothing else is touched. For an entry that
     * holds a message, the message's body size is subtracted from
     * {@code presentCacheSize} and the indexes are updated as requested by the flags.
     *
     * @param cacheKey key of the entry to remove
     * @param exc      exception reported if the removed entry is a stub
     * @param z        whether to remove the key from the time index
     * @param z2       whether to remove the key from the per-topic seq-id index
     */
    protected void removeMessageFromCache(CacheKey cacheKey, Exception exc, boolean z, boolean z2) {
        final boolean maintainTimeIndex = z;
        final boolean maintainSeqIdIndex = z2;

        CacheValue removed = cache.remove(cacheKey);
        if (removed == null) {
            return;
        }

        if (!removed.isStub()) {
            presentCacheSize -= removed.getMessage().getBody().size();
            if (maintainSeqIdIndex) {
                MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), Long.valueOf(cacheKey.getSeqId()));
            }
            if (maintainTimeIndex) {
                MapMethods.removeFromMultiMap(timeIndexOfAddition, Long.valueOf(removed.getTimeOfAddition()), cacheKey);
            }
            return;
        }

        // Stub: only report the error; size and indexes are left untouched.
        removed.setErrorAndInvokeCallbacks(exc);
    }

    /**
     * Evicts cached messages, oldest time bucket first, until the cache is no larger
     * than the configured maximum size or the time index is empty.
     */
    protected void collectOldCacheEntries() {
        final long sizeLimit = cfg.getMaximumCacheSize();
        while (presentCacheSize > sizeLimit) {
            if (timeIndexOfAddition.isEmpty()) {
                break;
            }
            Long oldestTime = timeIndexOfAddition.firstKey();
            Set<CacheKey> oldestKeys = timeIndexOfAddition.get(oldestTime);
            Iterator<CacheKey> cursor = oldestKeys.iterator();
            while (cursor.hasNext()) {
                CacheKey victim = cursor.next();
                if (logger.isDebugEnabled()) {
                    logger.debug("Removing topic: " + victim.getTopic() + "seq-id: " + victim.getSeqId() + " from cache because its the oldest");
                }
                // Time-index maintenance is skipped per key: the whole bucket is dropped
                // after the loop, and the set being iterated is left unmodified meanwhile.
                removeMessageFromCache(victim, readAheadExceptionInstance, false, true);
            }
            timeIndexOfAddition.remove(oldestTime);
        }
    }
}
