package org.apache.rocketmq.tieredstore.core;

import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
import org.apache.rocketmq.tieredstore.file.FlatFileInterface;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.class */
public class MessageStoreDispatcherImpl extends ServiceThread implements MessageStoreDispatcher {
    protected static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
    protected final String brokerName;
    protected final MessageStore defaultStore;
    protected final MessageStoreConfig storeConfig;
    protected final TieredMessageStore messageStore;
    protected final FlatFileStore flatFileStore;
    protected final MessageStoreExecutor storeExecutor;
    protected final MessageStoreFilter topicFilter;
    protected final Semaphore semaphore;
    protected final IndexService indexService;
    protected final Map<FlatFileInterface, GroupCommitContext> failedGroupCommitMap = new ConcurrentHashMap();

    public MessageStoreDispatcherImpl(TieredMessageStore tieredMessageStore) {
        this.messageStore = tieredMessageStore;
        this.storeConfig = tieredMessageStore.getStoreConfig();
        this.defaultStore = tieredMessageStore.getDefaultStore();
        this.brokerName = this.storeConfig.getBrokerName();
        this.semaphore = new Semaphore(this.storeConfig.getTieredStoreMaxPendingLimit() / 4);
        this.topicFilter = tieredMessageStore.getTopicFilter();
        this.flatFileStore = tieredMessageStore.getFlatFileStore();
        this.storeExecutor = tieredMessageStore.getStoreExecutor();
        this.indexService = tieredMessageStore.getIndexService();
    }

    public String getServiceName() {
        return MessageStoreDispatcher.class.getSimpleName();
    }

    @VisibleForTesting
    public Map<FlatFileInterface, GroupCommitContext> getFailedGroupCommitMap() {
        return this.failedGroupCommitMap;
    }

    public void dispatchWithSemaphore(FlatFileInterface flatFileInterface) {
        try {
            if (this.stopped) {
                return;
            }
            this.semaphore.acquire();
            doScheduleDispatch(flatFileInterface, false).whenComplete((bool, th) -> {
                this.semaphore.release();
            });
        } catch (Throwable th2) {
            this.semaphore.release();
            log.error("MessageStore dispatch error, topic={}, queueId={}", new Object[]{flatFileInterface.getMessageQueue().getTopic(), Integer.valueOf(flatFileInterface.getMessageQueue().getQueueId()), th2});
        }
    }

    public void dispatch(DispatchRequest dispatchRequest) {
        if (this.stopped) {
            return;
        }
        if (this.topicFilter == null || !this.topicFilter.filterTopic(dispatchRequest.getTopic())) {
            this.flatFileStore.computeIfAbsent(new MessageQueue(dispatchRequest.getTopic(), this.brokerName, dispatchRequest.getQueueId()));
        }
    }

