package org.apache.rocketmq.tieredstore.core;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.GetMessageResultExt;
import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.file.FlatMessageFile;
import org.apache.rocketmq.tieredstore.index.IndexItem;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.class */
/**
 * {@link MessageStoreFetcher} implementation that reads messages from the tiered
 * store's flat files, optionally going through an in-memory read-ahead cache.
 *
 * <p>The cache is a Caffeine cache keyed by {@code topic@queueId@queueOffset}. It is
 * bounded by total buffer size (see {@link #initCache}) and expires entries after a
 * configurable period without access.
 *
 * <p>NOTE(review): this source was recovered by a decompiler. The body of
 * {@link #getMessageFromTieredStoreAsync} could not be recovered and still throws
 * {@link UnsupportedOperationException}; restore it from the original sources
 * before relying on any read path that reaches it.
 */
public class MessageStoreFetcherImpl implements MessageStoreFetcher {
    private static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);

    /** Cache key layout: {@code topic@queueId@queueOffset}. */
    protected static final String CACHE_KEY_FORMAT = "%s@%d@%d";

    /** Fraction of {@link #memoryMaxSize} above which the cache is bypassed for reads. */
    private static final double CACHE_BUSY_RATIO = 0.8d;

    /** A warning is logged every time a cached entry's access count hits a multiple of this. */
    private static final long HOT_OFFSET_LOG_INTERVAL = 1000L;

    /** Batch size fetched from the tiered store on a cache miss when the caller asked for one message. */
    private static final int SINGLE_MESSAGE_PREFETCH_COUNT = 32;

    private final String brokerName;
    private final MetadataStore metadataStore;
    private final MessageStoreConfig storeConfig;
    private final TieredMessageStore messageStore;
    private final IndexService indexService;
    private final FlatFileStore flatFileStore;

    /** Cache weight budget: JVM max memory multiplied by the configured read-ahead threshold rate. */
    private final long memoryMaxSize;
    private final Cache<String, SelectBufferResult> fetcherCache;

    /**
     * Switch lookup table for {@link TieredStoreErrorCode}, as emitted by the compiler
     * and recovered by the decompiler. It maps enum ordinals to case indices
     * ({@code ILLEGAL_PARAM -> 1}, {@code ILLEGAL_OFFSET -> 2}).
     *
     * <p>NOTE(review): nothing in the recovered source reads this table; presumably
     * the method whose body was not recovered does. Names are kept verbatim.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$tieredstore$exception$TieredStoreErrorCode = new int[TieredStoreErrorCode.values().length];

        static {
            // Each constant is looked up in its own try block so that one missing
            // constant does not prevent the remaining entries from being populated.
            try {
                $SwitchMap$org$apache$rocketmq$tieredstore$exception$TieredStoreErrorCode[TieredStoreErrorCode.ILLEGAL_PARAM.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: leave the slot at 0 (no matching case).
            }
            try {
                $SwitchMap$org$apache$rocketmq$tieredstore$exception$TieredStoreErrorCode[TieredStoreErrorCode.ILLEGAL_OFFSET.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: leave the slot at 0 (no matching case).
            }
        }
    }

    /**
     * Creates a fetcher that takes its configuration, flat file store and index
     * service from the given message store.
     *
     * @param tieredMessageStore the owning tiered message store
     */
    public MessageStoreFetcherImpl(TieredMessageStore tieredMessageStore) {
        this(tieredMessageStore, tieredMessageStore.getStoreConfig(), tieredMessageStore.getFlatFileStore(), tieredMessageStore.getIndexService());
    }

    /**
     * Creates a fetcher with explicitly supplied collaborators.
     *
     * @param tieredMessageStore the owning tiered message store
     * @param messageStoreConfig configuration (broker name, read-ahead cache settings)
     * @param flatFileStore      source of per-queue flat files and of the metadata store
     * @param indexService       index used by {@link #queryMessageAsync}
     */
    public MessageStoreFetcherImpl(TieredMessageStore tieredMessageStore, MessageStoreConfig messageStoreConfig, FlatFileStore flatFileStore, IndexService indexService) {
        this.storeConfig = messageStoreConfig;
        this.brokerName = messageStoreConfig.getBrokerName();
        this.flatFileStore = flatFileStore;
        this.messageStore = tieredMessageStore;
        this.indexService = indexService;
        this.metadataStore = flatFileStore.getMetadataStore();
        this.memoryMaxSize = (long) (Runtime.getRuntime().maxMemory() * messageStoreConfig.getReadAheadCacheSizeThresholdRate());
        // Must run after memoryMaxSize is assigned: initCache reads it.
        this.fetcherCache = initCache(messageStoreConfig);
        log.info("MessageStoreFetcher init success, brokerName={}", messageStoreConfig.getBrokerName());
    }

    /**
     * Builds the read-ahead cache: expire-after-access using the configured duration
     * (milliseconds), bounded by {@link #memoryMaxSize} where each entry weighs
     * {@link SelectBufferResult#getSize()}, with statistics recording enabled.
     */
    private Cache<String, SelectBufferResult> initCache(MessageStoreConfig messageStoreConfig) {
        return Caffeine.newBuilder()
            .scheduler(Scheduler.systemScheduler())
            .expireAfterAccess(messageStoreConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
            .maximumWeight(this.memoryMaxSize)
            // Explicit parameter types are required: the builder is still
            // Caffeine<Object, Object> at this point in the chain.
            .weigher((String key, SelectBufferResult buffer) -> buffer.getSize())
            .recordStats()
            .build();
    }

    /**
     * @return the underlying read-ahead cache
     */
    public Cache<String, SelectBufferResult> getFetcherCache() {
        return this.fetcherCache;
    }

    /** Builds the cache key for one message of a queue, see {@link #CACHE_KEY_FORMAT}. */
    private static String buildCacheKey(MessageQueue messageQueue, long queueOffset) {
        return String.format(CACHE_KEY_FORMAT, messageQueue.getTopic(), messageQueue.getQueueId(), queueOffset);
    }

    /**
     * Tells whether the cache has used more than {@link #CACHE_BUSY_RATIO} of its
     * weight budget.
     *
     * <p>The comparison uses the cache's accumulated weight, not its entry count:
     * {@link #memoryMaxSize} is a weight bound, so comparing it with
     * {@code estimatedSize()} (a number of entries) would mix units.
     *
     * <p>NOTE(review): {@code weightedSize()} may be costlier than
     * {@code estimatedSize()}; confirm it is acceptable on the read path.
     */
    private boolean isCacheBusy() {
        long usedWeight = this.fetcherCache.policy().eviction()
            .map(eviction -> eviction.weightedSize().orElse(0L))
            .orElse(0L);
        return usedWeight > this.memoryMaxSize * CACHE_BUSY_RATIO;
    }

    /**
     * Stores one message buffer in the cache.
     *
     * @param flatMessageFile    flat file whose message queue identifies the entry
     * @param j                  queue offset of the message
     * @param selectBufferResult buffer to cache
     */
    protected void putMessageToCache(FlatMessageFile flatMessageFile, long j, SelectBufferResult selectBufferResult) {
        this.fetcherCache.put(buildCacheKey(flatMessageFile.getMessageQueue(), j), selectBufferResult);
    }

    /**
     * Looks up a single message in the cache.
     *
     * @param flatMessageFile flat file whose message queue identifies the entry
     * @param j               queue offset of the message
     * @return a new result wrapping a read-only view of the cached buffer, or
     *         {@code null} when the offset is not cached
     */
    protected SelectBufferResult getMessageFromCache(FlatMessageFile flatMessageFile, long j) {
        MessageQueue messageQueue = flatMessageFile.getMessageQueue();
        SelectBufferResult cached = this.fetcherCache.getIfPresent(buildCacheKey(messageQueue, j));
        if (cached == null) {
            return null;
        }
        long accessCount = cached.getAccessCount().incrementAndGet();
        if (accessCount % HOT_OFFSET_LOG_INTERVAL == 0) {
            log.warn("MessageFetcher fetch same offset message too many times, topic={}, queueId={}, offset={}, count={}",
                messageQueue.getTopic(), messageQueue.getQueueId(), j, accessCount);
        }
        // The read-only view has its own position/limit, so callers cannot disturb
        // the cached buffer's state or contents.
        return new SelectBufferResult(cached.getByteBuffer().asReadOnlyBuffer(), cached.getStartOffset(), cached.getSize(), cached.getTagCode());
    }

    /**
     * Collects consecutive cached messages starting at a queue offset.
     *
     * <p>Scanning stops at the first offset missing from the cache, after
     * {@code readAheadMessageCountThreshold} offsets, once {@code i} messages were
     * collected, or once the collected bytes reach
     * {@code maxTransferBytesOnMessageInMemory}. Messages rejected by the filter are
     * skipped but still advance the next begin offset.
     *
     * @param flatMessageFile flat file of the queue being read
     * @param j               first queue offset to read
     * @param i               maximum number of messages to return
     * @param messageFilter   optional filter; {@code null} accepts every message
     * @return result with status {@code FOUND} when at least one message was
     *         collected, otherwise {@code NO_MATCHED_MESSAGE}
     */
    protected GetMessageResultExt getMessageFromCache(FlatMessageFile flatMessageFile, long j, int i, MessageFilter messageFilter) {
        GetMessageResultExt result = new GetMessageResultExt();
        long endOffsetExclusive = j + this.storeConfig.getReadAheadMessageCountThreshold();

        for (long offset = j; offset < endOffsetExclusive; offset++) {
            SelectBufferResult cached = getMessageFromCache(flatMessageFile, offset);
            if (cached == null) {
                // Cache gap: the caller must resume from this offset.
                result.setNextBeginOffset(offset);
                break;
            }
            result.setNextBeginOffset(offset + 1);

            boolean matched = messageFilter == null
                || (messageFilter.isMatchedByConsumeQueue(cached.getTagCode(), null)
                    && messageFilter.isMatchedByCommitLog(cached.getByteBuffer().slice(), null));
            if (!matched) {
                continue;
            }

            result.addMessageExt(
                new SelectMappedBufferResult(cached.getStartOffset(), cached.getByteBuffer(), cached.getSize(), null),
                offset, cached.getTagCode());
            if (result.getMessageCount() == i) {
                break;
            }
            if (result.getBufferTotalSize() >= this.messageStore.getMessageStoreConfig().getMaxTransferBytesOnMessageInMemory()) {
                break;
            }
        }

        result.setStatus(result.getMessageCount() > 0 ? GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE);
        result.setMinOffset(flatMessageFile.getConsumeQueueMinOffset());
        result.setMaxOffset(flatMessageFile.getConsumeQueueCommitOffset());
        return result;
    }

    /**
     * Reads a batch from the tiered store and puts every returned message in the cache.
     *
     * @param flatMessageFile flat file of the queue being read
     * @param j               first queue offset to fetch
     * @param i               batch size to fetch
     * @return future holding the last queue offset reported by the fetch, or
     *         {@code -1} when the fetch did not return status {@code FOUND}
     */
    protected CompletableFuture<Long> fetchMessageThenPutToCache(FlatMessageFile flatMessageFile, long j, int i) {
        MessageQueue messageQueue = flatMessageFile.getMessageQueue();
        return getMessageFromTieredStoreAsync(flatMessageFile, j, i).thenApply(fetched -> {
            GetMessageStatus status = fetched.getStatus();
            // Reading past the end is an expected outcome, so it is not logged.
            if (status == GetMessageStatus.OFFSET_OVERFLOW_ONE || status == GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
                return -1L;
            }
            if (status != GetMessageStatus.FOUND) {
                log.warn("MessageFetcher prefetch message then put to cache failed, result={}, topic={}, queue={}, queue offset={}, batch size={}",
                    status, messageQueue.getTopic(), messageQueue.getQueueId(), j, i);
                return -1L;
            }

            List<Long> offsetList = fetched.getMessageQueueOffset();
            List<Long> tagCodeList = fetched.getTagCodeList();
            List<SelectMappedBufferResult> bufferList = fetched.getMessageMapedList();
            for (int index = 0; index < offsetList.size(); index++) {
                SelectMappedBufferResult buffer = bufferList.get(index);
                // NOTE(review): entries are keyed by j + index, i.e. the fetched
                // batch is assumed to be contiguous from j — confirm against the fetch.
                putMessageToCache(flatMessageFile, j + index,
                    new SelectBufferResult(buffer.getByteBuffer(), buffer.getStartOffset(), buffer.getSize(), tagCodeList.get(index)));
            }
            return offsetList.get(offsetList.size() - 1);
        });
    }

    /**
     * Serves a read from the cache, filling the cache from the tiered store on a miss.
     *
     * <p>On a miss the prefetch batch is {@value #SINGLE_MESSAGE_PREFETCH_COUNT} when
     * exactly one message was requested, otherwise the configured
     * {@code readAheadMessageCountThreshold}; the cache is then read again.
     *
     * @param flatMessageFile flat file of the queue being read
     * @param str             consumer group (used for logging only)
     * @param j               first queue offset to read
     * @param i               maximum number of messages to return
     * @param messageFilter   optional filter; {@code null} accepts every message
     * @return future holding the result read from the cache
     */
    public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(FlatMessageFile flatMessageFile, String str, long j, int i, MessageFilter messageFilter) {
        MessageQueue messageQueue = flatMessageFile.getMessageQueue();
        GetMessageResultExt cachedResult = getMessageFromCache(flatMessageFile, j, i, messageFilter);

        if (GetMessageStatus.FOUND.equals(cachedResult.getStatus())) {
            log.debug("MessageFetcher cache hit, group={}, topic={}, queueId={}, offset={}, maxCount={}, resultSize={}, lag={}",
                str, messageQueue.getTopic(), messageQueue.getQueueId(), j, i,
                cachedResult.getMessageCount(), cachedResult.getMaxOffset() - cachedResult.getNextBeginOffset());
            return CompletableFuture.completedFuture(cachedResult);
        }

        log.debug("MessageFetcher cache miss, group={}, topic={}, queueId={}, offset={}, maxCount={}, lag={}",
            str, messageQueue.getTopic(), messageQueue.getQueueId(), j, i,
            cachedResult.getMaxOffset() - cachedResult.getNextBeginOffset());

        int prefetchCount = i == 1 ? SINGLE_MESSAGE_PREFETCH_COUNT : this.storeConfig.getReadAheadMessageCountThreshold();
        return fetchMessageThenPutToCache(flatMessageFile, j, prefetchCount)
            .thenApply(lastOffset -> getMessageFromCache(flatMessageFile, j, i, messageFilter));
    }

    /**
     * Reads a batch of messages directly from the tiered store.
     *
     * <p>NOTE(review): the decompiler failed to recover this method's body, so it
     * unconditionally throws. Its notes show one lost branch that set status
     * {@code OFFSET_FOUND_NULL} with the next begin offset equal to {@code r10} and
     * returned an already-completed future. Restore the real implementation from the
     * original sources.
     *
     * @param r9  flat file of the queue being read
     * @param r10 first queue offset to read
     * @param r12 batch size
     * @return never returns normally in this recovered form
     * @throws UnsupportedOperationException always
     */
    public CompletableFuture<GetMessageResultExt> getMessageFromTieredStoreAsync(FlatMessageFile r9, long r10, int r12) {
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.tieredstore.core.MessageStoreFetcherImpl.getMessageFromTieredStoreAsync(org.apache.rocketmq.tieredstore.file.FlatMessageFile, long, int):java.util.concurrent.CompletableFuture");
    }

    /**
     * Reads messages of one queue starting at a queue offset.
     *
     * <p>Offsets outside the queue's range complete immediately with the matching
     * status ({@code NO_MATCHED_LOGIC_QUEUE}, {@code NO_MESSAGE_IN_QUEUE},
     * {@code OFFSET_TOO_SMALL}, {@code OFFSET_OVERFLOW_ONE},
     * {@code OFFSET_OVERFLOW_BADLY}). Otherwise the read goes through the read-ahead
     * cache when it is enabled and not busy, and straight to the tiered store
     * (followed by filtering) when it is not.
     *
     * @param str           consumer group
     * @param str2          topic
     * @param i             queue id
     * @param j             first queue offset to read
     * @param i2            maximum number of messages to return
     * @param messageFilter optional message filter
     * @return future holding the read result
     */
    @Override // org.apache.rocketmq.tieredstore.core.MessageStoreFetcher
    public CompletableFuture<GetMessageResult> getMessageAsync(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        GetMessageResult result = new GetMessageResult();
        FlatMessageFile flatFile = this.flatFileStore.getFlatFile(new MessageQueue(str2, this.brokerName, i));
        if (flatFile == null) {
            result.setNextBeginOffset(j);
            result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
            return CompletableFuture.completedFuture(result);
        }

        result.setMinOffset(flatFile.getConsumeQueueMinOffset());
        result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());

        if (result.getMaxOffset() <= 0) {
            result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
            result.setNextBeginOffset(j);
            return CompletableFuture.completedFuture(result);
        }
        if (j < result.getMinOffset()) {
            result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
            result.setNextBeginOffset(result.getMinOffset());
            return CompletableFuture.completedFuture(result);
        }
        if (j == result.getMaxOffset()) {
            result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
            result.setNextBeginOffset(result.getMaxOffset());
            return CompletableFuture.completedFuture(result);
        }
        if (j > result.getMaxOffset()) {
            result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
            result.setNextBeginOffset(result.getMaxOffset());
            return CompletableFuture.completedFuture(result);
        }

        // isCacheBusy() is only evaluated when the read-ahead cache is enabled.
        if (this.storeConfig.isReadAheadCacheEnable() && !isCacheBusy()) {
            return getMessageFromCacheAsync(flatFile, str, j, i2, messageFilter);
        }
        return getMessageFromTieredStoreAsync(flatFile, j, i2)
            .thenApply(fetched -> fetched.doFilterMessage(messageFilter));
    }

    /**
     * Returns the minimum store timestamp of a queue's flat file.
     *
     * @param str topic
     * @param i   queue id
     * @return completed future holding the timestamp, or {@code -1} when the queue
     *         has no flat file
     */
    @Override // org.apache.rocketmq.tieredstore.core.MessageStoreFetcher
    public CompletableFuture<Long> getEarliestMessageTimeAsync(String str, int i) {
        FlatMessageFile flatFile = this.flatFileStore.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null) {
            return CompletableFuture.completedFuture(-1L);
        }
        return CompletableFuture.completedFuture(flatFile.getMinStoreTimestamp());
    }

    /**
     * Returns the store timestamp of the message at a queue offset.
     *
     * <p>Reads the consume queue item, then the commit log entry it points to (on the
     * store's buffer fetch executor), and decodes the timestamp from that entry.
     *
     * @param str topic
     * @param i   queue id
     * @param j   queue offset
     * @return future holding the timestamp, or {@code -1} when the queue has no flat
     *         file or when reading/decoding fails (the failure is logged)
     */
    @Override // org.apache.rocketmq.tieredstore.core.MessageStoreFetcher
    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String str, int i, long j) {
        FlatMessageFile flatFile = this.flatFileStore.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null) {
            return CompletableFuture.completedFuture(-1L);
        }
        Executor fetchExecutor = this.messageStore.getStoreExecutor().bufferFetchExecutor;
        return flatFile.getConsumeQueueAsync(j)
            .thenComposeAsync(cqItem -> flatFile.getCommitLogAsync(
                MessageFormatUtil.getCommitLogOffsetFromItem(cqItem), MessageFormatUtil.getSizeFromItem(cqItem)), fetchExecutor)
            .thenApply(MessageFormatUtil::getStoreTimeStamp)
            .exceptionally(th -> {
                // The trailing throwable is logged as the exception, not as a placeholder value.
                log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: get or decode message failed, topic={}, queue={}, offset={}",
                    str, i, j, th);
                return -1L;
            });
    }

    /**
     * Resolves a timestamp to a queue offset, blocking until the lookup completes.
     *
     * @param str          topic
     * @param i            queue id
     * @param j            timestamp to look up
     * @param boundaryType boundary selection passed through to the flat file lookup
     * @return the resolved queue offset, or {@code -1} when the queue has no flat file
     */
    @Override // org.apache.rocketmq.tieredstore.core.MessageStoreFetcher
    public long getOffsetInQueueByTime(String str, int i, long j, BoundaryType boundaryType) {
        FlatMessageFile flatFile = this.flatFileStore.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null) {
            return -1L;
        }
        return flatFile.getQueueOffsetByTimeAsync(j, boundaryType).join();
    }

    /**
     * Queries messages of a topic by key within a time range.
     *
     * <p>Index items whose topic id differs from the topic's metadata, or whose queue
     * has no flat file, are skipped. At most {@code i} commit log reads are issued.
     *
     * @param str  topic
     * @param str2 message key
     * @param i    maximum number of messages to return
     * @param j    begin of the time range
     * @param j2   end of the time range
     * @return future holding the matching messages; an empty result when the topic
     *         metadata is missing or when setting up the query throws. A failure
     *         inside the asynchronous pipeline is logged and completes the future
     *         exceptionally.
     */
    @Override // org.apache.rocketmq.tieredstore.core.MessageStoreFetcher
    public CompletableFuture<QueryMessageResult> queryMessageAsync(String str, String str2, int i, long j, long j2) {
        try {
            TopicMetadata topicMetadata = this.metadataStore.getTopic(str);
            if (topicMetadata == null) {
                log.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic={}", str);
                return CompletableFuture.completedFuture(new QueryMessageResult());
            }
            long topicId = topicMetadata.getTopicId();

            return this.indexService.queryAsync(str, str2, i, j, j2)
                .thenCompose(indexItems -> {
                    List<CompletableFuture<SelectMappedBufferResult>> bufferFutures = new ArrayList<>(i);
                    for (IndexItem indexItem : indexItems) {
                        if (topicId != indexItem.getTopicId()) {
                            continue;
                        }
                        FlatMessageFile flatFile = this.flatFileStore.getFlatFile(
                            new MessageQueue(str, this.brokerName, indexItem.getQueueId()));
                        if (flatFile == null) {
                            continue;
                        }
                        bufferFutures.add(flatFile.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
                            .thenApply(byteBuffer -> new SelectMappedBufferResult(
                                indexItem.getOffset(), byteBuffer, indexItem.getSize(), null)));
                        if (bufferFutures.size() >= i) {
                            break;
                        }
                    }
                    return CompletableFuture.allOf(bufferFutures.toArray(new CompletableFuture[0]))
                        .thenApply(ignored -> {
                            QueryMessageResult queryResult = new QueryMessageResult();
                            // allOf completed normally, so every future holds a value
                            // and join() returns immediately.
                            bufferFutures.forEach(future -> queryResult.addMessage(future.join()));
                            return queryResult;
                        });
                })
                .whenComplete((queryResult, th) -> {
                    if (th != null) {
                        // Failures used to pass through silently; the future still
                        // completes exceptionally for the caller.
                        log.error("MessageFetcher#queryMessageAsync, query failed, topic={}, topicId={}, key={}, maxCount={}, timestamp={}-{}",
                            str, topicId, str2, i, j, j2, th);
                    } else if (queryResult != null) {
                        log.info("MessageFetcher#queryMessageAsync, query result={}, topic={}, topicId={}, key={}, maxCount={}, timestamp={}-{}",
                            queryResult.getMessageBufferList().size(), str, topicId, str2, i, j, j2);
                    }
                });
        } catch (Exception e) {
            log.error("MessageFetcher#queryMessageAsync, get topic id failed, topic={}", str, e);
            return CompletableFuture.completedFuture(new QueryMessageResult());
        }
    }
}
