package org.apache.rocketmq.mqtt.cs.session.loop;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
/* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.class */
public class QueueCache {
    private static final Logger logger = LoggerFactory.getLogger(QueueCache.class);

    /** Configuration source; this class reads the pull batch size and the per-queue cache size from it. */
    @Resource
    private ConnectConf connectConf;

    /** Store that messages are pulled from, both to fill the cache and when the cache is bypassed. */
    @Resource
    private LmqQueueStore lmqQueueStore;

    /** Single-threaded scheduler that runs the periodic cache-loading task registered in {@code init()}. */
    private ScheduledThreadPoolExecutor loadCacheService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("loadCache_"));

    /** Source of monotonically increasing ids for {@link QueueEvent}s. */
    private AtomicLong rid = new AtomicLong();

    /** Latest pending load request per queue; an entry is removed once the load it triggered has finished. */
    private Map<Queue, QueueEvent> loadEvent = new ConcurrentHashMap<>();

    /** Queues that currently have a load in flight (value is always {@code true} while present). */
    private Map<Queue, Boolean> loadStatus = new ConcurrentHashMap<>();

    /** Per-queue message cache: at most 1000 queues, each dropped 10 minutes after its last access. */
    private Cache<Queue, CacheEntry> cache = Caffeine.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/loop/QueueCache$CacheEntry.class */
    /**
     * Cached messages of one queue together with the state of the load that fills them.
     */
    public class CacheEntry {
        /** {@code true} while a load for this entry is in flight. */
        private final AtomicBoolean loading = new AtomicBoolean(false);

        /**
         * Cached messages. The loader mutates this list while holding the monitor of this entry,
         * so readers that iterate it should hold the same monitor.
         */
        private final List<Message> messageList = new ArrayList<>();

        /** Wall-clock time (ms) at which the current or most recent load started. */
        private volatile long startLoadingT = System.currentTimeMillis();

        CacheEntry() {
        }

        /**
         * Marks the entry as loading. The start timestamp is only refreshed when this call
         * actually flips the flag, so a repeated call during a running load keeps the original time.
         */
        public void startLoad() {
            if (this.loading.compareAndSet(false, true)) {
                this.startLoadingT = System.currentTimeMillis();
            }
        }

        /** Clears the loading flag; a no-op when no load is marked as running. */
        public void endLoad() {
            this.loading.compareAndSet(true, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/mqtt/cs/session/loop/QueueCache$QueueEvent.class */
    /**
     * A request to (re)load the cache of a queue. Identity is the numeric id alone:
     * two events are equal exactly when their ids match, regardless of the session.
     */
    public class QueueEvent {
        long id;
        Session session;

        public QueueEvent(long j, Session session) {
            this.id = j;
            this.session = session;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            QueueEvent other = (QueueEvent) obj;
            return other.id == this.id;
        }

        @Override
        public int hashCode() {
            // The id is boxed to a Long by the varargs call, yielding the same value as before.
            return Objects.hash(this.id);
        }
    }

    /**
     * Registers the periodic loader: every 10 ms (fixed delay) it walks all pending load events
     * and starts a load for each queue that does not already have one in flight.
     *
     * <p>Each queue is handled inside its own try/catch. An exception that escapes a task passed to
     * {@code scheduleWithFixedDelay} suppresses every later execution, so without the guard one
     * failing queue would silently stop cache loading for all queues.
     */
    @PostConstruct
    public void init() {
        this.loadCacheService.scheduleWithFixedDelay(() -> {
            for (Map.Entry<Queue, QueueEvent> pending : this.loadEvent.entrySet()) {
                Queue queue = pending.getKey();
                try {
                    loadPending(queue, pending.getValue());
                } catch (Exception e) {
                    // Log and move on to the next queue; the event stays registered.
                    logger.error("schedule loadCache failed {}", queue.getQueueName(), e);
                }
            }
        }, 10L, 10L, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts one load for {@code queue} unless a load is already in flight.
     * An empty cache is seeded with the last message of the queue; a non-empty cache is extended
     * with the next batch after the newest cached offset.
     */
    private void loadPending(Queue queue, QueueEvent event) {
        if (Boolean.TRUE.equals(this.loadStatus.get(queue))) {
            return;
        }
        // Atomic get-or-create, so an entry created concurrently elsewhere is never overwritten.
        CacheEntry entry = this.cache.get(queue, key -> new CacheEntry());
        if (CollectionUtils.isEmpty(entry.messageList)) {
            loadCache(true, queue.toFirstTopic(), queue, null, 1, event);
            return;
        }
        Message newest = entry.messageList.get(entry.messageList.size() - 1);
        QueueOffset next = new QueueOffset();
        next.setOffset(newest.getOffset() + 1);
        loadCache(false, queue.toFirstTopic(), queue, next, this.connectConf.getPullBatchSize(), event);
    }

    /**
     * Requests a cache refresh for the queue in {@code pair} on behalf of its session.
     * Nothing happens when the queue is {@code null}, a p2p queue or a retry queue.
     *
     * @param pair the queue (left) and the session asking for it (right)
     */
    public void refreshCache(Pair<Queue, Session> pair) {
        Queue target = pair.getLeft();
        boolean cacheable = target != null && !target.isP2p() && !target.isRetry();
        if (cacheable) {
            addLoadEvent(target, pair.getRight());
        }
    }

    /**
     * Registers a fresh load event for {@code queue} (replacing any previous one) and makes sure
     * a cache entry exists for it.
     *
     * @param queue   queue whose cache should be loaded
     * @param session session on whose behalf the load is requested
     */
    private void addLoadEvent(Queue queue, Session session) {
        this.loadEvent.put(queue, new QueueEvent(this.rid.incrementAndGet(), session));
        // Atomic get-or-create: a separate getIfPresent + put could overwrite an entry that
        // another thread created in between, orphaning the entry a running load is filling.
        this.cache.get(queue, key -> new CacheEntry());
    }

    /**
     * Forwards the outcome of {@code completableFuture} to {@code completableFuture2}:
     * a failure completes the target exceptionally, anything else completes it with the result.
     */
    private void callbackResult(CompletableFuture<PullResult> completableFuture, CompletableFuture<PullResult> completableFuture2) {
        completableFuture.whenComplete((result, failure) -> {
            if (failure == null) {
                completableFuture2.complete(result);
                return;
            }
            completableFuture2.completeExceptionally(failure);
        });
    }

    /**
     * Resolves the topic to pull from for {@code subscription}.
     * Retry subscriptions use the client retry topic. P2p subscriptions use the client p2p topic
     * when one is configured (non-blank) and the client retry topic otherwise; the p2p rule wins
     * when both flags are set. Every other subscription uses its own first topic.
     */
    private String toFirstTopic(Subscription subscription) {
        String topic = subscription.toFirstTopic();
        if (subscription.isRetry()) {
            topic = this.lmqQueueStore.getClientRetryTopic();
        }
        if (!subscription.isP2p()) {
            return topic;
        }
        if (StringUtils.isNotBlank(this.lmqQueueStore.getClientP2pTopic())) {
            return this.lmqQueueStore.getClientP2pTopic();
        }
        return this.lmqQueueStore.getClientRetryTopic();
    }

    /**
     * Serves a pull request from the queue cache when possible and from the store otherwise.
     *
     * <p>Decision order:
     * <ol>
     *   <li>p2p and retry subscriptions always go to the store;</li>
     *   <li>so does a queue without a cache entry;</li>
     *   <li>while a load is running the caller is told to come back later, unless the load has been
     *       running for more than one second, in which case the store is used;</li>
     *   <li>an empty cache, or a cache holding nothing at or after the requested offset, yields
     *       {@code LATER} when a load is pending and a store pull otherwise;</li>
     *   <li>an offset older than the first cached message goes to the store;</li>
     *   <li>otherwise up to {@code i} cached messages at or after the offset complete the future.</li>
     * </ol>
     *
     * @param session           requesting session (not used by this method)
     * @param subscription      subscription being served; selects the topic and the cache bypass
     * @param queue             queue to read
     * @param queueOffset       offset to start reading from
     * @param i                 maximum number of messages to return
     * @param completableFuture completed with the pulled messages, or exceptionally on a store failure
     * @return {@code DONE} when the request was handed to the store or answered from the cache with
     *         no load pending; {@code LATER} when a load is running or pending. A cache hit with a
     *         load pending completes the future and still returns {@code LATER}.
     */
    public PullResultStatus pullMessage(Session session, Subscription subscription, Queue queue, QueueOffset queueOffset, int i, CompletableFuture<PullResult> completableFuture) {
        if (subscription.isP2p() || subscription.isRetry()) {
            return pullFromStore("NotPullCache", subscription, queue, queueOffset, i, completableFuture);
        }
        CacheEntry cacheEntry = this.cache.getIfPresent(queue);
        if (cacheEntry == null) {
            // Reported as "NoPullCache" on both channels; the metric used to be mislabelled
            // "NotPullCache", which merged cache misses with the p2p/retry bypass above.
            return pullFromStore("NoPullCache", subscription, queue, queueOffset, i, completableFuture);
        }
        if (cacheEntry.loading.get()) {
            if (System.currentTimeMillis() - cacheEntry.startLoadingT <= 1000) {
                return PullResultStatus.LATER;
            }
            return pullFromStore("LoadPullCacheTimeout", subscription, queue, queueOffset, i, completableFuture);
        }

        // Inspect the list only while holding the entry monitor: the loader appends, sorts and
        // trims the same ArrayList under that monitor.
        boolean cacheEmpty;
        boolean beforeCache = false;
        List<Message> hits = new ArrayList<>();
        synchronized (cacheEntry) {
            List<Message> cached = cacheEntry.messageList;
            cacheEmpty = cached.isEmpty();
            if (!cacheEmpty) {
                beforeCache = queueOffset.getOffset() < cached.get(0).getOffset();
            }
            if (!cacheEmpty && !beforeCache) {
                for (Message message : cached) {
                    if (message.getOffset() >= queueOffset.getOffset()) {
                        hits.add(message);
                    }
                    if (hits.size() >= i) {
                        break;
                    }
                }
            }
        }

        if (cacheEmpty) {
            if (this.loadEvent.get(queue) != null) {
                recordPullStatus("EmptyPullCacheLATER");
                return PullResultStatus.LATER;
            }
            return pullFromStore("EmptyPullCache", subscription, queue, queueOffset, i, completableFuture);
        }
        if (beforeCache) {
            return pullFromStore("OutPullCache", subscription, queue, queueOffset, i, completableFuture);
        }
        if (!hits.isEmpty()) {
            PullResult fromCache = new PullResult();
            fromCache.setMessageList(hits);
            completableFuture.complete(fromCache);
            recordPullStatus("PullFromCache");
            return this.loadEvent.get(queue) != null ? PullResultStatus.LATER : PullResultStatus.DONE;
        }
        if (this.loadEvent.get(queue) != null) {
            recordPullStatus("PullCacheLATER");
            return PullResultStatus.LATER;
        }
        return pullFromStore("OutPullCache2", subscription, queue, queueOffset, i, completableFuture);
    }

    /** Records {@code status} once in the local statistics and once in the metrics collector. */
    private void recordPullStatus(String status) {
        StatUtil.addPv(status, 1L);
        collectorPullCacheStatus(status);
    }

    /** Records {@code status}, pulls straight from the store and pipes the outcome into {@code target}. */
    private PullResultStatus pullFromStore(String status, Subscription subscription, Queue queue, QueueOffset queueOffset, int count, CompletableFuture<PullResult> target) {
        recordPullStatus(status);
        callbackResult(this.lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count), target);
        return PullResultStatus.DONE;
    }

    /**
     * Reports one occurrence of the given pull-cache status to the metrics collector.
     * Best effort: any failure of the collector is logged and never propagated.
     */
    private void collectorPullCacheStatus(String str) {
        try {
            String[] labels = {str};
            MqttMetricsCollector.collectPullCacheStatusTps(1L, labels);
        } catch (Throwable failure) {
            logger.error("", failure);
        }
    }

    /**
     * Starts an asynchronous load of messages into the cache entry of {@code queue}.
     *
     * <p>The queue is marked as in flight and its entry as loading before the pull is issued; both
     * marks, and the triggering event, are cleared when the pull completes. A new load event is
     * registered, so the scheduler loads again, when the pull fails, when processing the result
     * throws, or when an incremental pull returned a full batch.
     *
     * @param first       {@code true} to seed the cache with the last messages of the queue,
     *                    {@code false} to pull forward from {@code queueOffset}
     * @param firstTopic  topic to pull from
     * @param queue       queue being cached
     * @param queueOffset start offset for an incremental pull; unused when {@code first} is set
     * @param count       number of messages to request
     * @param queueEvent  event that triggered this load
     */
    private void loadCache(boolean first, String firstTopic, Queue queue, QueueOffset queueOffset, int count, QueueEvent queueEvent) {
        this.loadStatus.put(queue, true);
        // Atomic get-or-create, so an entry created concurrently elsewhere is never overwritten.
        CacheEntry entry = this.cache.get(queue, key -> new CacheEntry());
        entry.startLoad();
        (first ? this.lmqQueueStore.pullLastMessages(firstTopic, queue, count) : this.lmqQueueStore.pullMessage(firstTopic, queue, queueOffset, count)).whenComplete((pullResult, error) -> {
            if (error != null) {
                logger.error("", error);
                // Clear the in-flight state first, then re-queue, so the retry is not undone by the cleanup.
                finishLoad(queue, queueEvent, entry);
                addLoadEvent(queue, queueEvent.session);
                return;
            }
            try {
                if (pullResult == null || CollectionUtils.isEmpty(pullResult.getMessageList())) {
                    return;
                }
                synchronized (entry) {
                    entry.messageList.addAll(pullResult.getMessageList());
                    if (first) {
                        entry.messageList.sort(Comparator.comparingLong(Message::getOffset));
                    }
                    // Drop the oldest messages in one step once the configured cache size is exceeded.
                    int overflow = entry.messageList.size() - this.connectConf.getQueueCacheSize();
                    if (overflow > 0) {
                        entry.messageList.subList(0, overflow).clear();
                    }
                }
                if (!first && pullResult.getMessageList().size() >= count) {
                    // A full batch suggests more messages are waiting. The new event has a new id,
                    // so removing the old event in finishLoad leaves it in place.
                    addLoadEvent(queue, queueEvent.session);
                }
            } catch (Exception e) {
                logger.error("loadCache failed {}", queue.getQueueName(), e);
                addLoadEvent(queue, queueEvent.session);
            } finally {
                finishLoad(queue, queueEvent, entry);
            }
        });
    }

    /** Clears the bookkeeping of a finished load: the triggering event, the in-flight mark and the loading flag. */
    private void finishLoad(Queue queue, QueueEvent queueEvent, CacheEntry entry) {
        this.loadEvent.remove(queue, queueEvent);
        this.loadStatus.remove(queue);
        entry.endLoad();
    }
}