    @Override // org.apache.rocketmq.tieredstore.core.MessageStoreDispatcher
    public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFileInterface, boolean z) {
        if (this.stopped) {
            return CompletableFuture.completedFuture(true);
        }
        String topic = flatFileInterface.getMessageQueue().getTopic();
        int queueId = flatFileInterface.getMessageQueue().getQueueId();
        boolean z2 = !this.storeConfig.isTieredStoreGroupCommit() || z;
        if (z2) {
            flatFileInterface.getFileLock().lock();
        } else if (!flatFileInterface.getFileLock().tryLock()) {
            return CompletableFuture.completedFuture(false);
        }
        try {
            try {
                if (this.topicFilter != null && this.topicFilter.filterTopic(flatFileInterface.getMessageQueue().getTopic())) {
                    this.flatFileStore.destroyFile(flatFileInterface.getMessageQueue());
                    CompletableFuture<Boolean> completedFuture = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture;
                }
                long consumeQueueMaxOffset = flatFileInterface.getConsumeQueueMaxOffset();
                long consumeQueueCommitOffset = flatFileInterface.getConsumeQueueCommitOffset();
                long minOffsetInQueue = this.defaultStore.getMinOffsetInQueue(topic, queueId);
                long maxOffsetInQueue = this.defaultStore.getMaxOffsetInQueue(topic, queueId);
                if (!flatFileInterface.isFlatFileInit()) {
                    long min = Math.min(Math.max(this.defaultStore.getOffsetInQueueByTime(topic, queueId, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(2L)), minOffsetInQueue), maxOffsetInQueue);
                    flatFileInterface.initOffset(min);
                    log.warn("MessageDispatcher#dispatch init, topic={}, queueId={}, offset={}-{}, current={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(min)});
                    CompletableFuture<Boolean> completedFuture2 = CompletableFuture.completedFuture(true);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture2;
                }
                if (consumeQueueCommitOffset < consumeQueueMaxOffset) {
                    commitAsync(flatFileInterface).whenComplete((bool, th) -> {
                        if (th != null) {
                            log.error("MessageDispatcher#flatFile commitOffset less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", new Object[]{topic, Integer.valueOf(queueId), th});
                        }
                    });
                    CompletableFuture<Boolean> completedFuture3 = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture3;
                }
                if (this.failedGroupCommitMap.containsKey(flatFileInterface)) {
                    GroupCommitContext groupCommitContext = this.failedGroupCommitMap.get(flatFileInterface);
                    if (groupCommitContext.getEndOffset() <= consumeQueueCommitOffset) {
                        this.failedGroupCommitMap.remove(flatFileInterface);
                        constructIndexFile(flatFileInterface.getTopicId(), groupCommitContext);
                    }
                }
                if (consumeQueueMaxOffset < minOffsetInQueue) {
                    log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset)});
                    this.flatFileStore.destroyFile(flatFileInterface.getMessageQueue());
                    this.flatFileStore.computeIfAbsent(new MessageQueue(topic, this.brokerName, queueId));
                    CompletableFuture<Boolean> completedFuture4 = CompletableFuture.completedFuture(true);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture4;
                }
                if (consumeQueueMaxOffset > maxOffsetInQueue) {
                    log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset)});
                    CompletableFuture<Boolean> completedFuture5 = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture5;
                }
                if (flatFileInterface.rollingFile(TimeUnit.HOURS.toMillis(this.storeConfig.getCommitLogRollingInterval()))) {
                    log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset)});
                }
                if (consumeQueueMaxOffset == maxOffsetInQueue) {
                    CompletableFuture<Boolean> completedFuture6 = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture6;
                }
                long j = 0;
                long tieredStoreGroupCommitSize = this.storeConfig.getTieredStoreGroupCommitSize();
                long min2 = Math.min(consumeQueueMaxOffset + this.storeConfig.getTieredStoreGroupCommitCount(), maxOffsetInQueue);
                ConsumeQueueInterface consumeQueue = this.defaultStore.getConsumeQueue(topic, queueId);
                CqUnit cqUnit = consumeQueue.get(consumeQueueMaxOffset);
                if (cqUnit == null) {
                    log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset), Long.valueOf(maxOffsetInQueue - consumeQueueMaxOffset)});
                    CompletableFuture<Boolean> completedFuture7 = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture7;
                }
                SelectMappedBufferResult selectOneMessageByOffset = this.defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
                if (selectOneMessageByOffset == null) {
                    log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset), Long.valueOf(maxOffsetInQueue - consumeQueueMaxOffset)});
                    CompletableFuture<Boolean> completedFuture8 = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture8;
                }
                boolean z3 = MessageFormatUtil.getStoreTimeStamp(selectOneMessageByOffset.getByteBuffer()) + ((long) this.storeConfig.getTieredStoreGroupCommitTimeout()) < System.currentTimeMillis();
                boolean z4 = maxOffsetInQueue - consumeQueueMaxOffset > ((long) this.storeConfig.getTieredStoreGroupCommitCount());
                if (!z3 && !z4 && !z2) {
                    log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset), Long.valueOf(maxOffsetInQueue - consumeQueueMaxOffset)});
                    selectOneMessageByOffset.release();
                    CompletableFuture<Boolean> completedFuture9 = CompletableFuture.completedFuture(false);
                    flatFileInterface.getFileLock().unlock();
                    return completedFuture9;
                }
                if (MessageFormatUtil.getStoreTimeStamp(selectOneMessageByOffset.getByteBuffer()) + TimeUnit.MINUTES.toMillis(5L) < System.currentTimeMillis()) {
                    log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset), Long.valueOf(maxOffsetInQueue - consumeQueueMaxOffset)});
                } else {
                    log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}", new Object[]{topic, Integer.valueOf(queueId), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Long.valueOf(consumeQueueMaxOffset), Long.valueOf(maxOffsetInQueue - consumeQueueMaxOffset)});
                }
                selectOneMessageByOffset.release();
                long j2 = consumeQueueMaxOffset;
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                while (j2 < min2) {
                    CqUnit cqUnit2 = consumeQueue.get(j2);
                    j += cqUnit2.getSize();
                    if (j >= tieredStoreGroupCommitSize) {
                        break;
                    }
                    SelectMappedBufferResult selectOneMessageByOffset2 = this.defaultStore.selectOneMessageByOffset(cqUnit2.getPos(), cqUnit2.getSize());
                    arrayList.add(selectOneMessageByOffset2);
                    ByteBuffer byteBuffer = selectOneMessageByOffset2.getByteBuffer();
                    if (!AppendResult.SUCCESS.equals(flatFileInterface.appendCommitLog(selectOneMessageByOffset2))) {
                        break;
                    }
                    long commitLogMaxOffset = flatFileInterface.getCommitLogMaxOffset() - byteBuffer.remaining();
                    Map<String, String> properties = MessageFormatUtil.getProperties(byteBuffer);
                    DispatchRequest dispatchRequest = new DispatchRequest(topic, queueId, commitLogMaxOffset, cqUnit2.getSize(), cqUnit2.getTagsCode(), MessageFormatUtil.getStoreTimeStamp(byteBuffer), cqUnit2.getQueueOffset(), properties.getOrDefault("KEYS", ""), properties.getOrDefault("UNIQ_KEY", ""), 0, 0L, new HashMap());
                    dispatchRequest.setOffsetId(MessageFormatUtil.getOffsetId(byteBuffer));
                    if (!AppendResult.SUCCESS.equals(flatFileInterface.appendConsumeQueue(dispatchRequest))) {
                        break;
                    }
                    arrayList2.add(dispatchRequest);
                    j2++;
                }
                GroupCommitContext groupCommitContext2 = new GroupCommitContext();
                groupCommitContext2.setEndOffset(j2);
                groupCommitContext2.setBufferList(arrayList);
                groupCommitContext2.setDispatchRequests(arrayList2);
                boolean z5 = z3 || maxOffsetInQueue - j2 > ((long) this.storeConfig.getTieredStoreGroupCommitCount());
                if (!arrayList2.isEmpty()) {
                    TieredStoreMetricsManager.messagesDispatchTotal.add(j2 - consumeQueueMaxOffset, TieredStoreMetricsManager.newAttributesBuilder().put(TieredStoreMetricsConstant.LABEL_TOPIC, topic).put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId).put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase()).build());
                    commitAsync(flatFileInterface).whenComplete((bool2, th2) -> {
                        if (bool2.booleanValue()) {
                            constructIndexFile(flatFileInterface.getTopicId(), groupCommitContext2);
                        } else {
                            GroupCommitContext put = this.failedGroupCommitMap.put(flatFileInterface, groupCommitContext2);
                            if (put != null) {
                                log.warn("MessageDispatcher#commitAsync failed,flatFile old failed commit context not release, topic={}, queueId={}  ", topic, Integer.valueOf(queueId));
                                put.release();
                            }
                        }
                        if (bool2.booleanValue() && z5) {
                            this.storeExecutor.commonExecutor.submit(() -> {
                                dispatchWithSemaphore(flatFileInterface);
                            });
                        }
                    });
                }
                flatFileInterface.getFileLock().unlock();
                return CompletableFuture.completedFuture(false);
            } catch (ConsumeQueueException e) {
                CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(e);
                flatFileInterface.getFileLock().unlock();
                return completableFuture;
            }
        } catch (Throwable th3) {
            flatFileInterface.getFileLock().unlock();
            throw th3;
        }
    }

    public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFileInterface) {
        return flatFileInterface.commitAsync();
    }

    public void constructIndexFile(long j, GroupCommitContext groupCommitContext) {
        MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
            if (this.storeConfig.isMessageIndexEnable()) {
                try {
                    groupCommitContext.getDispatchRequests().forEach(dispatchRequest -> {
                        constructIndexFile0(j, dispatchRequest);
                    });
                } catch (Throwable th) {
                    log.error("constructIndexFile error {}", Long.valueOf(j), th);
                }
            }
            groupCommitContext.release();
        });
    }

    public void constructIndexFile0(long j, DispatchRequest dispatchRequest) {
        HashSet hashSet = new HashSet();
        if (StringUtils.isNotBlank(dispatchRequest.getUniqKey())) {
            hashSet.add(dispatchRequest.getUniqKey());
        }
        if (StringUtils.isNotBlank(dispatchRequest.getKeys())) {
            hashSet.addAll(Arrays.asList(dispatchRequest.getKeys().split(" ")));
        }
        this.indexService.putKey(dispatchRequest.getTopic(), (int) j, dispatchRequest.getQueueId(), hashSet, dispatchRequest.getCommitLogOffset(), dispatchRequest.getMsgSize(), dispatchRequest.getStoreTimestamp());
    }

    public void releaseClosedPendingGroupCommit() {
        Iterator<Map.Entry<FlatFileInterface, GroupCommitContext>> it = this.failedGroupCommitMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<FlatFileInterface, GroupCommitContext> next = it.next();
            if (next.getKey().isClosed()) {
                next.getValue().release();
                it.remove();
            }
        }
    }

    public void run() {
        log.info("{} service started", getServiceName());
        while (!isStopped()) {
            try {
                this.flatFileStore.deepCopyFlatFileToList().forEach((v1) -> {
                    dispatchWithSemaphore(v1);
                });
                releaseClosedPendingGroupCommit();
                waitForRunning(Duration.ofSeconds(20L).toMillis());
            } catch (Throwable th) {
                log.error("MessageStore dispatch error", th);
            }
        }
        log.info("{} service shutdown", getServiceName());
    }
}
