package org.apache.rocketmq.store;

import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.CleanupPolicy;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.ha.DefaultHAClient;
import org.apache.rocketmq.store.ha.DefaultHAService;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.hook.PutMessageHook;
import org.apache.rocketmq.store.hook.SendMessageBackHook;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult;
import org.apache.rocketmq.store.kv.CommitLogDispatcherCompaction;
import org.apache.rocketmq.store.kv.CompactionService;
import org.apache.rocketmq.store.kv.CompactionStore;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.ConsumeQueueStore;
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.rocksdb.RocksDBException;

/* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore.class */
public class DefaultMessageStore implements MessageStore {
    // Main store logger, registered under the name "RocketmqStore".
    protected static final Logger LOGGER = LoggerFactory.getLogger("RocketmqStore");
    // Separate logger registered under the name "RocketmqStoreError".
    protected static final Logger ERROR_LOG = LoggerFactory.getLogger("RocketmqStoreError");
    private final MessageStoreConfig messageStoreConfig;
    protected final CommitLog commitLog;
    protected final ConsumeQueueStoreInterface consumeQueueStore;
    private final FlushConsumeQueueService flushConsumeQueueService;
    // Periodic cleanup helpers; see the inner classes of the same names.
    protected final CleanCommitLogService cleanCommitLogService;
    private final CleanConsumeQueueService cleanConsumeQueueService;
    private final CorrectLogicOffsetService correctLogicOffsetService;
    protected final IndexService indexService;
    private ReputMessageService reputMessageService;
    // CleanCommitLogService checks whether this is an AutoSwitchHAService in controller mode.
    private HAService haService;
    private CompactionStore compactionStore;
    private CompactionService compactionService;
    private final StoreStatsService storeStatsService;
    private final TransientStorePool transientStorePool;
    private final ScheduledExecutorService scheduledExecutorService;
    private final BrokerStatsManager brokerStatsManager;
    private final MessageArrivingListener messageArrivingListener;
    private final BrokerConfig brokerConfig;
    protected StoreCheckpoint storeCheckpoint;
    private TimerMessageStore timerMessageStore;
    private final LinkedList<CommitLogDispatcher> dispatcherList;
    private RocksDBMessageStore rocksDBMessageStore;
    private RandomAccessFile lockFile;
    private FileLock lock;
    // 134217728 == 128 * 1024 * 1024; presumably a byte limit for a single pull — TODO confirm at the use site.
    private static final int MAX_PULL_MSG_SIZE = 134217728;
    private volatile int aliveReplicasNum;
    private SendMessageBackHook sendMessageBackHook;
    private int maxDelayLevel;
    private ConcurrentMap<String, TopicConfig> topicConfigTable;
    public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(LOGGER);
    // Disk full/ok state is flipped through this object by CleanCommitLogService.
    protected final RunningFlags runningFlags = new RunningFlags();
    private final SystemClock systemClock = new SystemClock();
    // Starts out true; NOTE(review): presumably cleared once the store is started — confirm in start().
    private volatile boolean shutdown = true;
    protected boolean notifyMessageArriveInBatch = false;
    boolean shutDownNormal = false;
    private MessageStore masterStoreInProcess = null;
    // -1 is the initial value of both offsets below; NOTE(review): looks like an "unknown" sentinel — confirm.
    private volatile long masterFlushedOffset = -1;
    private volatile long brokerInitMaxOffset = -1;
    // NOTE(review): raw ArrayList; `new ArrayList<>()` would avoid the unchecked-conversion warning.
    private List<PutMessageHook> putMessageHookList = new ArrayList();
    private final ConcurrentSkipListMap<Integer, Long> delayLevelTable = new ConcurrentSkipListMap<>();
    // Incremented for every BatchDispatchRequest created by ConcurrentReputMessageService, whose
    // doReput() waits for this counter to reach zero before releasing the selected mapped buffer.
    private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
    // Filled by ConcurrentReputMessageService.createBatchDispatchRequest.
    private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
    // Must stay declared before dispatchRequestOrderlyQueue, which reads it in its initializer.
    private int dispatchRequestOrderlyQueueSize = 16;
    private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(this.dispatchRequestOrderlyQueueSize);
    private long stateMachineVersion = 0;
    // Single-threaded scheduler whose thread factory is named "StoreCleanQueueScheduledThread".
    private final ScheduledExecutorService scheduledCleanQueueExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
    private final AllocateMappedFileService allocateMappedFileService = new AllocateMappedFileService(this);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$BatchDispatchRequest.class */
    /**
     * Value holder describing one contiguous slice of a buffer that is handed
     * over for batch dispatch.
     *
     * <p>Instances are created by
     * {@code ConcurrentReputMessageService#createBatchDispatchRequest} and placed
     * on the store's batch dispatch request queue.
     */
    public class BatchDispatchRequest {
        /** Buffer the slice lives in. */
        private ByteBuffer byteBuffer;
        /** Start position of the slice inside {@link #byteBuffer}. */
        private int position;
        /** Length of the slice. */
        private int size;
        /** Identifier assigned by the creator of the request. */
        private long id;

        /**
         * Creates a request for the given slice.
         *
         * @param buffer        buffer containing the slice
         * @param startPosition position in {@code buffer} where the slice begins
         * @param length        size of the slice
         * @param requestId     identifier of this request
         */
        public BatchDispatchRequest(ByteBuffer buffer, int startPosition, int length, long requestId) {
            this.id = requestId;
            this.size = length;
            this.position = startPosition;
            this.byteBuffer = buffer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$CleanCommitLogService.class */
    public class CleanCommitLogService {
        private static final int MAX_MANUAL_DELETE_FILE_TIMES = 20;
        private final String diskSpaceWarningLevelRatio = System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "");
        private final String diskSpaceCleanForciblyRatio = System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "");
        private long lastRedeleteTimestamp = 0;
        private final AtomicInteger manualDeleteFileSeveralTimes = new AtomicInteger();
        private volatile boolean cleanImmediately = false;
        private int forceCleanFailedTimes = 0;

        CleanCommitLogService() {
        }

        double getDiskSpaceWarningLevelRatio() {
            double diskSpaceWarningLevelRatio = "".equals(this.diskSpaceWarningLevelRatio) ? DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceWarningLevelRatio() / 100.0d : Double.parseDouble(this.diskSpaceWarningLevelRatio);
            if (diskSpaceWarningLevelRatio > 0.9d) {
                diskSpaceWarningLevelRatio = 0.9d;
            }
            if (diskSpaceWarningLevelRatio < 0.35d) {
                diskSpaceWarningLevelRatio = 0.35d;
            }
            return diskSpaceWarningLevelRatio;
        }

        double getDiskSpaceCleanForciblyRatio() {
            double diskSpaceCleanForciblyRatio = "".equals(this.diskSpaceCleanForciblyRatio) ? DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceCleanForciblyRatio() / 100.0d : Double.parseDouble(this.diskSpaceCleanForciblyRatio);
            if (diskSpaceCleanForciblyRatio > 0.85d) {
                diskSpaceCleanForciblyRatio = 0.85d;
            }
            if (diskSpaceCleanForciblyRatio < 0.3d) {
                diskSpaceCleanForciblyRatio = 0.3d;
            }
            return diskSpaceCleanForciblyRatio;
        }

        public void executeDeleteFilesManually() {
            this.manualDeleteFileSeveralTimes.set(20);
            DefaultMessageStore.LOGGER.info("executeDeleteFilesManually was invoked");
        }

        public void run() {
            try {
                deleteExpiredFiles();
                reDeleteHangedFile();
            } catch (Throwable th) {
                DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", th);
            }
        }

        private void deleteExpiredFiles() {
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            int deleteCommitLogFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();
            boolean isTimeToDelete = isTimeToDelete();
            boolean isSpaceToDelete = isSpaceToDelete();
            boolean z = this.manualDeleteFileSeveralTimes.get() > 0;
            if (isTimeToDelete || isSpaceToDelete || z) {
                if (z) {
                    this.manualDeleteFileSeveralTimes.decrementAndGet();
                }
                boolean z2 = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
                DefaultMessageStore.LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}", new Object[]{Long.valueOf(fileReservedTime), Boolean.valueOf(isTimeToDelete), Boolean.valueOf(isSpaceToDelete), Integer.valueOf(this.manualDeleteFileSeveralTimes.get()), Boolean.valueOf(z2), Integer.valueOf(deleteFileBatchMax)});
                if (DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime * 3600000, deleteCommitLogFilesInterval, destroyMapedFileIntervalForcibly, z2, deleteFileBatchMax) <= 0) {
                    if (isSpaceToDelete) {
                        DefaultMessageStore.LOGGER.warn("disk space will be full soon, but delete file failed.");
                    }
                } else if (DefaultMessageStore.this.brokerConfig.isEnableControllerMode() && (DefaultMessageStore.this.haService instanceof AutoSwitchHAService)) {
                    ((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(DefaultMessageStore.this.getMinPhyOffset() - 1);
                }
            }
        }

        private void reDeleteHangedFile() {
            int redeleteHangedFileInterval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastRedeleteTimestamp > redeleteHangedFileInterval) {
                this.lastRedeleteTimestamp = currentTimeMillis;
                if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly())) {
                }
            }
        }

        public String getServiceName() {
            return DefaultMessageStore.this.brokerConfig.getIdentifier() + CleanCommitLogService.class.getSimpleName();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean isTimeToDelete() {
            String deleteWhen = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
            if (!UtilAll.isItTimeToDo(deleteWhen)) {
                return false;
            }
            DefaultMessageStore.LOGGER.info("it's time to reclaim disk space, " + deleteWhen);
            return true;
        }

        private boolean isSpaceToDelete() {
            this.cleanImmediately = false;
            String[] split = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog().trim().split(MixAll.MULTI_PATH_SPLITTER);
            HashSet hashSet = new HashSet();
            double d = 100.0d;
            String str = null;
            for (String str2 : split) {
                double diskPartitionSpaceUsedPercent = UtilAll.getDiskPartitionSpaceUsedPercent(str2);
                if (d > diskPartitionSpaceUsedPercent) {
                    d = diskPartitionSpaceUsedPercent;
                    str = str2;
                }
                if (diskPartitionSpaceUsedPercent > getDiskSpaceCleanForciblyRatio()) {
                    hashSet.add(str2);
                }
            }
            DefaultMessageStore.this.commitLog.setFullStorePaths(hashSet);
            if (d > getDiskSpaceWarningLevelRatio()) {
                if (DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    DefaultMessageStore.LOGGER.error("physic disk maybe full soon " + d + ", so mark disk full, storePathPhysic=" + str);
                }
                this.cleanImmediately = true;
                return true;
            }
            if (d > getDiskSpaceCleanForciblyRatio()) {
                this.cleanImmediately = true;
                return true;
            }
            if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                DefaultMessageStore.LOGGER.info("physic disk space OK " + d + ", so mark disk ok, storePathPhysic=" + str);
            }
            double diskPartitionSpaceUsedPercent2 = UtilAll.getDiskPartitionSpaceUsedPercent(StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()));
            if (diskPartitionSpaceUsedPercent2 > getDiskSpaceWarningLevelRatio()) {
                if (DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    DefaultMessageStore.LOGGER.error("logics disk maybe full soon " + diskPartitionSpaceUsedPercent2 + ", so mark disk full");
                }
                this.cleanImmediately = true;
                return true;
            }
            if (diskPartitionSpaceUsedPercent2 > getDiskSpaceCleanForciblyRatio()) {
                this.cleanImmediately = true;
                return true;
            }
            if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                DefaultMessageStore.LOGGER.info("logics disk space OK " + diskPartitionSpaceUsedPercent2 + ", so mark disk ok");
            }
            double diskMaxUsedSpaceRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0d;
            if (DefaultMessageStore.this.getMessageStoreConfig().getReplicasPerDiskPartition() <= 1) {
                if (d < 0.0d || d > diskMaxUsedSpaceRatio) {
                    DefaultMessageStore.LOGGER.info("commitLog disk maybe full soon, so reclaim space, " + d);
                    return true;
                }
                if (diskPartitionSpaceUsedPercent2 >= 0.0d && diskPartitionSpaceUsedPercent2 <= diskMaxUsedSpaceRatio) {
                    return false;
                }
                DefaultMessageStore.LOGGER.info("consumeQueue disk maybe full soon, so reclaim space, " + diskPartitionSpaceUsedPercent2);
                return true;
            }
            double majorFileSize = (1.0d * DefaultMessageStore.this.getMajorFileSize()) / (UtilAll.getDiskPartitionTotalSpace(str) / r0);
            if (majorFileSize > DefaultMessageStore.this.getMessageStoreConfig().getLogicalDiskSpaceCleanForciblyThreshold()) {
                DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds logical disk space clean forcibly threshold {}, forcibly: {}", new Object[]{Double.valueOf(majorFileSize), Double.valueOf(d), Boolean.valueOf(this.cleanImmediately)});
                this.cleanImmediately = true;
                return true;
            }
            boolean z = majorFileSize > diskMaxUsedSpaceRatio;
            if (z) {
                DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds clean threshold {}, forcibly: {}", new Object[]{Double.valueOf(majorFileSize), Double.valueOf(diskMaxUsedSpaceRatio), Boolean.valueOf(this.cleanImmediately)});
            }
            return z;
        }

        public int getManualDeleteFileSeveralTimes() {
            return this.manualDeleteFileSeveralTimes.get();
        }

        public void setManualDeleteFileSeveralTimes(int i) {
            this.manualDeleteFileSeveralTimes.set(i);
        }

        public double calcStorePathPhysicRatio() {
            HashSet hashSet = new HashSet();
            double d = 100.0d;
            for (String str : DefaultMessageStore.this.getStorePathPhysic().trim().split(MixAll.MULTI_PATH_SPLITTER)) {
                double diskPartitionSpaceUsedPercent = UtilAll.isPathExists(str) ? UtilAll.getDiskPartitionSpaceUsedPercent(str) : -1.0d;
                d = Math.min(d, diskPartitionSpaceUsedPercent);
                if (diskPartitionSpaceUsedPercent > getDiskSpaceCleanForciblyRatio()) {
                    hashSet.add(str);
                }
            }
            DefaultMessageStore.this.commitLog.setFullStorePaths(hashSet);
            return d;
        }

        public boolean isSpaceFull() {
            double calcStorePathPhysicRatio = calcStorePathPhysicRatio();
            if (calcStorePathPhysicRatio > DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0d) {
                DefaultMessageStore.LOGGER.info("physic disk of commitLog used: " + calcStorePathPhysicRatio);
            }
            if (calcStorePathPhysicRatio > getDiskSpaceWarningLevelRatio()) {
                if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    return true;
                }
                DefaultMessageStore.LOGGER.error("physic disk of commitLog maybe full soon, used " + calcStorePathPhysicRatio + ", so mark disk full");
                return true;
            }
            if (DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                return false;
            }
            DefaultMessageStore.LOGGER.info("physic disk space of commitLog OK " + calcStorePathPhysicRatio + ", so mark disk ok");
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$CleanConsumeQueueService.class */
    /**
     * Removes consume queue and index files that have fallen behind the commit log's
     * minimum offset.
     */
    public class CleanConsumeQueueService {
        /** Commit log minimum offset seen by the last pass that actually cleaned. */
        protected long lastPhysicalMinOffset = 0;

        /* JADX INFO: Access modifiers changed from: package-private */
        public CleanConsumeQueueService() {
        }

        /** Runs one cleanup pass; any failure is logged and never propagated to the caller. */
        public void run() {
            try {
                deleteExpiredFiles();
            } catch (Throwable th) {
                DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", th);
            }
        }

        /**
         * Deletes expired files of every consume queue and then of the index service, but only
         * when the commit log's minimum offset has advanced since the previous pass.
         *
         * <p>After a queue reported deleted files, the pass sleeps for the configured interval
         * (if positive) before moving on. An interrupt received during such a pause does not
         * abort the pass; the interrupt status is restored once the pass has finished so the
         * owner of the thread can still observe it.
         */
        protected void deleteExpiredFiles() {
            int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
            long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            if (minOffset <= this.lastPhysicalMinOffset) {
                return;
            }
            this.lastPhysicalMinOffset = minOffset;

            boolean interrupted = false;
            try {
                for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : DefaultMessageStore.this.getConsumeQueueTable().values()) {
                    for (ConsumeQueueInterface logic : queuesOfTopic.values()) {
                        boolean deletedAny = DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset) > 0;
                        if (deletedAny && deleteLogicsFilesInterval > 0) {
                            try {
                                Thread.sleep(deleteLogicsFilesInterval);
                            } catch (InterruptedException e) {
                                // Keep cleaning; remember the interrupt instead of dropping it.
                                interrupted = true;
                            }
                        }
                    }
                }
                DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** @return the broker identifier followed by this class's simple name */
        public String getServiceName() {
            return DefaultMessageStore.this.brokerConfig.getIdentifier() + CleanConsumeQueueService.class.getSimpleName();
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$CommitLogDispatcherBuildConsumeQueue.class */
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
        CommitLogDispatcherBuildConsumeQueue() {
        }

        @Override // org.apache.rocketmq.store.CommitLogDispatcher
        public void dispatch(DispatchRequest dispatchRequest) throws RocksDBException {
            switch (MessageSysFlag.getTransactionValue(dispatchRequest.getSysFlag())) {
                case 0:
                case DefaultHAClient.REPORT_HEADER_SIZE /* 8 */:
                    DefaultMessageStore.this.putMessagePositionInfo(dispatchRequest);
                    return;
                case 4:
                case 12:
                default:
                    return;
            }
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$CommitLogDispatcherBuildIndex.class */
    /**
     * Dispatcher that forwards dispatch requests to the index service while message
     * indexing is enabled in the store configuration.
     */
    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
        CommitLogDispatcherBuildIndex() {
        }

        /**
         * Builds the index for the given request, or does nothing when indexing is disabled.
         *
         * @param dispatchRequest the request to index
         */
        @Override // org.apache.rocketmq.store.CommitLogDispatcher
        public void dispatch(DispatchRequest dispatchRequest) {
            final boolean indexingEnabled = DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable();
            if (!indexingEnabled) {
                return;
            }
            DefaultMessageStore.this.indexService.buildIndex(dispatchRequest);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$ConcurrentReputMessageService.class */
    /**
     * Reput service that scans the commit log, groups consecutive messages into batches and
     * hands each batch over as a {@link BatchDispatchRequest} on the store's batch dispatch
     * request queue instead of dispatching message by message.
     */
    public class ConcurrentReputMessageService extends ReputMessageService {
        /** A batch is handed over once its accumulated size exceeds this value (4194304 == 4 * 1024 * 1024). */
        private static final int BATCH_SIZE = 4194304;
        /** Id given to the next batch dispatch request. */
        private long batchId;
        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
        private DispatchService dispatchService;

        public ConcurrentReputMessageService() {
            super();
            this.batchId = 0L;
            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
            this.dispatchService = new DispatchService();
        }

        /**
         * Enqueues a batch dispatch request for a slice of {@code byteBuffer}.
         *
         * <p>Does nothing when {@code position} is negative, which is how callers signal
         * "no open batch". Otherwise the mapped-page hold count is incremented and a duplicate
         * of the buffer is queued together with the next batch id.
         *
         * @param byteBuffer buffer containing the batch
         * @param position   start position of the batch, or a negative value for "nothing to dispatch"
         * @param size       accumulated size of the batch
         */
        public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
            if (position < 0) {
                return;
            }
            DefaultMessageStore.this.mappedPageHoldCount.getAndIncrement();
            BatchDispatchRequest request = new BatchDispatchRequest(byteBuffer.duplicate(), position, size, this.batchId++);
            DefaultMessageStore.this.batchDispatchRequestQueue.offer(request);
        }

        /** Starts this service followed by the two batch dispatch helper services. */
        public void start() {
            super.start();
            this.mainBatchDispatchRequestService.start();
            this.dispatchService.start();
        }

        /**
         * Scans the commit log from {@code reputFromOffset} and splits the readable data into
         * batch dispatch requests.
         *
         * <p>For every selected buffer the method always — also on failure — hands over the
         * still open batch, waits until the mapped-page hold count is back to zero and only
         * then releases the buffer.
         */
        @Override // org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
        public void doReput() {
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                DefaultMessageStore.LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", Long.valueOf(this.reputFromOffset), Long.valueOf(DefaultMessageStore.this.commitLog.getMinOffset()));
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }

            boolean doNext = true;
            while (isCommitLogAvailable() && doNext) {
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(this.reputFromOffset);
                if (result == null) {
                    break;
                }

                // -1 / -1 means "no batch open".
                int batchStart = -1;
                int batchSize = -1;
                try {
                    this.reputFromOffset = result.getStartOffset();
                    int readSize = 0;
                    while (readSize < result.getSize() && this.reputFromOffset < getReputEndOffset() && doNext) {
                        ByteBuffer byteBuffer = result.getByteBuffer();
                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
                        if (totalSize > 0) {
                            if (batchStart == -1) {
                                batchStart = byteBuffer.position();
                                batchSize = 0;
                            }
                            batchSize += totalSize;
                            if (batchSize > BATCH_SIZE) {
                                createBatchDispatchRequest(byteBuffer, batchStart, batchSize);
                                batchStart = -1;
                                batchSize = -1;
                            }
                            byteBuffer.position(byteBuffer.position() + totalSize);
                            this.reputFromOffset += totalSize;
                            readSize += totalSize;
                        } else {
                            doNext = false;
                            if (totalSize == 0) {
                                // Size 0 is the pre-check's signal to continue with the next commit log file.
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            }
                            createBatchDispatchRequest(byteBuffer, batchStart, batchSize);
                            batchStart = -1;
                            batchSize = -1;
                        }
                    }
                } finally {
                    // Single cleanup path for both normal and exceptional exit.
                    createBatchDispatchRequest(result.getByteBuffer(), batchStart, batchSize);
                    awaitMappedPageHoldsReleased();
                    result.release();
                }
            }
            DefaultMessageStore.this.finishCommitLogDispatch();
        }

        /**
         * Polls once per millisecond until the mapped-page hold count is zero.
         *
         * <p>NOTE(review): the counter is presumably decremented by the dispatch side once a
         * batch has been consumed — that code is not part of this class. The wait deliberately
         * continues after an interrupt, as before; the interrupt is now logged through the store
         * logger instead of being printed to stderr.
         */
        private void awaitMappedPageHoldsReleased() {
            while (DefaultMessageStore.this.mappedPageHoldCount.get() != 0) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    DefaultMessageStore.LOGGER.warn(getServiceName() + " interrupted while waiting for batch dispatch requests to finish", e);
                }
            }
        }

        /**
         * Peeks at the message starting at the buffer's current position.
         *
         * @param byteBuffer buffer positioned at the start of a message
         * @return the message's total size (buffer position restored) when the magic code is one of
         *         the two accepted message magic codes; {@code 0} for magic code -875286124 (the
         *         caller then rolls to the next file); {@code -1} when the message would reach beyond
         *         the confirm offset or the magic code is unknown. In the {@code 0} and {@code -1}
         *         cases the buffer position is left advanced past the bytes that were read.
         */
        public int preCheckMessageAndReturnSize(ByteBuffer byteBuffer) {
            byteBuffer.mark();
            int totalSize = byteBuffer.getInt();
            if (this.reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
                return -1;
            }
            int magicCode = byteBuffer.getInt();
            switch (magicCode) {
                case -875286124:
                    // NOTE(review): presumably the "blank"/end-of-file magic code — confirm against CommitLog.
                    return 0;
                case CommitLog.MESSAGE_MAGIC_CODE /* -626843481 */:
                case -626843477:
                    // NOTE(review): -626843477 is presumably the V2 message magic code — confirm.
                    byteBuffer.reset();
                    return totalSize;
                default:
                    return -1;
            }
        }

        /**
         * Waits up to 50 x 100 ms for the commit log to be fully dispatched, then shuts down the
         * helper services and finally this service.
         */
        @Override // org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
        public void shutdown() {
            for (int attempt = 0; attempt < 50 && isCommitLogAvailable(); attempt++) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: shutdown keeps giving the dispatch a chance to catch up.
                }
            }
            if (isCommitLogAvailable()) {
                DefaultMessageStore.LOGGER.warn("shutdown concurrentReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max offset={}, reputFromOffset={}", Long.valueOf(DefaultMessageStore.this.commitLog.getMaxOffset()), Long.valueOf(this.reputFromOffset));
            }
            this.mainBatchDispatchRequestService.shutdown();
            this.dispatchService.shutdown();
            super.shutdown();
        }

        /** @return this class's simple name, prefixed with the broker identifier when running in a broker container */
        @Override // org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
        public String getServiceName() {
            String simpleName = ConcurrentReputMessageService.class.getSimpleName();
            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + simpleName;
            }
            return simpleName;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$CorrectLogicOffsetService.class */
    public class CorrectLogicOffsetService {
        private long lastForceCorrectTime = -1;

        /* JADX INFO: Access modifiers changed from: package-private */
        public CorrectLogicOffsetService() {
        }

        public void run() {
            try {
                correctLogicMinOffset();
            } catch (Throwable th) {
                DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", th);
            }
        }

        private boolean needCorrect(ConsumeQueueInterface consumeQueueInterface, long j, long j2) {
            if (consumeQueueInterface == null) {
                return false;
            }
            if (DefaultMessageStore.this.consumeQueueStore.isFirstFileExist(consumeQueueInterface) && !DefaultMessageStore.this.consumeQueueStore.isFirstFileAvailable(consumeQueueInterface)) {
                DefaultMessageStore.LOGGER.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct. topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}", new Object[]{consumeQueueInterface.getTopic(), Integer.valueOf(consumeQueueInterface.getQueueId()), Long.valueOf(consumeQueueInterface.getMaxPhysicOffset()), Long.valueOf(j), Long.valueOf(consumeQueueInterface.getMinOffsetInQueue()), Long.valueOf(consumeQueueInterface.getMaxOffsetInQueue()), consumeQueueInterface.getCQType()});
                return true;
            }
            if (consumeQueueInterface.getMaxPhysicOffset() == -1 || j == -1) {
                return false;
            }
            if (consumeQueueInterface.getMaxPhysicOffset() < j) {
                if (consumeQueueInterface.getMinOffsetInQueue() < consumeQueueInterface.getMaxOffsetInQueue()) {
                    DefaultMessageStore.LOGGER.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is less than min phy offset: {}, but min offset: {} is less than max offset: {}. topic:{}, queue:{}, cqType:{}.", new Object[]{Long.valueOf(consumeQueueInterface.getMaxPhysicOffset()), Long.valueOf(j), Long.valueOf(consumeQueueInterface.getMinOffsetInQueue()), Long.valueOf(consumeQueueInterface.getMaxOffsetInQueue()), consumeQueueInterface.getTopic(), Integer.valueOf(consumeQueueInterface.getQueueId()), consumeQueueInterface.getCQType()});
                    return true;
                }
                if (consumeQueueInterface.getMinOffsetInQueue() == consumeQueueInterface.getMaxOffsetInQueue()) {
                    return false;
                }
                DefaultMessageStore.LOGGER.error("CorrectLogicOffsetService.needCorrect. It should not happen, logic max phy offset: {} is less than min phy offset: {}, but min offset: {} is larger than max offset: {}. topic:{}, queue:{}, cqType:{}", new Object[]{Long.valueOf(consumeQueueInterface.getMaxPhysicOffset()), Long.valueOf(j), Long.valueOf(consumeQueueInterface.getMinOffsetInQueue()), Long.valueOf(consumeQueueInterface.getMaxOffsetInQueue()), consumeQueueInterface.getTopic(), Integer.valueOf(consumeQueueInterface.getQueueId()), consumeQueueInterface.getCQType()});
                return false;
            }
            if (System.currentTimeMillis() - j2 <= DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval()) {
                return false;
            }
            this.lastForceCorrectTime = System.currentTimeMillis();
            CqUnit earliestUnit = consumeQueueInterface.getEarliestUnit();
            if (earliestUnit == null) {
                if (consumeQueueInterface.getMinOffsetInQueue() == consumeQueueInterface.getMaxOffsetInQueue()) {
                    return false;
                }
                DefaultMessageStore.LOGGER.error("CorrectLogicOffsetService.needCorrect. cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, but min offset: {} is not equal to max offset: {}. topic:{}, queue:{}, cqType:{}.", new Object[]{Long.valueOf(consumeQueueInterface.getMaxPhysicOffset()), Long.valueOf(j), Long.valueOf(consumeQueueInterface.getMinOffsetInQueue()), Long.valueOf(consumeQueueInterface.getMaxOffsetInQueue()), consumeQueueInterface.getTopic(), Integer.valueOf(consumeQueueInterface.getQueueId()), consumeQueueInterface.getCQType()});
                return true;
            }
            if (earliestUnit.getPos() >= j) {
                return earliestUnit.getPos() >= j ? false : false;
            }
            DefaultMessageStore.LOGGER.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is greater than min phy offset: {}, but minPhyPos in cq is: {}. min offset in queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}.", new Object[]{Long.valueOf(consumeQueueInterface.getMaxPhysicOffset()), Long.valueOf(j), Long.valueOf(earliestUnit.getPos()), Long.valueOf(consumeQueueInterface.getMinOffsetInQueue()), Long.valueOf(consumeQueueInterface.getMaxOffsetInQueue()), consumeQueueInterface.getTopic(), Integer.valueOf(consumeQueueInterface.getQueueId()), consumeQueueInterface.getCQType()});
            return true;
        }

        private void correctLogicMinOffset() {
            long j = this.lastForceCorrectTime;
            long minPhyOffset = DefaultMessageStore.this.getMinPhyOffset();
            Iterator<ConcurrentMap<Integer, ConsumeQueueInterface>> it = DefaultMessageStore.this.getConsumeQueueTable().values().iterator();
            while (it.hasNext()) {
                for (ConsumeQueueInterface consumeQueueInterface : it.next().values()) {
                    if (!Objects.equals(CQType.SimpleCQ, consumeQueueInterface.getCQType()) && needCorrect(consumeQueueInterface, minPhyOffset, j)) {
                        doCorrect(consumeQueueInterface, minPhyOffset);
                    }
                }
            }
        }

        private void doCorrect(ConsumeQueueInterface consumeQueueInterface, long j) {
            DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(consumeQueueInterface, j);
            int correctLogicMinOffsetSleepInterval = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
            if (correctLogicMinOffsetSleepInterval > 0) {
                try {
                    Thread.sleep(correctLogicMinOffsetSleepInterval);
                } catch (InterruptedException e) {
                }
            }
        }

        public String getServiceName() {
            return DefaultMessageStore.this.brokerConfig.isInBrokerContainer() ? DefaultMessageStore.this.brokerConfig.getIdentifier() + CorrectLogicOffsetService.class.getSimpleName() : CorrectLogicOffsetService.class.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$DispatchRequestOrderlyQueue.class */
    /**
     * Fixed-size ring of dispatch-request batches that hands batches to the consumer strictly in
     * index order, even though producers may finish them out of order.
     *
     * <p>All slot and pointer accesses happen while holding this object's monitor.
     */
    public class DispatchRequestOrderlyQueue {
        /** Ring slots; a {@code null} slot means the batch for that index has not arrived yet. */
        DispatchRequest[][] buffer;
        /** Index of the next batch to hand out. */
        long ptr = 0;
        /** Number of batches stored so far. */
        AtomicLong maxPtr = new AtomicLong();

        /**
         * @param i number of ring slots
         */
        public DispatchRequestOrderlyQueue(int i) {
            // Outer dimension only: each slot later receives a whole DispatchRequest[] batch.
            this.buffer = new DispatchRequest[i][];
        }

        /**
         * Stores the batch with index {@code j}, blocking while that index is at least one full
         * ring ahead of the read pointer.
         *
         * @param j sequence index of the batch
         * @param dispatchRequestArr the batch to store
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
         */
        public void put(long j, DispatchRequest[] dispatchRequestArr) {
            synchronized (this) {
                // Test the condition under the same monitor that get() uses to advance ptr,
                // so a notification cannot slip in between the test and the wait.
                while (this.ptr + this.buffer.length <= j) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                this.buffer[(int) (j % this.buffer.length)] = dispatchRequestArr;
                this.maxPtr.incrementAndGet();
            }
        }

        /**
         * Moves every consecutively available batch, starting at the read pointer, into
         * {@code list} and wakes up producers blocked in {@link #put(long, DispatchRequest[])}.
         *
         * @param list receives the drained batches in index order
         * @return always {@code null}; results are delivered through {@code list}
         */
        public DispatchRequest[] get(List<DispatchRequest[]> list) {
            synchronized (this) {
                for (int i = 0; i < this.buffer.length; i++) {
                    int slot = (int) (this.ptr % this.buffer.length);
                    DispatchRequest[] batch = this.buffer[slot];
                    if (batch == null) {
                        break;
                    }
                    list.add(batch);
                    this.buffer[slot] = null;
                    this.ptr++;
                }
                // Notify on every call, including after a full-ring drain, so producers re-check.
                notifyAll();
            }
            return null;
        }

        /** Returns {@code true} when every stored batch has been handed out. */
        public synchronized boolean isEmpty() {
            return this.maxPtr.get() == this.ptr;
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$DispatchService.class */
    /**
     * Background thread that drains the orderly dispatch queue roughly once per millisecond and
     * dispatches every request it finds.
     */
    class DispatchService extends ServiceThread {
        /** Scratch list reused across polls to collect drained batches. */
        private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>();

        DispatchService() {
        }

        /** Drains the orderly queue once and dispatches all drained requests in order. */
        private void dispatch() throws Exception {
            this.dispatchRequestsList.clear();
            DefaultMessageStore.this.dispatchRequestOrderlyQueue.get(this.dispatchRequestsList);
            for (DispatchRequest[] batch : this.dispatchRequestsList) {
                for (DispatchRequest request : batch) {
                    dispatchOne(request);
                }
            }
        }

        /**
         * Dispatches one request, notifies message arrival if necessary, and updates the per-topic
         * put statistics when duplication is disabled and this broker is a slave.
         */
        private void dispatchOne(DispatchRequest request) throws Exception {
            DefaultMessageStore.this.doDispatch(request);
            DefaultMessageStore.this.notifyMessageArriveIfNecessary(request);

            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
                return;
            }
            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                return;
            }
            DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(request.getTopic()).add(1L);
            DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(request.getTopic()).add(request.getMsgSize());
        }

        /** Polls until the service is stopped; exceptions are logged and the loop continues. */
        public void run() {
            DefaultMessageStore.LOGGER.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1L);
                    dispatch();
                } catch (Exception e) {
                    DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", e);
                }
            }
            DefaultMessageStore.LOGGER.info(getServiceName() + " service end");
        }

        /**
         * Returns the service name: the simple class name, prefixed with the broker identifier
         * when running inside a broker container.
         */
        public String getServiceName() {
            String simpleName = DispatchService.class.getSimpleName();
            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + simpleName;
            }
            return simpleName;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$FlushConsumeQueueService.class */
    /**
     * Background thread that periodically flushes every consume queue (and the compaction store
     * when compaction is enabled), and performs a final forced flush when it stops.
     */
    public class FlushConsumeQueueService extends ServiceThread {
        /** Retry count that marks the final flush and forces a zero least-pages threshold. */
        private static final int RETRY_TIMES_OVER = 3;
        /** Wall-clock time of the last thorough flush. */
        private long lastFlushTimestamp = 0;

        public FlushConsumeQueueService() {
        }

        /**
         * Flushes all consume queues.
         *
         * @param retryTimes maximum flush attempts per queue; {@code RETRY_TIMES_OVER} additionally
         *        forces the least-pages threshold to zero
         */
        private void doFlush(int retryTimes) {
            int leastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
            if (retryTimes == RETRY_TIMES_OVER) {
                leastPages = 0;
            }

            long logicsMsgTimestamp = 0;
            int thoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long now = System.currentTimeMillis();
            if (now >= this.lastFlushTimestamp + thoroughInterval) {
                // Thorough flush: ignore the page threshold and remember the checkpoint timestamp
                // as it was before the queues are flushed.
                this.lastFlushTimestamp = now;
                leastPages = 0;
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }

            for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : DefaultMessageStore.this.getConsumeQueueTable().values()) {
                for (ConsumeQueueInterface queue : queuesOfTopic.values()) {
                    boolean flushed = false;
                    int attempt = 0;
                    while (attempt < retryTimes && !flushed) {
                        flushed = DefaultMessageStore.this.consumeQueueStore.flush(queue, leastPages);
                        attempt++;
                    }
                }
            }

            if (DefaultMessageStore.this.messageStoreConfig.isEnableCompaction()) {
                DefaultMessageStore.this.compactionStore.flush(leastPages);
            }

            if (leastPages == 0) {
                if (logicsMsgTimestamp > 0) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }

        /** Flushes once per configured interval until stopped, then flushes one last time. */
        public void run() {
            DefaultMessageStore.LOGGER.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    waitForRunning(DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue());
                    doFlush(1);
                } catch (Exception e) {
                    DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", e);
                }
            }
            doFlush(RETRY_TIMES_OVER);
            DefaultMessageStore.LOGGER.info(getServiceName() + " service end");
        }

        /**
         * Returns the service name: the simple class name, prefixed with the broker identifier
         * when running inside a broker container.
         */
        public String getServiceName() {
            String simpleName = FlushConsumeQueueService.class.getSimpleName();
            if (DefaultMessageStore.this.brokerConfig.isInBrokerContainer()) {
                return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + simpleName;
            }
            return simpleName;
        }

        /** Returns the join time for this service, taken from {@code BrokerStatsManager.ACCOUNT_STAT_INVERTAL}. */
        public long getJoinTime() {
            return BrokerStatsManager.ACCOUNT_STAT_INVERTAL;
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$MainBatchDispatchRequestService.class */
    class MainBatchDispatchRequestService extends ServiceThread {
        private final ExecutorService batchDispatchRequestExecutor;

        public MainBatchDispatchRequestService() {
            this.batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor(DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(), DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(), BrokerStatsManager.ACCOUNT_STAT_INVERTAL, TimeUnit.MICROSECONDS, new LinkedBlockingQueue(DefaultMappedFile.OS_PAGE_SIZE), new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"), new ThreadPoolExecutor.AbortPolicy());
        }

        private void pollBatchDispatchRequest() {
            try {
                if (!DefaultMessageStore.this.batchDispatchRequestQueue.isEmpty()) {
                    BatchDispatchRequest batchDispatchRequest = (BatchDispatchRequest) DefaultMessageStore.this.batchDispatchRequestQueue.peek();
                    this.batchDispatchRequestExecutor.execute(() -> {
                        try {
                            ByteBuffer byteBuffer = batchDispatchRequest.byteBuffer;
                            byteBuffer.position(batchDispatchRequest.position);
                            byteBuffer.limit(batchDispatchRequest.position + batchDispatchRequest.size);
                            ArrayList arrayList = new ArrayList();
                            while (byteBuffer.hasRemaining()) {
                                DispatchRequest checkMessageAndReturnSize = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(byteBuffer, false, false, false);
                                if (checkMessageAndReturnSize.isSuccess()) {
                                    arrayList.add(checkMessageAndReturnSize);
                                } else {
                                    DefaultMessageStore.LOGGER.error("[BUG]read total count not equals msg total size.");
                                }
                            }
                            DefaultMessageStore.this.dispatchRequestOrderlyQueue.put(batchDispatchRequest.id, (DispatchRequest[]) arrayList.toArray(new DispatchRequest[arrayList.size()]));
                            DefaultMessageStore.this.mappedPageHoldCount.getAndDecrement();
                        } catch (Exception e) {
                            DefaultMessageStore.LOGGER.error("There is an exception in task execution.", e);
                        }
                    });
                    DefaultMessageStore.this.batchDispatchRequestQueue.poll();
                }
            } catch (Exception e) {
                DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", e);
            }
        }

        public void run() {
            DefaultMessageStore.LOGGER.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1L);
                    pollBatchDispatchRequest();
                } catch (Exception e) {
                    DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", e);
                }
            }
            DefaultMessageStore.LOGGER.info(getServiceName() + " service end");
        }

        public String getServiceName() {
            return DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer() ? DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName() : MainBatchDispatchRequestService.class.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/DefaultMessageStore$ReputMessageService.class */
    /**
     * Background thread that reads messages from the commit log, starting at
     * {@code reputFromOffset}, and dispatches each one through the store's dispatchers.
     */
    public class ReputMessageService extends ServiceThread {
        /** Commit-log offset of the next message to dispatch. */
        protected volatile long reputFromOffset = 0;

        ReputMessageService() {
        }

        public long getReputFromOffset() {
            return this.reputFromOffset;
        }

        public void setReputFromOffset(long j) {
            this.reputFromOffset = j;
        }

        /**
         * Waits up to 50 polls of 100 ms for pending commit-log data to be dispatched, warns if
         * data is still pending, then shuts the parent service down.
         */
        public void shutdown() {
            for (int i = 0; i < 50 && isCommitLogAvailable(); i++) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // An interrupt only cuts this pause short; polling continues.
                }
            }
            if (isCommitLogAvailable()) {
                DefaultMessageStore.LOGGER.warn("shutdown ReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max offset={}, reputFromOffset={}", Long.valueOf(DefaultMessageStore.this.commitLog.getMaxOffset()), Long.valueOf(this.reputFromOffset));
            }
            super.shutdown();
        }

        /** Returns the distance between the store's confirm offset and {@code reputFromOffset}. */
        public long behind() {
            return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
        }

        /** Returns {@code true} while {@code reputFromOffset} is below the reput end offset. */
        public boolean isCommitLogAvailable() {
            return this.reputFromOffset < getReputEndOffset();
        }

        /**
         * Returns the commit log's max offset when reading uncommitted data is enabled,
         * otherwise its confirm offset.
         */
        protected long getReputEndOffset() {
            return DefaultMessageStore.this.getMessageStoreConfig().isReadUnCommitted() ? DefaultMessageStore.this.commitLog.getMaxOffset() : DefaultMessageStore.this.commitLog.getConfirmOffset();
        }

        /**
         * Dispatches every message between {@code reputFromOffset} and the reput end offset.
         *
         * <p>Each buffer obtained from the commit log is released exactly once, in the
         * {@code finally} clause, on both the normal and the {@code RocksDBException} path.
         */
        public void doReput() {
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                DefaultMessageStore.LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", Long.valueOf(this.reputFromOffset), Long.valueOf(DefaultMessageStore.this.commitLog.getMinOffset()));
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            boolean doNext = true;
            while (isCommitLogAvailable() && doNext) {
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(this.reputFromOffset);
                if (result == null) {
                    break;
                }
                try {
                    this.reputFromOffset = result.getStartOffset();
                    int readSize = 0;
                    while (readSize < result.getSize() && this.reputFromOffset < getReputEndOffset() && doNext) {
                        DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                        if (this.reputFromOffset + size > getReputEndOffset()) {
                            // The message extends past the readable end; stop without consuming it.
                            doNext = false;
                            break;
                        }
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest);
                                if (!DefaultMessageStore.this.notifyMessageArriveInBatch) {
                                    DefaultMessageStore.this.notifyMessageArriveIfNecessary(dispatchRequest);
                                }
                                this.reputFromOffset += size;
                                readSize += size;
                                if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(dispatchRequest.getBatchSize());
                                    DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                // Size 0 with success: continue from the next commit-log file.
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (size > 0) {
                            DefaultMessageStore.LOGGER.error("[BUG]read total count not equals msg total size. reputFromOffset={}", Long.valueOf(this.reputFromOffset));
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || DefaultMessageStore.this.brokerConfig.getBrokerId() == 0) {
                                DefaultMessageStore.LOGGER.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", Long.valueOf(this.reputFromOffset));
                                // Skip whatever is left of this buffer.
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                    DefaultMessageStore.this.finishCommitLogDispatch();
                } catch (RocksDBException e) {
                    DefaultMessageStore.ERROR_LOG.info("dispatch message to cq exception. reputFromOffset: {}", Long.valueOf(this.reputFromOffset), e);
                    return;
                } finally {
                    result.release();
                }
            }
        }

        /**
         * Notifies the arriving listener for every extra queue listed in the request's
         * {@code INNER_MULTI_DISPATCH} property, pairing each with the offset at the same position
         * in {@code INNER_MULTI_QUEUE_OFFSET}. Does nothing for retry topics, missing or blank
         * properties, or when the two lists differ in length.
         */
        public void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) {
            Map<String, String> properties = dispatchRequest.getPropertiesMap();
            if (properties == null || dispatchRequest.getTopic().startsWith("%RETRY%")) {
                return;
            }
            String multiDispatchQueue = properties.get("INNER_MULTI_DISPATCH");
            String multiQueueOffset = properties.get("INNER_MULTI_QUEUE_OFFSET");
            if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) {
                return;
            }
            String[] queues = multiDispatchQueue.split(",");
            String[] queueOffsets = multiQueueOffset.split(",");
            if (queues.length != queueOffsets.length) {
                return;
            }
            for (int i = 0; i < queues.length; i++) {
                String queueName = queues[i];
                long queueOffset = Long.parseLong(queueOffsets[i]);
                int queueId = dispatchRequest.getQueueId();
                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
                    queueId = 0;
                }
                DefaultMessageStore.this.messageArrivingListener.arriving(queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
            }
        }

        /** Calls {@link #doReput()} roughly once per millisecond until stopped; failures are logged. */
        public void run() {
            DefaultMessageStore.LOGGER.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1L);
                    doReput();
                } catch (Throwable th) {
                    DefaultMessageStore.LOGGER.warn(getServiceName() + " service has exception. ", th);
                }
            }
            DefaultMessageStore.LOGGER.info(getServiceName() + " service end");
        }

        /**
         * Returns the service name: the simple class name, prefixed with the broker identifier
         * when running inside a broker container.
         */
        public String getServiceName() {
            return DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer() ? DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ReputMessageService.class.getSimpleName() : ReputMessageService.class.getSimpleName();
        }
    }

    /**
     * Wires up the message store: picks the commit-log implementation, creates the consume-queue
     * store and the background services, selects an HA service, registers the commit-log
     * dispatchers, ensures the storage directories exist, opens the lock file and parses the
     * configured delay levels.
     *
     * <p>Several collaborators are obtained through the overridable {@code create*} factory
     * methods, which are therefore invoked while construction is still in progress.
     *
     * @param messageStoreConfig store configuration
     * @param brokerStatsManager broker statistics manager kept by the store
     * @param messageArrivingListener listener kept for message-arrival notifications
     * @param brokerConfig broker configuration
     * @param concurrentMap topic configuration table kept by the store
     * @throws IOException declared by the constructor; the lock file is opened here
     */
    public DefaultMessageStore(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, BrokerConfig brokerConfig, ConcurrentMap<String, TopicConfig> concurrentMap) throws IOException {
        // Initial value; replaced with the configured total replica count a few lines below.
        this.aliveReplicasNum = 1;
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.aliveReplicasNum = messageStoreConfig.getTotalReplicas();
        this.brokerStatsManager = brokerStatsManager;
        this.topicConfigTable = concurrentMap;
        // Commit-log implementation depends on whether the DLedger commit log is enabled.
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        this.consumeQueueStore = createConsumeQueueStore();
        this.flushConsumeQueueService = createFlushConsumeQueueService();
        this.cleanCommitLogService = new CleanCommitLogService();
        this.cleanConsumeQueueService = createCleanConsumeQueueService();
        this.correctLogicOffsetService = createCorrectLogicOffsetService();
        this.storeStatsService = new StoreStatsService(getBrokerIdentity());
        this.indexService = new IndexService(this);
        // An HA service is only created when neither DLedger nor duplication is enabled.
        if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
            if (brokerConfig.isEnableControllerMode()) {
                this.haService = new AutoSwitchHAService();
                LOGGER.warn("Load AutoSwitch HA Service: {}", AutoSwitchHAService.class.getSimpleName());
            } else {
                // Try a provider-supplied implementation first; fall back to the default one.
                this.haService = (HAService) ServiceProvider.loadClass(HAService.class);
                if (null == this.haService) {
                    this.haService = new DefaultHAService();
                    LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
                }
            }
        }
        // Concurrent or sequential consume-queue building, as configured.
        if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
            this.reputMessageService = new ConcurrentReputMessageService();
        } else {
            this.reputMessageService = new ReputMessageService();
        }
        this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
        this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
        // Dispatchers are appended in this order: consume queue, index, then (optionally) compaction.
        this.dispatcherList = new LinkedList<>();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
        if (messageStoreConfig.isEnableCompaction()) {
            this.compactionStore = new CompactionStore(this);
            this.compactionService = new CompactionService(this.commitLog, this, this.compactionStore);
            this.dispatcherList.addLast(new CommitLogDispatcherCompaction(this.compactionService));
        }
        // Make sure the lock file's directory and the physical/logic store paths exist.
        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
        UtilAll.ensureDirOK(file.getParent());
        UtilAll.ensureDirOK(getStorePathPhysic());
        UtilAll.ensureDirOK(getStorePathLogic());
        // Only opens the lock file; no lock is acquired in this constructor.
        this.lockFile = new RandomAccessFile(file, "rw");
        parseDelayLevel();
    }

    /**
     * Factory for the consume-queue store used by this message store.
     *
     * @return a new {@code ConsumeQueueStore} bound to this store
     */
    public ConsumeQueueStoreInterface createConsumeQueueStore() {
        final ConsumeQueueStoreInterface store = new ConsumeQueueStore(this);
        return store;
    }

    /**
     * Factory for the service that cleans consume queues.
     *
     * @return a new {@code CleanConsumeQueueService} bound to this store
     */
    public CleanConsumeQueueService createCleanConsumeQueueService() {
        return this.new CleanConsumeQueueService();
    }

    /**
     * Factory for the service that flushes consume queues.
     *
     * @return a new {@code FlushConsumeQueueService} bound to this store
     */
    public FlushConsumeQueueService createFlushConsumeQueueService() {
        return this.new FlushConsumeQueueService();
    }

    /**
     * Factory for the service that corrects consume-queue minimum offsets.
     *
     * @return a new {@code CorrectLogicOffsetService} bound to this store
     */
    public CorrectLogicOffsetService createCorrectLogicOffsetService() {
        return this.new CorrectLogicOffsetService();
    }

    /**
     * Parses the configured message delay levels into {@code delayLevelTable}.
     *
     * <p>The configuration string is a space-separated list of tokens, each a number followed by
     * a one-character unit ({@code s}, {@code m}, {@code h} or {@code d}). The token at position
     * {@code n} (1-based) defines delay level {@code n}; {@code maxDelayLevel} is raised to the
     * highest level seen.
     *
     * @return {@code true} when every token was parsed; {@code false} when parsing failed, in
     *         which case the failure is logged and levels parsed before it remain in the table
     */
    public boolean parseDelayLevel() {
        final Map<String, Long> millisPerUnit = new HashMap<>();
        millisPerUnit.put("s", 1000L);
        millisPerUnit.put("m", Long.valueOf(BrokerStatsManager.ACCOUNT_STAT_INVERTAL));
        millisPerUnit.put("h", 3600000L);
        millisPerUnit.put("d", 86400000L);

        final String levelString = this.messageStoreConfig.getMessageDelayLevel();
        try {
            int level = 0;
            for (String token : levelString.split(" ")) {
                level++;
                final int unitIndex = token.length() - 1;
                final Long unitMillis = millisPerUnit.get(token.substring(unitIndex));
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                // An unknown unit yields null here and fails on unboxing, which the catch reports.
                final long delayMillis = unitMillis.longValue() * Long.parseLong(token.substring(0, unitIndex));
                this.delayLevelTable.put(level, delayMillis);
            }
            return true;
        } catch (Exception e) {
            LOGGER.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
            return false;
        }
    }

    /**
     * Delegates to the consume-queue store to truncate dirty logic files.
     *
     * @param j offset passed through unchanged to {@code truncateDirty}
     * @throws RocksDBException if the consume-queue store reports one
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void truncateDirtyLogicFiles(long j) throws RocksDBException {
        consumeQueueStore.truncateDirty(j);
    }

    /**
     * Loads the commit log, consume queues, optional compaction data, the checkpoint and the
     * index, then runs recovery and records the resulting max physical offset.
     *
     * <p>Any exception is logged and treated as a failed load. On failure the mapped-file
     * allocation service is shut down.
     *
     * @return {@code true} when everything loaded successfully
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean load() {
        boolean loaded;
        try {
            final boolean lastExitOk = !isTempFileExist();
            final String exitKind = lastExitOk ? "normally" : "abnormally";
            LOGGER.info("last shutdown {}, store path root dir: {}", exitKind, this.messageStoreConfig.getStorePathRootDir());

            loaded = this.commitLog.load() && this.consumeQueueStore.load();
            if (this.messageStoreConfig.isEnableCompaction()) {
                if (loaded) {
                    loaded = this.compactionService.load(lastExitOk);
                }
            }

            if (loaded) {
                loadCheckPoint();
                loaded = this.indexService.load(lastExitOk);
                recover(lastExitOk);
                LOGGER.info("message store recover end, and the max phy offset = {}", getMaxPhyOffset());
            }

            final long maxOffset = getMaxPhyOffset();
            setBrokerInitMaxOffset(maxOffset);
            LOGGER.info("load over, and the max phy offset = {}", maxOffset);
        } catch (Exception e) {
            LOGGER.error("load exception", e);
            loaded = false;
        }

        if (!loaded) {
            this.allocateMappedFileService.shutdown();
        }
        return loaded;
    }

    /**
     * Opens the store checkpoint located under the configured store root directory and seeds
     * {@code masterFlushedOffset} and the confirm offset from it.
     *
     * @throws IOException if the checkpoint cannot be opened
     */
    public void loadCheckPoint() throws IOException {
        final StoreCheckpoint checkpoint = new StoreCheckpoint(
            StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
        this.storeCheckpoint = checkpoint;
        this.masterFlushedOffset = checkpoint.getMasterFlushedOffset();
        setConfirmOffset(checkpoint.getConfirmPhyOffset());
    }

    /**
     * Starts the message store and its background services.
     *
     * <p>The statements below are order-sensitive: the lock file is acquired before the reput
     * service is started, and the {@code shutdown} flag is only cleared once every service has
     * been started.
     *
     * @throws Exception if any service fails to start; a {@link RuntimeException} is thrown when
     *                   the exclusive lock on the lock file cannot be obtained
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void start() throws Exception {
        // HA is only initialised here when neither DLedger nor duplication mode is enabled.
        if (!this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
            this.haService.init(this);
        }
        if (isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }
        this.allocateMappedFileService.start();
        this.indexService.start();
        // Take an exclusive lock on the first byte of the lock file; failure is reported as
        // "MQ already started".
        this.lock = this.lockFile.getChannel().tryLock(0L, 1L, false);
        if (this.lock == null || this.lock.isShared() || !this.lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }
        this.lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes(StandardCharsets.UTF_8)));
        this.lockFile.getChannel().force(true);
        // Dispatching resumes from the commit log's confirm offset.
        this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        this.reputMessageService.start();
        // May block until dispatching has caught up (only when the recheck option is enabled).
        doRecheckReputOffsetFromCq();
        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.consumeQueueStore.start();
        this.storeStatsService.start();
        if (this.haService != null) {
            this.haService.start();
        }
        createTempFile();
        addScheduleTask();
        this.perfs.start();
        this.shutdown = false;
    }

    /**
     * Re-derives the reput (dispatch) start offset from the consume queues and waits until
     * dispatching has caught up. Does nothing unless {@code recheckReputOffsetFromCq} is enabled.
     *
     * <p>The start offset is the largest physical offset recorded in any consume queue, but never
     * less than zero or less than the commit log's minimum offset. After the reput service has
     * been repositioned, the method polls once per second until {@code dispatchBehindBytes()} is
     * zero and then rebuilds the topic queue table.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the dispatcher
     */
    private void doRecheckReputOffsetFromCq() throws InterruptedException {
        if (!this.messageStoreConfig.isRecheckReputOffsetFromCq()) {
            return;
        }

        long maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queues : getConsumeQueueTable().values()) {
            for (ConsumeQueueInterface logic : queues.values()) {
                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                }
            }
        }

        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }

        long clMinOffset = this.commitLog.getMinOffset();
        if (maxPhysicalPosInLogicQueue < clMinOffset) {
            // Log BEFORE overwriting, otherwise both placeholders print the same value and the
            // warning carries no information about how far behind the consume queues were.
            LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, clMinOffset);
            maxPhysicalPosInLogicQueue = clMinOffset;
        }

        LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
            maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);

        // Block start-up until the dispatcher has processed everything already in the commit log.
        while (dispatchBehindBytes() > 0) {
            Thread.sleep(1000L);
            LOGGER.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}",
                this.reputMessageService.getReputFromOffset(), getMaxPhyOffset(), dispatchBehindBytes());
        }
        recoverTopicQueueTable();
    }

    /**
     * Shuts the message store down.
     *
     * <p>On the first call the scheduled executors and all store services are stopped, the
     * checkpoint is flushed, and the abort file is removed if the store is writeable and the
     * dispatcher has no backlog. On every call the transient store pool is destroyed and, if
     * held, the process lock is released and the lock file closed.
     *
     * <p>Failures while releasing the lock or closing the lock file are logged and never
     * propagated. The lock file is closed even if releasing the lock fails, so the file handle
     * does not leak.
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void shutdown() {
        if (!this.shutdown) {
            this.shutdown = true;
            this.scheduledExecutorService.shutdown();
            this.scheduledCleanQueueExecutorService.shutdown();
            try {
                this.scheduledExecutorService.awaitTermination(3L, TimeUnit.SECONDS);
                this.scheduledCleanQueueExecutorService.awaitTermination(3L, TimeUnit.SECONDS);
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is deliberately not restored here, because
                // the service shutdowns below may block and would presumably be cut short.
                LOGGER.error("shutdown Exception, ", e);
            }
            if (this.haService != null) {
                this.haService.shutdown();
            }
            this.storeStatsService.shutdown();
            this.commitLog.shutdown();
            this.reputMessageService.shutdown();
            this.consumeQueueStore.shutdown();
            this.indexService.shutdown();
            if (this.compactionService != null) {
                this.compactionService.shutdown();
            }
            if (this.messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
                this.rocksDBMessageStore.consumeQueueStore.shutdown();
            }
            this.flushConsumeQueueService.shutdown();
            this.allocateMappedFileService.shutdown();
            this.storeCheckpoint.flush();
            this.storeCheckpoint.shutdown();
            this.perfs.shutdown();
            // The abort file is only removed on a clean shutdown; it is kept otherwise.
            if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
                deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
                this.shutDownNormal = true;
            } else {
                LOGGER.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
            }
        }
        this.transientStorePool.destroy();
        if (this.lockFile == null || this.lock == null) {
            return;
        }
        try {
            this.lock.release();
        } catch (IOException e) {
            LOGGER.warn("release the message store lock failed", e);
        }
        // Closed separately so that a failing release() cannot leak the lock file handle.
        try {
            this.lockFile.close();
        } catch (IOException e) {
            LOGGER.warn("close the message store lock file failed", e);
        }
    }

    /**
     * Destroys the consume queues, the commit log and the index, then deletes the abort file and
     * the store checkpoint file located under the configured store root directory.
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void destroy() {
        this.consumeQueueStore.destroy();
        this.commitLog.destroy();
        this.indexService.destroy();

        final String rootDir = this.messageStoreConfig.getStorePathRootDir();
        deleteFile(StorePathConfigHelper.getAbortFile(rootDir));
        deleteFile(StorePathConfigHelper.getStoreCheckpoint(rootDir));
    }

    /**
     * Sums the total sizes reported by the commit log, the consume-queue store and the index
     * service. A component that is {@code null} contributes zero.
     *
     * @return the combined size of the three components
     */
    public long getMajorFileSize() {
        long total = 0L;
        if (this.commitLog != null) {
            total += this.commitLog.getTotalSize();
        }
        if (this.consumeQueueStore != null) {
            total += this.consumeQueueStore.getTotalSize();
        }
        if (this.indexService != null) {
            total += this.indexService.getTotalSize();
        }
        return total;
    }

    /**
     * Asynchronously appends a single message to the commit log.
     *
     * <p>Every registered put-message hook runs first; the first hook that returns a non-null
     * result short-circuits the call with that result. Messages whose {@code INNER_NUM} property
     * and inner-batch system flag (128) disagree, or that are inner batches for a topic whose
     * consume queue is not a batch queue, are rejected with {@code MESSAGE_ILLEGAL}.
     *
     * <p>When the commit log future completes, the elapsed time is recorded (with a warning above
     * 500 ms) and failed puts are counted.
     *
     * @param messageExtBrokerInner the message to store
     * @return a future holding the put result
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner messageExtBrokerInner) {
        for (PutMessageHook hook : this.putMessageHookList) {
            PutMessageResult hookResult = hook.executeBeforePutMessage(messageExtBrokerInner);
            if (hookResult != null) {
                return CompletableFuture.completedFuture(hookResult);
            }
        }

        final boolean hasInnerNum = messageExtBrokerInner.getProperties().containsKey("INNER_NUM");
        if (hasInnerNum && !MessageSysFlag.check(messageExtBrokerInner.getSysFlag(), 128)) {
            LOGGER.warn("[BUG]The message had property {} but is not an inner batch", "INNER_NUM");
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }

        if (MessageSysFlag.check(messageExtBrokerInner.getSysFlag(), 128)
            && !QueueTypeUtils.isBatchCq(getTopicConfig(messageExtBrokerInner.getTopic()))) {
            LOGGER.error("[BUG]The message is an inner batch but cq type is not batch cq");
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }

        final long beginTime = getSystemClock().now();
        final CompletableFuture<PutMessageResult> putFuture = this.commitLog.asyncPutMessage(messageExtBrokerInner);
        putFuture.thenAccept(result -> {
            final long elapsed = getSystemClock().now() - beginTime;
            if (elapsed > 500) {
                LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
                    elapsed, messageExtBrokerInner.getTopic(), messageExtBrokerInner.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(elapsed);
            if (result == null || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().add(1L);
            }
        });
        return putFuture;
    }

    /**
     * Asynchronously appends a message batch to the commit log.
     *
     * <p>Every registered put-message hook runs first; the first hook that returns a non-null
     * result short-circuits the call with that result. When the commit log future completes, the
     * elapsed time is recorded (with a warning above 500 ms) and failed puts are counted.
     *
     * @param messageExtBatch the batch to store
     * @return a future holding the put result
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        for (PutMessageHook hook : this.putMessageHookList) {
            PutMessageResult hookResult = hook.executeBeforePutMessage(messageExtBatch);
            if (hookResult != null) {
                return CompletableFuture.completedFuture(hookResult);
            }
        }

        final long beginTime = getSystemClock().now();
        final CompletableFuture<PutMessageResult> putFuture = this.commitLog.asyncPutMessages(messageExtBatch);
        putFuture.thenAccept(result -> {
            final long elapsed = getSystemClock().now() - beginTime;
            if (elapsed > 500) {
                LOGGER.warn("not in lock eclipse time(ms)={}, bodyLength={}", elapsed, messageExtBatch.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(elapsed);
            if (result == null || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().add(1L);
            }
        });
        return putFuture;
    }

    /**
     * Synchronous variant of {@code asyncPutMessage}: starts the asynchronous put and blocks in
     * {@code waitForPutResult} until the result is available.
     *
     * @param messageExtBrokerInner the message to store
     * @return the put result produced by {@code waitForPutResult}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public PutMessageResult putMessage(MessageExtBrokerInner messageExtBrokerInner) {
        return waitForPutResult(asyncPutMessage(messageExtBrokerInner));
    }

    /**
     * Synchronous variant of {@code asyncPutMessages}: starts the asynchronous batch put and
     * blocks in {@code waitForPutResult} until the result is available.
     *
     * @param messageExtBatch the batch to store
     * @return the put result produced by {@code waitForPutResult}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        return waitForPutResult(asyncPutMessages(messageExtBatch));
    }

    /**
     * Blocks until the given put future completes and returns its result.
     *
     * <p>The wait is bounded by the larger of the configured sync-flush and slave timeouts plus
     * five seconds. Interruption, a failed future and a timeout are all mapped to an
     * {@code UNKNOWN_ERROR} result; none of them is propagated as an exception. On interruption
     * the thread's interrupt status is restored so that callers can still observe it.
     *
     * @param completableFuture the future returned by an asynchronous put
     * @return the completed result, or an {@code UNKNOWN_ERROR} result if waiting failed
     */
    private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> completableFuture) {
        // 5000L forces long arithmetic so that a very large configured timeout cannot overflow.
        long timeoutMillis = Math.max(this.messageStoreConfig.getSyncFlushTimeout(), this.messageStoreConfig.getSlaveTimeout()) + 5000L;
        try {
            return completableFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            LOGGER.warn("interrupted while waiting for the put message result", e);
            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
        } catch (ExecutionException e) {
            LOGGER.error("put message completed exceptionally", e);
            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
        } catch (TimeoutException e) {
            LOGGER.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc process hangs or other unexpected situations.");
            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
        }
    }

    /**
     * Reports whether the commit log has been holding its lock for longer than the configured
     * page-cache-busy timeout.
     *
     * @return {@code true} if the time elapsed since the commit log's lock begin time exceeds the
     *         configured timeout and is below 10,000,000 ms; {@code false} otherwise
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean isOSPageCacheBusy() {
        final long current = this.systemClock.now();
        final long lockBegin = getCommitLog().getBeginTimeInLock();
        final long heldMillis = current - lockBegin;
        if (heldMillis >= 10000000) {
            return false;
        }
        return heldMillis > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }

    /**
     * Returns the lock time reported by the commit log.
     *
     * @return the value of {@code commitLog.lockTimeMills()}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long lockTimeMills() {
        return this.commitLog.lockTimeMills();
    }

    /**
     * Returns the cached master flushed offset.
     *
     * @return the current value of the {@code masterFlushedOffset} field
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getMasterFlushedOffset() {
        return this.masterFlushedOffset;
    }

    /**
     * Updates the master flushed offset both in this store and in the store checkpoint.
     *
     * @param j the new master flushed offset
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void setMasterFlushedOffset(long j) {
        this.masterFlushedOffset = j;
        this.storeCheckpoint.setMasterFlushedOffset(j);
    }

    /**
     * Returns the broker's initial max offset; {@code load()} sets it to the max physical offset
     * observed at the end of loading.
     *
     * @return the current value of the {@code brokerInitMaxOffset} field
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getBrokerInitMaxOffset() {
        return this.brokerInitMaxOffset;
    }

    /**
     * Sets the broker's initial max offset.
     *
     * @param j the new value of the {@code brokerInitMaxOffset} field
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void setBrokerInitMaxOffset(long j) {
        this.brokerInitMaxOffset = j;
    }

    /**
     * Returns the clock this store uses for its timing measurements.
     *
     * @return the {@code systemClock} field
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public SystemClock getSystemClock() {
        return this.systemClock;
    }

    /**
     * Returns the commit log backing this store.
     *
     * @return the {@code commitLog} field
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public CommitLog getCommitLog() {
        return this.commitLog;
    }

    /**
     * Truncates the consume queues and the commit log to the given offset and restarts the reput
     * service from there.
     *
     * <p>Nothing happens when the offset is at or beyond the current max physical offset.
     * Otherwise the running reput service is stopped, the logic files and the commit log are
     * truncated, the topic queue table is rebuilt, and a new reput service (concurrent or plain,
     * depending on configuration) is started from the smaller of the previous reput offset and
     * the truncation offset.
     *
     * @param j the offset to truncate to
     * @throws RocksDBException if truncating the consume-queue store fails
     */
    public void truncateDirtyFiles(long j) throws RocksDBException {
        LOGGER.info("truncate dirty files to {}", j);
        if (j >= getMaxPhyOffset()) {
            LOGGER.info("no need to truncate files, truncate offset is {}, max physical offset is {}", j, getMaxPhyOffset());
            return;
        }

        // Dispatching must be stopped before the files underneath it are cut.
        this.reputMessageService.shutdown();
        final long previousReputOffset = this.reputMessageService.getReputFromOffset();

        truncateDirtyLogicFiles(j);
        this.commitLog.truncateDirtyFiles(j);
        recoverTopicQueueTable();

        if (!this.messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
            this.reputMessageService = new ReputMessageService();
        } else {
            this.reputMessageService = new ConcurrentReputMessageService();
        }

        final long resumeOffset = Math.min(previousReputOffset, j);
        LOGGER.info("oldReputFromOffset is {}, reset reput from offset to {}", previousReputOffset, resumeOffset);
        this.reputMessageService.setReputFromOffset(resumeOffset);
        this.reputMessageService.start();
    }

    /**
     * Truncates the store to the given offset if that offset lies on a message boundary.
     *
     * @param j the offset to truncate to
     * @return {@code true} if no truncation was needed or the truncation was performed;
     *         {@code false} if the offset is not aligned (nothing is changed in that case)
     * @throws RocksDBException if truncating the consume-queue store fails
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean truncateFiles(long j) throws RocksDBException {
        if (j < getMaxPhyOffset()) {
            if (!isOffsetAligned(j)) {
                LOGGER.error("offset {} is not align, truncate failed, need manual fix", j);
                return false;
            }
            truncateDirtyFiles(j);
            return true;
        }
        LOGGER.info("no need to truncate files, truncate offset is {}, max physical offset is {}", j, getMaxPhyOffset());
        return true;
    }

    /**
     * Checks whether the given commit-log offset points at the start of a valid message.
     *
     * <p>The commit-log data at the offset is fetched and handed to
     * {@code commitLog.checkMessageAndReturnSize}. The fetched buffer is released afterwards in
     * all cases; the other {@code SelectMappedBufferResult} users in this class release theirs
     * the same way, and the buffer presumably pins its mapped file until released - confirm
     * against {@code getCommitLogData}.
     *
     * @param j the commit-log offset to check
     * @return {@code true} if no data exists at the offset or the message check succeeds
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean isOffsetAligned(long j) {
        SelectMappedBufferResult commitLogData = getCommitLogData(j);
        if (commitLogData == null) {
            return true;
        }
        try {
            return this.commitLog.checkMessageAndReturnSize(commitLogData.getByteBuffer(), true, false).isSuccess();
        } finally {
            commitLogData.release();
        }
    }

    /**
     * Convenience overload of the seven-argument {@code getMessage} that uses
     * {@code MAX_PULL_MSG_SIZE} as the size limit.
     *
     * @return whatever the seven-argument overload returns for the same arguments
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public GetMessageResult getMessage(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        return getMessage(str, str2, i, j, i2, MAX_PULL_MSG_SIZE, messageFilter);
    }

    /**
     * Future-returning wrapper around the six-argument {@code getMessage}. The lookup itself runs
     * synchronously on the calling thread; the returned future is already completed.
     *
     * @return a completed future holding the {@code getMessage} result (which may be {@code null})
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public CompletableFuture<GetMessageResult> getMessageAsync(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        return CompletableFuture.completedFuture(getMessage(str, str2, i, j, i2, messageFilter));
    }

    /**
     * Pulls messages for one queue starting at a consume-queue offset.
     *
     * <p>Returns {@code null} when the store is shut down or not readable. Topics whose delete
     * policy is {@code COMPACTION} are served by the compaction store when compaction is enabled.
     * Otherwise the consume queue is walked from {@code j}; each unit is filtered by consume
     * queue, read from the commit log, filtered by commit log, and appended to the result until
     * a scan-window, batch or size limit is reached.
     *
     * <p>The walk stops at the first unit that violates a limit. Continuing past such a unit
     * would allow a later, smaller unit to be appended and the next begin offset to move beyond
     * the unit that did not fit, so that message would be skipped for the consumer.
     *
     * @param str           consumer group (logged as "cid" and used for the cold-read exemption check)
     * @param str2          topic
     * @param i             queue id
     * @param j             consume-queue offset to start pulling from
     * @param i2            maximum number of messages to return
     * @param i3            maximum total size to return; raised to at least 100 and capped at 134217728
     * @param messageFilter optional filter applied on consume-queue and commit-log level; may be {@code null}
     * @return the pull result with status, next begin offset and min/max queue offsets filled in,
     *         or {@code null} if the store is shut down or not readable
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public GetMessageResult getMessage(String str, String str2, int i, long j, int i2, int i3, MessageFilter messageFilter) {
        if (this.shutdown) {
            LOGGER.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }
        if (!this.runningFlags.isReadable()) {
            LOGGER.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }
        if (Objects.equals(CleanupPolicyUtils.getDeletePolicy(getTopicConfig(str2)), CleanupPolicy.COMPACTION) && this.messageStoreConfig.isEnableCompaction()) {
            return this.compactionStore.getMessage(str, str2, i, j, i2, i3);
        }

        long beginTime = getSystemClock().now();
        GetMessageStatus status;
        long nextBeginOffset = j;
        long minOffset = 0;
        long maxOffset = 0;
        GetMessageResult getResult = new GetMessageResult();
        // Max physical offset of the commit log, sampled once for the whole pull.
        long maxOffsetPy = this.commitLog.getMaxOffset();

        ConsumeQueueInterface consumeQueue = findConsumeQueue(str2, i);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(j, 0L);
            } else if (j < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(j, minOffset);
            } else if (j == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(j, j);
            } else if (j > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                nextBeginOffset = nextOffsetCorrection(j, maxOffset);
            } else {
                // Upper bound (in consume-queue bytes) on how far past the requested offset we scan.
                int maxFilterMessageSize = Math.max(this.messageStoreConfig.getMaxFilterMessageSize(), i2 * consumeQueue.getUnitSize());
                boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                long maxPullSize = Math.max(i3, 100);
                if (maxPullSize > 134217728) {
                    LOGGER.warn("The max pull size is too large maxPullSize={} topic={} queueId={}", new Object[]{Long.valueOf(maxPullSize), str2, Integer.valueOf(i)});
                    maxPullSize = 134217728;
                }
                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                long maxPhyOffsetPulling = 0;
                int cqFileNum = 0;

                // Keep trying further consume-queue segments while nothing has been collected,
                // bounded by the configured number of segments to travel.
                while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset
                    && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
                    ReferredIterator<CqUnit> bufferConsumeQueue = null;
                    try {
                        try {
                            bufferConsumeQueue = consumeQueue.iterateFrom(nextBeginOffset, i2);
                        } catch (RocksDBException e) {
                            // The iterator stays null and is handled as "offset found null" below.
                            ERROR_LOG.error("getMessage Failed. cid: {}, topic: {}, queueId: {}, offset: {}, minOffset: {}, maxOffset: {}, {}", new Object[]{str, str2, Integer.valueOf(i), Long.valueOf(j), Long.valueOf(minOffset), Long.valueOf(maxOffset), e.getMessage()});
                        }

                        if (bufferConsumeQueue == null) {
                            status = GetMessageStatus.OFFSET_FOUND_NULL;
                            nextBeginOffset = nextOffsetCorrection(nextBeginOffset, this.consumeQueueStore.rollNextFile(consumeQueue, nextBeginOffset));
                            LOGGER.warn("consumer request topic: " + str2 + ", offset: " + j + ", minOffset: " + minOffset + ", maxOffset: " + maxOffset + ", but access logic queue failed. Correct nextBeginOffset to " + nextBeginOffset);
                        } else {
                            // Set when a commit-log read fails: units located before the start of
                            // the next commit-log file are skipped until a read succeeds again.
                            long nextPhyFileStartOffset = Long.MIN_VALUE;
                            while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
                                CqUnit cqUnit = bufferConsumeQueue.next();
                                long offsetPy = cqUnit.getPos();
                                int sizePy = cqUnit.getSize();
                                boolean isInMem = estimateInMemByCommitOffset(offsetPy, maxOffsetPy);

                                // Stop at the first unit that violates a limit; see the method
                                // comment for why the walk must not continue past it.
                                if ((cqUnit.getQueueOffset() - j) * consumeQueue.getUnitSize() > maxFilterMessageSize) {
                                    break;
                                }
                                if (isTheBatchFull(sizePy, cqUnit.getBatchNum(), i2, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInMem)) {
                                    break;
                                }
                                if (getResult.getBufferTotalSize() >= maxPullSize) {
                                    break;
                                }

                                maxPhyOffsetPulling = offsetPy;
                                // The unit is now considered consumed, whether or not it matches.
                                nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();

                                if (nextPhyFileStartOffset != Long.MIN_VALUE && offsetPy < nextPhyFileStartOffset) {
                                    continue;
                                }

                                if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
                                    continue;
                                }

                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                if (null == selectResult) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }
                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                    continue;
                                }

                                if (this.messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupForNoColdReadLimit(str) && !selectResult.isInCache()) {
                                    getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
                                }

                                if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
                                    // Not handed to the result, so the buffer must be released here.
                                    selectResult.release();
                                    continue;
                                }

                                this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
                                getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                                status = GetMessageStatus.FOUND;
                                nextPhyFileStartOffset = Long.MIN_VALUE;
                            }
                        }
                    } finally {
                        if (bufferConsumeQueue != null) {
                            bufferConsumeQueue.release();
                        }
                    }
                }

                if (diskFallRecorded) {
                    this.brokerStatsManager.recordDiskFallBehindSize(str, str2, i, maxOffsetPy - maxPhyOffsetPulling);
                }
                // Suggest pulling from a slave when the consumer lags behind the commit log by
                // more than the configured percentage of physical memory.
                getResult.setSuggestPullingFromSlave(maxOffsetPy - maxPhyOffsetPulling > ((long) (((double) StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE) * (((double) this.messageStoreConfig.getAccessMessageInMemoryMaxRatio()) / 100.0d))));
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(j, 0L);
        }

        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().add(1L);
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().add(1L);
        }
        this.storeStatsService.setGetMessageEntireTimeMax(getSystemClock().now() - beginTime);

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

    /**
     * Future-returning wrapper around the seven-argument {@code getMessage}. The lookup itself
     * runs synchronously on the calling thread; the returned future is already completed.
     *
     * @return a completed future holding the {@code getMessage} result (which may be {@code null})
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public CompletableFuture<GetMessageResult> getMessageAsync(String str, String str2, int i, long j, int i2, int i3, MessageFilter messageFilter) {
        return CompletableFuture.completedFuture(getMessage(str, str2, i, j, i2, i3, messageFilter));
    }

    /**
     * Returns the max offset of a queue, read from the consume queue itself (equivalent to the
     * three-argument overload with {@code true}).
     *
     * @param str topic
     * @param i   queue id
     * @return the queue's max offset, or {@code 0} if the queue is not found
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getMaxOffsetInQueue(String str, int i) {
        return getMaxOffsetInQueue(str, i, true);
    }

    /**
     * Returns the max offset of a queue.
     *
     * @param str topic
     * @param i   queue id
     * @param z   {@code true} to read the value from the consume queue object, {@code false} to
     *            ask the consume-queue store for its recorded max offset
     * @return the max offset, or {@code 0} if the queue or the recorded offset does not exist
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getMaxOffsetInQueue(String str, int i, boolean z) {
        if (!z) {
            final Long recorded = this.consumeQueueStore.getMaxOffset(str, i);
            return recorded == null ? 0L : recorded.longValue();
        }
        final ConsumeQueueInterface queue = getConsumeQueue(str, i);
        return queue == null ? 0L : queue.getMaxOffsetInQueue();
    }

    /**
     * Returns the min offset of a queue as reported by the consume-queue store.
     *
     * @param str topic
     * @param i   queue id
     * @return the min offset, or {@code -1} if the store lookup fails (the failure is logged)
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getMinOffsetInQueue(String str, int i) {
        long minOffset;
        try {
            minOffset = this.consumeQueueStore.getMinOffsetInQueue(str, i);
        } catch (RocksDBException e) {
            ERROR_LOG.error("getMinOffsetInQueue Failed. topic: {}, queueId: {}", str, i, e);
            minOffset = -1L;
        }
        return minOffset;
    }

    /**
     * Returns the timer message store attached to this store.
     *
     * @return the {@code timerMessageStore} field
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public TimerMessageStore getTimerMessageStore() {
        return this.timerMessageStore;
    }

    /**
     * Attaches a timer message store to this store.
     *
     * @param timerMessageStore the timer message store to keep a reference to
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
        this.timerMessageStore = timerMessageStore;
    }

    /**
     * Looks up the commit-log position of the consume-queue unit at the given queue offset.
     *
     * @param str topic
     * @param i   queue id
     * @param j   consume-queue offset
     * @return the unit's commit-log position, or {@code 0} if the queue or the unit does not exist
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getCommitLogOffsetInQueue(String str, int i, long j) {
        final ConsumeQueueInterface queue = getConsumeQueue(str, i);
        if (queue == null) {
            return 0L;
        }
        final CqUnit unit = queue.get(j);
        return unit == null ? 0L : unit.getPos();
    }

    /**
     * Convenience overload of the four-argument {@code getOffsetInQueueByTime} that uses
     * {@code BoundaryType.LOWER}.
     *
     * @param str topic
     * @param i   queue id
     * @param j   timestamp to search for
     * @return whatever the four-argument overload returns for the same arguments
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getOffsetInQueueByTime(String str, int i, long j) {
        return getOffsetInQueueByTime(str, i, j, BoundaryType.LOWER);
    }

    /**
     * Asks the consume-queue store for the queue offset that corresponds to a timestamp.
     *
     * @param str          topic
     * @param i            queue id
     * @param j            timestamp to search for
     * @param boundaryType boundary selection passed through to the consume-queue store
     * @return the offset found by the store, or {@code 0} if the store lookup fails (the failure
     *         is logged)
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getOffsetInQueueByTime(String str, int i, long j, BoundaryType boundaryType) {
        long offset;
        try {
            offset = this.consumeQueueStore.getOffsetInQueueByTime(str, i, j, boundaryType);
        } catch (RocksDBException e) {
            ERROR_LOG.error("getOffsetInQueueByTime Failed. topic: {}, queueId: {}, timestamp: {} boundaryType: {}, {}",
                str, i, j, boundaryType, e.getMessage());
            offset = 0L;
        }
        return offset;
    }

    /**
     * Reads the message stored at a commit-log offset.
     *
     * <p>The first four bytes at the offset are read as the message size, which is then passed
     * to the two-argument {@code lookMessageByOffset}. The four-byte probe buffer is always
     * released.
     *
     * @param j commit-log offset of the message
     * @return the decoded message, or {@code null} if nothing can be read at the offset
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public MessageExt lookMessageByOffset(long j) {
        final SelectMappedBufferResult sizeProbe = this.commitLog.getMessage(j, 4);
        if (sizeProbe == null) {
            return null;
        }
        try {
            final int messageSize = sizeProbe.getByteBuffer().getInt();
            return lookMessageByOffset(j, messageSize);
        } finally {
            sizeProbe.release();
        }
    }

    /**
     * Selects the raw buffer of the message stored at a commit-log offset.
     *
     * <p>The first four bytes at the offset are read as the message size, and a buffer of that
     * size is then selected from the commit log. The four-byte probe buffer is always released;
     * the returned buffer is handed to the caller unreleased.
     *
     * @param j commit-log offset of the message
     * @return the selected buffer, or {@code null} if nothing can be read at the offset
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public SelectMappedBufferResult selectOneMessageByOffset(long j) {
        final SelectMappedBufferResult sizeProbe = this.commitLog.getMessage(j, 4);
        if (sizeProbe == null) {
            return null;
        }
        try {
            final int messageSize = sizeProbe.getByteBuffer().getInt();
            return this.commitLog.getMessage(j, messageSize);
        } finally {
            sizeProbe.release();
        }
    }

    /**
     * Selects a buffer of the given size from the commit log at the given offset.
     *
     * @param j commit-log offset
     * @param i number of bytes to select
     * @return the result of {@code commitLog.getMessage(j, i)}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public SelectMappedBufferResult selectOneMessageByOffset(long j, int i) {
        return this.commitLog.getMessage(j, i);
    }

    /**
     * Returns the string form of the store statistics service.
     *
     * @return {@code storeStatsService.toString()}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public String getRunningDataInfo() {
        return this.storeStatsService.toString();
    }

    /**
     * Returns the commit-log storage path.
     *
     * @return the DLedger data store path when the DLedger commit log is enabled, otherwise the
     *         configured commit-log store path
     */
    public String getStorePathPhysic() {
        if (getMessageStoreConfig().isEnableDLegerCommitLog()) {
            final DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) getCommitLog();
            return dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().getDataStorePath();
        }
        return getMessageStoreConfig().getStorePathCommitLog();
    }

    /**
     * Returns the consume-queue storage path derived from the configured store root directory.
     *
     * @return the path computed by {@code StorePathConfigHelper.getStorePathConsumeQueue}
     */
    public String getStorePathLogic() {
        return StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    }

    /**
     * Returns the listener registered for message arrival.
     *
     * @return the {@code messageArrivingListener} field
     */
    public MessageArrivingListener getMessageArrivingListener() {
        return this.messageArrivingListener;
    }

    /**
     * Collects runtime statistics of the store.
     *
     * <p>Starts from the map supplied by the statistics service and adds: the disk usage ratio of
     * every commit-log path (key suffixed with {@code _<path>}; {@code -1.0} for a path that does
     * not exist), the minimum of those ratios, the consume-queue disk usage ratio, and the
     * commit log's min and max offsets.
     *
     * @return the statistics map obtained from the statistics service, with the entries above added
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public HashMap<String, String> getRuntimeInfo() {
        final HashMap<String, String> info = this.storeStatsService.getRuntimeInfo();

        final String[] commitLogPaths = getStorePathPhysic().trim().split(MixAll.MULTI_PATH_SPLITTER);
        double minUsedRatio = Double.MAX_VALUE;
        for (int idx = 0; idx < commitLogPaths.length; idx++) {
            final String path = commitLogPaths[idx];
            double usedRatio = -1.0d;
            if (UtilAll.isPathExists(path)) {
                usedRatio = UtilAll.getDiskPartitionSpaceUsedPercent(path);
            }
            info.put(RunningStats.commitLogDiskRatio.name() + "_" + path, String.valueOf(usedRatio));
            minUsedRatio = Math.min(minUsedRatio, usedRatio);
        }
        info.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minUsedRatio));

        final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathLogic());
        info.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueRatio));

        info.put(RunningStats.commitLogMinOffset.name(), String.valueOf(getMinPhyOffset()));
        info.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(getMaxPhyOffset()));
        return info;
    }

    /**
     * Returns the max physical offset of the commit log.
     *
     * @return {@code commitLog.getMaxOffset()}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getMaxPhyOffset() {
        return this.commitLog.getMaxOffset();
    }

    /**
     * Returns the min physical offset of the commit log.
     *
     * @return {@code commitLog.getMinOffset()}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getMinPhyOffset() {
        return this.commitLog.getMinOffset();
    }

    /**
     * Returns the value the commit log reports as its last file's from-offset.
     *
     * @return {@code commitLog.getLastFileFromOffset()}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getLastFileFromOffset() {
        return this.commitLog.getLastFileFromOffset();
    }

    /**
     * Delegates to {@code commitLog.getLastMappedFile} for the given offset.
     *
     * @param j offset handed to the commit log unchanged
     * @return the boolean returned by the commit log
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean getLastMappedFile(long j) {
        return this.commitLog.getLastMappedFile(j);
    }

    /**
     * Returns the store time of the earliest unit in a queue.
     *
     * @param str topic
     * @param i   queue id
     * @return the store time, or {@code -1} if the queue, its earliest unit or the store time is
     *         unavailable
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public long getEarliestMessageTime(String str, int i) {
        final ConsumeQueueInterface queue = getConsumeQueue(str, i);
        if (queue == null) {
            return -1L;
        }
        final Pair<CqUnit, Long> earliest = queue.getEarliestUnitAndStoreTime();
        if (earliest == null) {
            return -1L;
        }
        final Long storeTime = earliest.getObject2();
        return storeTime == null ? -1L : storeTime.longValue();
    }

    /**
     * Future-returning variant of {@link #getEarliestMessageTime(String, int)}.
     *
     * <p>The lookup is performed synchronously on the calling thread; the returned future
     * is already completed.
     *
     * @param str topic used to locate the consume queue
     * @param i   queue id used to locate the consume queue
     * @return a completed future holding the synchronous result
     */
    @Override
    public CompletableFuture<Long> getEarliestMessageTimeAsync(String str, int i) {
        Long earliest = Long.valueOf(getEarliestMessageTime(str, i));
        return CompletableFuture.completedFuture(earliest);
    }

    /**
     * Reads the store timestamp of the record located at the commit log's minimum offset.
     *
     * @return the value returned by {@code pickupStoreTimestamp} on the commit log
     */
    @Override
    public long getEarliestMessageTime() {
        long offset = getMinPhyOffset();
        if (getCommitLog() instanceof DLedgerCommitLog) {
            // NOTE(review): 48 is presumably the DLedger entry header length - confirm
            offset += 48;
        }
        // NOTE(review): 64 / 76 look like the read sizes for IPv4 / IPv6 hosts - confirm
        boolean brokerUsesIpv6 =
            InetAddressValidator.getInstance().isValidInet6Address(this.brokerConfig.getBrokerIP1());
        int size = brokerUsesIpv6 ? 76 : 64;
        return getCommitLog().pickupStoreTimestamp(offset, size);
    }

    /**
     * Looks up the store time of the unit at a given consume queue offset.
     *
     * @param str topic used to locate the consume queue
     * @param i   queue id used to locate the consume queue
     * @param j   offset passed to {@code getCqUnitAndStoreTime}
     * @return the store time, or {@code -1} when the queue, the unit/time pair,
     *         or the time itself is missing
     */
    @Override
    public long getMessageStoreTimeStamp(String str, int i, long j) {
        ConsumeQueueInterface queue = getConsumeQueue(str, i);
        if (queue == null) {
            return -1L;
        }
        Pair<CqUnit, Long> unitAndTime = queue.getCqUnitAndStoreTime(j);
        if (unitAndTime == null) {
            return -1L;
        }
        Long storeTime = unitAndTime.getObject2();
        if (storeTime == null) {
            return -1L;
        }
        return storeTime.longValue();
    }

    /**
     * Future-returning variant of {@link #getMessageStoreTimeStamp(String, int, long)}.
     *
     * <p>The lookup runs synchronously; the returned future is already completed.
     *
     * @param str topic used to locate the consume queue
     * @param i   queue id used to locate the consume queue
     * @param j   consume queue offset
     * @return a completed future holding the synchronous result
     */
    @Override
    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String str, int i, long j) {
        Long storeTime = Long.valueOf(getMessageStoreTimeStamp(str, i, j));
        return CompletableFuture.completedFuture(storeTime);
    }

    /**
     * Reports the total number of messages in a consume queue.
     *
     * @param str topic used to locate the consume queue
     * @param i   queue id used to locate the consume queue
     * @return the queue's own total, or {@code 0} when no such queue exists
     */
    @Override
    public long getMessageTotalInQueue(String str, int i) {
        ConsumeQueueInterface queue = getConsumeQueue(str, i);
        if (queue == null) {
            return 0L;
        }
        return queue.getMessageTotalInQueue();
    }

    /**
     * Fetches commit log data starting at the given offset.
     *
     * @param j offset forwarded to {@code commitLog.getData(long)}
     * @return the buffer returned by the commit log, or {@code null} when the store has
     *         been shut down
     */
    @Override
    public SelectMappedBufferResult getCommitLogData(long j) {
        if (this.shutdown) {
            // The warning names this operation, matching the style used by getBulkCommitLogData
            LOGGER.warn("message store has shutdown, so getCommitLogData is forbidden");
            return null;
        }
        return this.commitLog.getData(j);
    }

    /**
     * Fetches a range of commit log data as a list of buffers.
     *
     * @param j start offset forwarded to {@code commitLog.getBulkData}
     * @param i size forwarded to {@code commitLog.getBulkData}
     * @return the buffers returned by the commit log, or {@code null} when the store has
     *         been shut down
     */
    @Override
    public List<SelectMappedBufferResult> getBulkCommitLogData(long j, int i) {
        if (this.shutdown) {
            LOGGER.warn("message store has shutdown, so getBulkCommitLogData is forbidden");
            return null;
        }
        return this.commitLog.getBulkData(j, i);
    }

    /**
     * Appends raw bytes to the commit log and, on success, wakes up the reput service.
     *
     * @param j    physical offset forwarded to {@code commitLog.appendData}
     * @param bArr data array
     * @param i    third argument forwarded to {@code commitLog.appendData}
     * @param i2   fourth argument forwarded to {@code commitLog.appendData}
     * @return {@code true} when the commit log accepted the data; {@code false} when the
     *         store is shut down or the append failed
     */
    @Override
    public boolean appendToCommitLog(long j, byte[] bArr, int i, int i2) {
        if (this.shutdown) {
            LOGGER.warn("message store has shutdown, so appendToCommitLog is forbidden");
            return false;
        }

        boolean appended = this.commitLog.appendData(j, bArr, i, i2);
        if (!appended) {
            LOGGER.error("DefaultMessageStore#appendToCommitLog: failed to append data to commitLog, physical offset={}, data length={}", j, bArr.length);
            return false;
        }
        this.reputMessageService.wakeup();
        return true;
    }

    /**
     * Asks the commit log cleaning service to perform a manually triggered deletion.
     */
    @Override
    public void executeDeleteFilesManually() {
        cleanCommitLogService.executeDeleteFilesManually();
    }

    /**
     * Queries messages through the index service and collects their commit log buffers.
     *
     * <p>At most three index lookups are performed. After each lookup the physical offsets
     * are sorted and every offset is resolved to a buffer which is added to the result.
     * The upper time bound of the next lookup is replaced by the store timestamp of the
     * first message of the current batch. Iteration stops as soon as a lookup yields no
     * offsets, the result holds any data, or the new upper bound falls below {@code j}.
     *
     * @param str  first argument forwarded to {@code indexService.queryOffset}
     * @param str2 second argument forwarded to {@code indexService.queryOffset}
     * @param i    third argument forwarded to {@code indexService.queryOffset}
     * @param j    lower time bound
     * @param j2   initial upper time bound
     * @return the accumulated result; never {@code null}
     */
    @Override
    public QueryMessageResult queryMessage(String str, String str2, int i, long j, long j2) {
        final QueryMessageResult result = new QueryMessageResult();
        long upperBound = j2;

        for (int attempt = 0; attempt < 3; attempt++) {
            QueryOffsetResult offsets = this.indexService.queryOffset(str, str2, i, j, upperBound);
            if (offsets.getPhyOffsets().isEmpty()) {
                break;
            }

            Collections.sort(offsets.getPhyOffsets());
            result.setIndexLastUpdatePhyoffset(offsets.getIndexLastUpdatePhyoffset());
            result.setIndexLastUpdateTimestamp(offsets.getIndexLastUpdateTimestamp());

            for (int idx = 0; idx < offsets.getPhyOffsets().size(); idx++) {
                long phyOffset = offsets.getPhyOffsets().get(idx).longValue();
                try {
                    MessageExt message = lookMessageByOffset(phyOffset);
                    if (idx == 0) {
                        // The first (smallest) offset narrows the time window of the next lookup
                        upperBound = message.getStoreTimestamp();
                    }

                    SelectMappedBufferResult buffer = this.commitLog.getData(phyOffset, false);
                    if (buffer != null) {
                        // The first int of the buffer is used as the message size
                        int size = buffer.getByteBuffer().getInt(0);
                        buffer.getByteBuffer().limit(size);
                        buffer.setSize(size);
                        result.addMessage(buffer);
                    }
                } catch (Exception e) {
                    LOGGER.error("queryMessage exception", e);
                }
            }

            if (result.getBufferTotalSize() > 0) {
                break;
            }
            if (upperBound < j) {
                break;
            }
        }
        return result;
    }

    /**
     * Future-returning variant of {@link #queryMessage(String, String, int, long, long)}.
     *
     * <p>The query runs synchronously; the returned future is already completed.
     *
     * @param str  see {@code queryMessage}
     * @param str2 see {@code queryMessage}
     * @param i    see {@code queryMessage}
     * @param j    lower time bound
     * @param j2   upper time bound
     * @return a completed future holding the synchronous result
     */
    @Override
    public CompletableFuture<QueryMessageResult> queryMessageAsync(String str, String str2, int i, long j, long j2) {
        QueryMessageResult result = queryMessage(str, str2, i, j, j2);
        return CompletableFuture.completedFuture(result);
    }

    /**
     * Forwards a new HA master address to the HA service; a no-op when there is none.
     *
     * @param str address forwarded unchanged
     */
    @Override
    public void updateHaMasterAddress(String str) {
        if (this.haService == null) {
            return;
        }
        this.haService.updateHaMasterAddress(str);
    }

    /**
     * Forwards a new master address to the HA service and then to the compaction service,
     * skipping whichever of the two is absent.
     *
     * @param str address forwarded unchanged
     */
    @Override
    public void updateMasterAddress(String str) {
        boolean hasHaService = this.haService != null;
        if (hasHaService) {
            this.haService.updateMasterAddress(str);
        }

        boolean hasCompactionService = this.compactionService != null;
        if (hasCompactionService) {
            this.compactionService.updateMasterAddress(str);
        }
    }

    /**
     * Records the number of alive replicas in the group.
     *
     * @param i new value of the {@code aliveReplicasNum} field
     */
    @Override
    public void setAliveReplicaNumInGroup(int i) {
        aliveReplicasNum = i;
    }

    /**
     * Wakes up the HA client of the HA service; a no-op when there is no HA service.
     */
    @Override
    public void wakeupHAClient() {
        if (this.haService == null) {
            return;
        }
        this.haService.getHAClient().wakeup();
    }

    /**
     * Reports the recorded number of alive replicas in the group.
     *
     * @return the value of the {@code aliveReplicasNum} field
     */
    @Override
    public int getAliveReplicaNumInGroup() {
        return aliveReplicasNum;
    }

    /**
     * Computes the distance between the commit log's maximum offset and the maximum
     * offset pushed to the slave, as tracked by the HA service.
     *
     * @return the difference in offsets, or {@code -1} when there is no HA service,
     *         duplication is enabled, or the DLedger commit log is enabled
     */
    @Override
    public long slaveFallBehindMuch() {
        boolean unavailable = this.haService == null
            || this.messageStoreConfig.isDuplicationEnable()
            || this.messageStoreConfig.isEnableDLegerCommitLog();
        if (unavailable) {
            LOGGER.warn("haServer is null or duplication is enable or enableDLegerCommitLog is true");
            return -1L;
        }
        long maxOffset = this.commitLog.getMaxOffset();
        return maxOffset - this.haService.getPush2SlaveMaxOffset().get();
    }

    /**
     * Reads the current time from the store's system clock.
     *
     * @return the value of {@code systemClock.now()}
     */
    @Override
    public long now() {
        return systemClock.now();
    }

    /**
     * Destroys the consume queues of the given topics and removes their bookkeeping.
     *
     * <p>For each topic that has at least one consume queue: every queue is destroyed
     * (a {@code RocksDBException} is logged and does not stop processing), its entry is
     * removed from the topic-queue table, the topic is removed from the consume queue
     * table, broker stats are notified when auto-deletion of unused stats is enabled, and
     * the topic's consume queue, consume queue ext and batch consume queue directories
     * are deleted if empty.
     *
     * @param set topics to delete; {@code null} or empty yields {@code 0}
     * @return the number of topics that had consume queues and were processed
     */
    @Override
    public int deleteTopics(Set<String> set) {
        if (set == null || set.isEmpty()) {
            return 0;
        }

        int deletedCount = 0;
        for (String topic : set) {
            ConcurrentMap<Integer, ConsumeQueueInterface> queues = this.consumeQueueStore.findConsumeQueueMap(topic);
            if (queues == null || queues.isEmpty()) {
                // Nothing stored for this topic
                continue;
            }

            for (ConsumeQueueInterface queue : queues.values()) {
                try {
                    this.consumeQueueStore.destroy(queue);
                } catch (RocksDBException e) {
                    LOGGER.error("DeleteTopic: ConsumeQueue cleans error!, topic={}, queueId={}", queue.getTopic(), queue.getQueueId(), e);
                }
                LOGGER.info("DeleteTopic: ConsumeQueue has been cleaned, topic={}, queueId={}", queue.getTopic(), queue.getQueueId());
                this.consumeQueueStore.removeTopicQueueTable(queue.getTopic(), Integer.valueOf(queue.getQueueId()));
            }

            this.consumeQueueStore.getConsumeQueueTable().remove(topic);
            if (this.brokerConfig.isAutoDeleteUnusedStats()) {
                this.brokerStatsManager.onTopicDeleted(topic);
            }

            // Remove the per-topic directories, but only when they are empty
            File consumeQueueDir = new File(
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()) + File.separator + topic);
            File consumeQueueExtDir = new File(
                StorePathConfigHelper.getStorePathConsumeQueueExt(this.messageStoreConfig.getStorePathRootDir()) + File.separator + topic);
            File batchConsumeQueueDir = new File(
                StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()) + File.separator + topic);
            UtilAll.deleteEmptyDirectory(consumeQueueDir);
            UtilAll.deleteEmptyDirectory(consumeQueueExtDir);
            UtilAll.deleteEmptyDirectory(batchConsumeQueueDir);

            LOGGER.info("DeleteTopic: Topic has been destroyed, topic={}", topic);
            deletedCount++;
        }
        return deletedCount;
    }

    /**
     * Deletes every topic in the consume queue table that is not in the retained set,
     * is not a system topic and is not an LMQ topic.
     *
     * @param set topics that must be kept
     * @return the sum of the counts returned by {@link #deleteTopics(Set)} for each
     *         removed topic
     */
    @Override
    public int cleanUnusedTopic(Set<String> set) {
        int removed = 0;
        for (String topic : Sets.difference(getConsumeQueueTable().keySet(), set)) {
            if (set.contains(topic)) {
                continue;
            }
            if (TopicValidator.isSystemTopic(topic) || MixAll.isLmq(topic)) {
                continue;
            }
            removed += deleteTopics(Sets.newHashSet(new String[]{topic}));
        }
        return removed;
    }

    /**
     * Asks the consume queue store to clean expired entries, using the commit log's
     * current minimum offset as the boundary.
     */
    @Override
    public void cleanExpiredConsumerQueue() {
        long minCommitLogOffset = this.commitLog.getMinOffset();
        this.consumeQueueStore.cleanExpired(minCommitLogOffset);
    }

    /**
     * Builds a map from message id to queue offset for the units of one consume queue
     * whose offsets fall in the requested range.
     *
     * <p>The requested range is clamped to the queue's own min/max offsets. The queue is
     * then walked with successive iterators until the upper bound is reached or no more
     * units are available. Each iterator is released exactly once.
     *
     * @param str           topic of the consume queue
     * @param i             queue id of the consume queue
     * @param j             requested lower queue offset (inclusive)
     * @param j2            requested upper queue offset (exclusive)
     * @param socketAddress store host encoded into every message id; must be an
     *                      {@link InetSocketAddress} (it is cast as such)
     * @return message id to queue offset; empty when the store is shut down, the queue
     *         is missing, or the range holds no units
     */
    public Map<String, Long> getMessageIds(String str, int i, long j, long j2, SocketAddress socketAddress) {
        Map<String, Long> messageIds = new HashMap<>();
        if (this.shutdown) {
            return messageIds;
        }

        ConsumeQueueInterface consumeQueue = findConsumeQueue(str, i);
        if (consumeQueue == null) {
            return messageIds;
        }

        long minOffset = Math.max(j, consumeQueue.getMinOffsetInQueue());
        long maxOffset = Math.min(j2, consumeQueue.getMaxOffsetInQueue());
        if (maxOffset == 0) {
            return messageIds;
        }

        long nextOffset = minOffset;
        while (nextOffset < maxOffset) {
            long roundStart = nextOffset;
            ReferredIterator<CqUnit> units = consumeQueue.iterateFrom(nextOffset);
            try {
                if (units == null || !units.hasNext()) {
                    // No more data available from this position
                    return messageIds;
                }
                while (units.hasNext()) {
                    CqUnit unit = units.next();
                    // 28 bytes for an IPv6 host, 16 bytes otherwise
                    int idLength = ((InetSocketAddress) socketAddress).getAddress() instanceof Inet6Address ? 28 : 16;
                    String messageId = MessageDecoder.createMessageId(
                        ByteBuffer.allocate(idLength), MessageExt.socketAddress2ByteBuffer(socketAddress), unit.getPos());
                    messageIds.put(messageId, Long.valueOf(unit.getQueueOffset()));

                    nextOffset = unit.getQueueOffset() + unit.getBatchNum();
                    if (nextOffset >= maxOffset) {
                        return messageIds;
                    }
                }
            } finally {
                // Single release point for every exit path of this round
                if (units != null) {
                    units.release();
                }
            }
            if (nextOffset <= roundStart) {
                // Defensive: stop instead of spinning if a round made no progress
                return messageIds;
            }
        }
        return messageIds;
    }

    /**
     * Estimates whether the message at a consume queue offset is outside the in-memory
     * window of the commit log.
     *
     * @param str topic used to locate the consume queue
     * @param i   queue id used to locate the consume queue
     * @param j   consume queue offset
     * @return {@code true} only when the unit exists and
     *         {@code estimateInMemByCommitOffset} reports it is not in memory;
     *         {@code false} when the queue or unit is missing
     */
    @Override
    @Deprecated
    public boolean checkInDiskByConsumeOffset(String str, int i, long j) {
        long maxPhyOffset = this.commitLog.getMaxOffset();
        ConsumeQueueInterface queue = getConsumeQueue(str, i);
        if (queue == null) {
            return false;
        }
        CqUnit unit = queue.get(j);
        if (unit == null) {
            return false;
        }
        return !estimateInMemByCommitOffset(unit.getPos(), maxPhyOffset);
    }

    /**
     * Checks whether the commit log data referenced from a consume queue offset is in
     * memory.
     *
     * <p>When {@code i2} is greater than one and a unit exists at {@code j + i2}, the
     * check covers the span from the first unit's position to the end of that unit;
     * otherwise only the first unit is checked.
     *
     * @param str topic used to locate the consume queue
     * @param i   queue id used to locate the consume queue
     * @param j   consume queue offset of the first unit
     * @param i2  batch size
     * @return the result of {@code checkInMemByCommitOffset}, or {@code false} when the
     *         queue or first unit is missing
     */
    @Override
    public boolean checkInMemByConsumeOffset(String str, int i, long j, int i2) {
        ConsumeQueueInterface queue = getConsumeQueue(str, i);
        if (queue == null) {
            return false;
        }
        CqUnit firstUnit = queue.get(j);
        if (firstUnit == null) {
            return false;
        }

        long startPos = firstUnit.getPos();
        if (i2 > 1) {
            CqUnit lastUnit = queue.get(j + i2);
            if (lastUnit != null) {
                int span = ((int) (lastUnit.getPos() - startPos)) + lastUnit.getSize();
                return checkInMemByCommitOffset(startPos, span);
            }
        }
        return checkInMemByCommitOffset(startPos, firstUnit.getSize());
    }

    /**
     * Resolves a consume queue offset to a commit log offset and checks it against the
     * commit log's minimum offset.
     *
     * @param str topic
     * @param i   queue id
     * @param j   consume queue offset
     * @return the result of {@link #checkInDiskByCommitOffset(long)}
     */
    @Override
    public boolean checkInStoreByConsumeOffset(String str, int i, long j) {
        long commitLogOffset = getCommitLogOffsetInQueue(str, i, j);
        return checkInDiskByCommitOffset(commitLogOffset);
    }

    /**
     * Reports how far the reput service is behind.
     *
     * @return the value of {@code reputMessageService.behind()}
     */
    @Override
    public long dispatchBehindBytes() {
        return reputMessageService.behind();
    }

    /**
     * Reports the amount of commit log data still waiting to be flushed.
     *
     * <p>When the transient store pool is enabled in the configuration, the data still
     * waiting to be committed is added to the figure.
     *
     * @return pending flush amount, plus pending commit amount when applicable
     */
    public long flushBehindBytes() {
        if (this.messageStoreConfig.isTransientStorePoolEnable()) {
            return this.commitLog.remainHowManyDataToCommit() + this.commitLog.remainHowManyDataToFlush();
        }
        return this.commitLog.remainHowManyDataToFlush();
    }

    /**
     * Delegates to {@code commitLog.flush()}.
     *
     * @return the value returned by the commit log
     */
    @Override
    public long flush() {
        return commitLog.flush();
    }

    /**
     * Delegates to {@code commitLog.getFlushedWhere()}.
     *
     * @return the flushed position reported by the commit log
     */
    @Override
    public long getFlushedWhere() {
        return commitLog.getFlushedWhere();
    }

    /**
     * Resets the commit log write offset and rolls the topic-queue offset table back
     * accordingly.
     *
     * <p>A copy of the topic-queue table is made first. The commit log is then scanned
     * from the reset offset ({@code 0} when {@code j} is {@code -1}); for every decoded
     * message whose queue offset is lower than the recorded one, the copy is lowered to
     * that queue offset. The scan stops when no more data is available, a record fails
     * validation, a message cannot be decoded, or an error occurs. It never retries the
     * same offset, so it cannot loop without progress. Every selected buffer is released
     * exactly once. The copy replaces the live table only when the commit log accepts the
     * new offset.
     *
     * @param j physical offset to reset to; {@code -1} makes the scan start from {@code 0}
     * @return {@code true} when the commit log offset was reset and the table replaced
     */
    @Override
    public boolean resetWriteOffset(long j) {
        ConcurrentHashMap<String, Long> queueOffsets = new ConcurrentHashMap<>(this.consumeQueueStore.getTopicQueueTable());
        long readOffset = j == -1 ? 0L : j;

        SelectMappedBufferResult buffer;
        while ((buffer = selectOneMessageByOffset(readOffset)) != null) {
            try {
                if (buffer.getStartOffset() > readOffset) {
                    // The returned data starts later than requested: jump forward
                    readOffset = buffer.getStartOffset();
                    continue;
                }

                ByteBuffer bytes = buffer.getByteBuffer();
                int magicCode = bytes.getInt(bytes.position() + 4);
                // NOTE(review): -875286124 appears to be the blank/padding record magic code - confirm
                if (magicCode == -875286124) {
                    // Skip the record using the length stored in its first int
                    readOffset += bytes.getInt(bytes.position());
                    continue;
                }
                // NOTE(review): -626843481 appears to be the regular message magic code - confirm
                if (magicCode != -626843481) {
                    throw new RuntimeException("Unknown magicCode: " + magicCode);
                }

                buffer.getByteBuffer().mark();
                DispatchRequest request = checkMessageAndReturnSize(
                    buffer.getByteBuffer(), true, this.messageStoreConfig.isDuplicationEnable(), true);
                if (!request.isSuccess()) {
                    // Invalid record: the offset cannot advance, so stop scanning
                    break;
                }
                buffer.getByteBuffer().reset();

                MessageExt message = MessageDecoder.decode(buffer.getByteBuffer(), true, false, false, false, true);
                if (message == null) {
                    // Undecodable record: the offset cannot advance, so stop scanning
                    break;
                }

                String key = message.getTopic() + "-" + message.getQueueId();
                Long recorded = queueOffsets.get(key);
                if (recorded != null && recorded.longValue() > message.getQueueOffset()) {
                    queueOffsets.put(key, Long.valueOf(message.getQueueOffset()));
                }
                readOffset += message.getStoreSize();
            } catch (Throwable th) {
                LOGGER.error("resetWriteOffset error.", th);
                // The offset was not advanced; retrying would fail the same way forever
                break;
            } finally {
                // Single release point; runs for continue, break and normal completion
                buffer.release();
            }
        }

        if (!this.commitLog.resetOffset(j)) {
            return false;
        }
        this.consumeQueueStore.setTopicQueueTable(queueOffsets);
        return true;
    }

    /**
     * Delegates to {@code commitLog.getConfirmOffset()}.
     *
     * @return the confirm offset reported by the commit log
     */
    @Override
    public long getConfirmOffset() {
        return commitLog.getConfirmOffset();
    }

    /**
     * Delegates to {@code commitLog.getConfirmOffsetDirectly()}.
     *
     * @return the value reported by the commit log
     */
    public long getConfirmOffsetDirectly() {
        return commitLog.getConfirmOffsetDirectly();
    }

    /**
     * Delegates to {@code commitLog.setConfirmOffset(long)}.
     *
     * @param j confirm offset forwarded unchanged
     */
    @Override
    public void setConfirmOffset(long j) {
        commitLog.setConfirmOffset(j);
    }

    /**
     * Computes a murmur3-128 checksum over the messages stored in the commit log range
     * {@code [j, j2)}.
     *
     * <p>The messages of the range are decoded, re-encoded with
     * {@code MessageDecoder.encodeUniquely} into a buffer of the range's size, and the
     * whole backing array of that buffer is hashed. A message that fails to encode is
     * logged and skipped.
     *
     * @param j  start offset (inclusive); negative values yield an empty array
     * @param j2 end offset (exclusive); values not greater than {@code j} yield an empty
     *           array
     * @return the checksum bytes; an empty array when the range is invalid or holds no
     *         data (including when the store is shut down); {@code null} when the range
     *         exceeds the configured maximum
     */
    @Override
    public byte[] calcDeltaChecksum(long j, long j2) {
        if (j < 0 || j2 <= j) {
            return new byte[0];
        }

        // Compare in long arithmetic so an oversized range cannot wrap around in the int cast
        long range = j2 - j;
        long maxRange = this.messageStoreConfig.getMaxChecksumRange();
        if (range > maxRange || range > Integer.MAX_VALUE) {
            LOGGER.error("Checksum range from {}, size {} exceeds threshold {}", j, range, maxRange);
            return null;
        }
        int size = (int) range;

        List<SelectMappedBufferResult> buffers = getBulkCommitLogData(j, size);
        // getBulkCommitLogData returns null once the store has been shut down
        if (buffers == null || buffers.isEmpty()) {
            return new byte[0];
        }

        List<MessageExt> messages = new ArrayList<>();
        try {
            for (SelectMappedBufferResult buffer : buffers) {
                messages.addAll(MessageDecoder.decodesBatch(buffer.getByteBuffer(), true, false, false));
            }
        } finally {
            // Release every buffer even when decoding fails part-way through
            for (SelectMappedBufferResult buffer : buffers) {
                buffer.release();
            }
        }
        if (messages.isEmpty()) {
            return new byte[0];
        }

        ByteBuffer encoded = ByteBuffer.allocate(size);
        for (MessageExt message : messages) {
            try {
                encoded.put(MessageDecoder.encodeUniquely(message, false));
            } catch (IOException e) {
                // Best effort: the message is left out of the checksum, but the failure is visible
                LOGGER.warn("calcDeltaChecksum: failed to encode a message, it is skipped", e);
            }
        }
        return Hashing.murmur3_128().hashBytes(encoded.array()).asBytes();
    }

    /**
     * Delegates to {@code commitLog.setMappedFileQueueOffset(long)}.
     *
     * @param j offset forwarded unchanged
     */
    @Override
    public void setPhysicalOffset(long j) {
        commitLog.setMappedFileQueueOffset(j);
    }

    /**
     * Delegates to {@code commitLog.isMappedFilesEmpty()}.
     *
     * @return the boolean reported by the commit log
     */
    @Override
    public boolean isMappedFilesEmpty() {
        return commitLog.isMappedFilesEmpty();
    }

    /**
     * Reads and decodes the message stored at a commit log offset.
     *
     * <p>The selected buffer is released whether or not decoding succeeds.
     *
     * @param j commit log offset
     * @param i size passed to {@code commitLog.getMessage}
     * @return the decoded message, or {@code null} when the commit log returns no buffer
     */
    @Override
    public MessageExt lookMessageByOffset(long j, int i) {
        SelectMappedBufferResult selected = this.commitLog.getMessage(j, i);
        if (selected == null) {
            return null;
        }
        try {
            return MessageDecoder.decode(selected.getByteBuffer(), true, false);
        } finally {
            selected.release();
        }
    }

    /**
     * Delegates to {@code consumeQueueStore.findOrCreateConsumeQueue}.
     *
     * @param str topic
     * @param i   queue id
     * @return the consume queue returned by the store
     */
    @Override
    public ConsumeQueueInterface findConsumeQueue(String str, int i) {
        return consumeQueueStore.findOrCreateConsumeQueue(str, i);
    }

    /**
     * Chooses between two candidate offsets depending on the broker role.
     *
     * @param j  offset returned when this broker is a slave and offset checking in
     *           slaves is disabled
     * @param j2 offset returned in every other case
     * @return {@code j2} unless the broker is a slave with offset checking disabled
     */
    private long nextOffsetCorrection(long j, long j2) {
        boolean useCorrected = getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
            || getMessageStoreConfig().isOffsetCheckInSlave();
        return useCorrected ? j2 : j;
    }

    /**
     * Estimates whether a commit log position is still within the in-memory window.
     *
     * <p>The window is the configured "access message in memory max ratio" percentage of
     * the total physical memory size.
     *
     * @param j  commit log position being checked
     * @param j2 maximum commit log offset
     * @return {@code true} when {@code j2 - j} does not exceed the window
     */
    private boolean estimateInMemByCommitOffset(long j, long j2) {
        double ratio = ((double) this.messageStoreConfig.getAccessMessageInMemoryMaxRatio()) / 100.0d;
        long memoryWindow = (long) (((double) StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE) * ratio);
        return j2 - j <= memoryWindow;
    }

    /**
     * Checks whether the commit log data at the given position is in memory.
     *
     * <p>The selected buffer is released before returning.
     *
     * @param j commit log offset
     * @param i size passed to {@code commitLog.getMessage}
     * @return the buffer's {@code isInMem()} result, or {@code false} when no buffer is
     *         returned
     */
    private boolean checkInMemByCommitOffset(long j, int i) {
        SelectMappedBufferResult selected = this.commitLog.getMessage(j, i);
        if (selected == null) {
            return false;
        }
        try {
            return selected.isInMem();
        } finally {
            selected.release();
        }
    }

    /**
     * Checks a commit log offset against the commit log's minimum offset.
     *
     * @param j commit log offset
     * @return {@code true} when {@code j} is not below the minimum offset
     */
    public boolean checkInDiskByCommitOffset(long j) {
        long minOffset = this.commitLog.getMinOffset();
        return j >= minOffset;
    }

    /**
     * Checks whether a commit log position lies outside the "hot" memory window.
     *
     * <p>The window is the configured "access message in memory hot ratio" percentage of
     * the total physical memory size.
     *
     * @param j  commit log position being checked
     * @param j2 maximum commit log offset
     * @return {@code true} when {@code j2 - j} exceeds the window
     */
    public boolean checkInColdAreaByCommitOffset(long j, long j2) {
        double ratio = ((double) this.messageStoreConfig.getAccessMessageInMemoryHotRatio()) / 100.0d;
        long hotWindow = (long) (((double) StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE) * ratio);
        return j2 - j > hotWindow;
    }

    /**
     * Decides whether adding one more unit would overfill the batch being assembled.
     *
     * <p>An empty batch (no bytes or no messages yet) is never full. Otherwise the batch
     * is full when adding the unit would exceed the caller's message count or byte
     * limit, or when the configured transfer limits for in-memory or on-disk messages
     * are exceeded.
     *
     * @param i  size in bytes of the unit about to be added
     * @param i2 number of messages in the unit about to be added
     * @param i3 maximum number of messages requested
     * @param j  maximum total size in bytes requested
     * @param i4 bytes accumulated so far
     * @param i5 messages accumulated so far
     * @param z  {@code true} to apply the in-memory limits, {@code false} for the on-disk
     *           limits
     * @return {@code true} when the batch must be considered full
     */
    private boolean isTheBatchFull(int i, int i2, int i3, long j, int i4, int i5, boolean z) {
        if (0 == i4 || 0 == i5) {
            return false;
        }
        if (i5 + i2 > i3) {
            return true;
        }
        if (i4 + i > j) {
            return true;
        }

        if (z) {
            if (i4 + i > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
                return true;
            }
            return i5 > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1;
        }
        if (i4 + i > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
            return true;
        }
        return i5 > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1;
    }

    /**
     * Deletes the file at the given path and logs whether the deletion succeeded.
     *
     * @param str path of the file to delete
     */
    private void deleteFile(String str) {
        boolean deleted = new File(str).delete();
        String outcome = deleted ? " delete OK" : " delete Failed";
        LOGGER.info(str + outcome);
    }

    /**
     * Creates the abort file under the store root directory and writes the current
     * process id into it.
     *
     * <p>The parent directory is ensured first; whether the file was newly created or
     * already existed is logged.
     *
     * @throws IOException when the file cannot be created
     */
    private void createTempFile() throws IOException {
        String abortFilePath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File abortFile = new File(abortFilePath);
        UtilAll.ensureDirOK(abortFile.getParent());

        boolean created = abortFile.createNewFile();
        LOGGER.info(abortFilePath + (created ? " create OK" : " already exists"));

        String pid = Long.toString(MixAll.getPID());
        MixAll.string2File(pid, abortFile.getAbsolutePath());
    }

    /**
     * Registers the periodic background tasks of the store.
     *
     * <p>On {@code scheduledExecutorService}: commit log file cleaning (at the clean
     * resource interval), a self check (every 10 minutes after 1 minute), a lock
     * diagnostic dump (every second, only active when debug lock is enabled) and a
     * checkpoint flush (every second). On {@code scheduledCleanQueueExecutorService}:
     * consume queue cleaning (at the clean resource interval).
     */
    private void addScheduleTask() {
        AbstractBrokerRunnable cleanFilesTask = new AbstractBrokerRunnable(getBrokerIdentity()) {
            @Override
            public void run0() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(cleanFilesTask,
            BrokerStatsManager.ACCOUNT_STAT_INVERTAL, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

        AbstractBrokerRunnable selfCheckTask = new AbstractBrokerRunnable(getBrokerIdentity()) {
            @Override
            public void run0() {
                DefaultMessageStore.this.checkSelf();
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(selfCheckTask, 1L, 10L, TimeUnit.MINUTES);

        AbstractBrokerRunnable lockDebugTask = new AbstractBrokerRunnable(getBrokerIdentity()) {
            @Override
            public void run0() {
                if (!DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                    return;
                }
                try {
                    if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() == 0) {
                        return;
                    }
                    long lockedMillis = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                    // Dump thread stacks when the lock has been held for more than a second
                    if (lockedMillis > 1000 && lockedMillis < 10000000) {
                        String dumpFile = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                            + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockedMillis;
                        MixAll.string2FileNotSafe(UtilAll.jstack(), dumpFile);
                    }
                } catch (Exception ignored) {
                    // Diagnostic output only; failures are deliberately not reported
                }
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(lockDebugTask, 1L, 1L, TimeUnit.SECONDS);

        AbstractBrokerRunnable checkpointFlushTask = new AbstractBrokerRunnable(getBrokerIdentity()) {
            @Override
            public void run0() {
                DefaultMessageStore.this.storeCheckpoint.flush();
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(checkpointFlushTask, 1L, 1L, TimeUnit.SECONDS);

        Runnable cleanQueueTask = () -> DefaultMessageStore.this.cleanQueueFilesPeriodically();
        this.scheduledCleanQueueExecutorService.scheduleAtFixedRate(cleanQueueTask,
            BrokerStatsManager.ACCOUNT_STAT_INVERTAL, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs one pass of the commit log cleaning service on the calling thread.
     */
    public void cleanFilesPeriodically() {
        cleanCommitLogService.run();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the logic-offset correction service and then the consume queue cleaning
     * service, both on the calling thread.
     */
    public void cleanQueueFilesPeriodically() {
        correctLogicOffsetService.run();
        cleanConsumeQueueService.run();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the self check of the commit log and then of the consume queue store.
     */
    public void checkSelf() {
        commitLog.checkSelf();
        consumeQueueStore.checkSelf();
    }

    /**
     * Checks whether the abort file exists under the store root directory.
     *
     * @return {@code true} when the abort file is present
     */
    private boolean isTempFileExist() {
        String abortFilePath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File abortFile = new File(abortFilePath);
        return abortFile.exists();
    }

    /**
     * Tells whether recovery should run concurrently.
     *
     * @return {@code true} when the broker configuration asks for concurrent recovery
     *         and the RocksDB store is not enabled
     */
    private boolean isRecoverConcurrently() {
        if (!this.brokerConfig.isRecoverConcurrently()) {
            return false;
        }
        return !this.messageStoreConfig.isEnableRocksDBStore();
    }

    /**
     * Recovers the store: consume queues first, then the commit log, then the
     * topic-queue offset table, logging the time spent in each phase.
     *
     * @param z {@code true} to recover the commit log normally, {@code false} to recover
     *          it abnormally
     * @throws RocksDBException propagated from the recovery steps
     */
    private void recover(boolean z) throws RocksDBException {
        String mode = isRecoverConcurrently() ? "concurrent" : "normal";
        LOGGER.info("message store recover mode: {}", mode);

        long startedAt = System.currentTimeMillis();
        recoverConsumeQueue();
        long maxPhyOffsetOfConsumeQueue = this.consumeQueueStore.getMaxPhyOffsetInConsumeQueue();
        long consumeQueueDoneAt = System.currentTimeMillis();

        if (z) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
        long commitLogDoneAt = System.currentTimeMillis();

        recoverTopicQueueTable();
        long offsetTableDoneAt = System.currentTimeMillis();

        LOGGER.info("message store recover total cost: {} ms, recoverConsumeQueue: {} ms, recoverCommitLog: {} ms, recoverOffsetTable: {} ms",
            offsetTableDoneAt - startedAt,
            consumeQueueDoneAt - startedAt,
            commitLogDoneAt - consumeQueueDoneAt,
            offsetTableDoneAt - commitLogDoneAt);
    }

    /**
     * Reports the timing count tracked by the timer metrics for the given key.
     *
     * @param str key passed to {@code getTimingCount}
     * @return the timing count, or {@code 0} when there is no timer message store
     */
    @Override
    public long getTimingMessageCount(String str) {
        if (this.timerMessageStore != null) {
            return this.timerMessageStore.getTimerMetrics().getTimingCount(str);
        }
        return 0L;
    }

    /**
     * Exposes the configuration of this store.
     *
     * @return the value of the {@code messageStoreConfig} field
     */
    @Override
    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    /**
     * Hook required by the {@code MessageStore} interface; this implementation does
     * nothing.
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void finishCommitLogDispatch() {
    }

    /**
     * Exposes the transient store pool of this store.
     *
     * @return the value of the {@code transientStorePool} field
     */
    @Override
    public TransientStorePool getTransientStorePool() {
        return transientStorePool;
    }

    /**
     * Recovers the consume queue store, concurrently or sequentially depending on
     * {@link #isRecoverConcurrently()}.
     */
    private void recoverConsumeQueue() {
        if (!isRecoverConcurrently()) {
            this.consumeQueueStore.recover();
            return;
        }
        this.consumeQueueStore.recoverConcurrently();
    }

    /**
     * Rebuilds the offset table of the consume queue store, passing the commit log's
     * minimum offset.
     */
    @Override
    public void recoverTopicQueueTable() {
        long minCommitLogOffset = this.commitLog.getMinOffset();
        this.consumeQueueStore.recoverOffsetTable(minCommitLogOffset);
    }

    /**
     * Exposes the mapped-file allocation service of this store.
     *
     * @return the value of the {@code allocateMappedFileService} field
     */
    @Override
    public AllocateMappedFileService getAllocateMappedFileService() {
        return allocateMappedFileService;
    }

    /**
     * Exposes the statistics service of this store.
     *
     * @return the value of the {@code storeStatsService} field
     */
    @Override
    public StoreStatsService getStoreStatsService() {
        return storeStatsService;
    }

    /**
     * Exposes the running flags of this store; returns the same object as
     * {@link #getRunningFlags()}.
     *
     * @return the value of the {@code runningFlags} field
     */
    public RunningFlags getAccessRights() {
        return runningFlags;
    }

    /**
     * Exposes the consume queue table held by the consume queue store.
     *
     * @return a two-level map: topic, then queue id, then consume queue
     */
    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
        return consumeQueueStore.getConsumeQueueTable();
    }

    /**
     * Exposes the checkpoint of this store.
     *
     * @return the value of the {@code storeCheckpoint} field
     */
    @Override
    public StoreCheckpoint getStoreCheckpoint() {
        return storeCheckpoint;
    }

    /**
     * Exposes the HA service of this store.
     *
     * @return the value of the {@code haService} field; other methods of this class
     *         treat it as possibly {@code null}
     */
    @Override
    public HAService getHaService() {
        return haService;
    }

    /**
     * Exposes the running flags of this store.
     *
     * @return the value of the {@code runningFlags} field
     */
    @Override
    public RunningFlags getRunningFlags() {
        return runningFlags;
    }

    /**
     * Hands a dispatch request to every registered dispatcher, in list order.
     *
     * @param dispatchRequest request passed to each dispatcher
     * @throws RocksDBException propagated from a dispatcher; remaining dispatchers are
     *                          not invoked
     */
    public void doDispatch(DispatchRequest dispatchRequest) throws RocksDBException {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(dispatchRequest);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates to {@code consumeQueueStore.putMessagePositionInfoWrapper}.
     *
     * @param dispatchRequest request forwarded unchanged
     * @throws RocksDBException propagated from the consume queue store
     */
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) throws RocksDBException {
        consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest);
    }

    /**
     * Delegates to {@code commitLog.checkMessageAndReturnSize} with the same arguments.
     *
     * @param byteBuffer buffer holding the message to check
     * @param z          second argument, forwarded unchanged
     * @param z2         third argument, forwarded unchanged
     * @param z3         fourth argument, forwarded unchanged
     * @return the dispatch request produced by the commit log
     */
    @Override
    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean z, boolean z2, boolean z3) {
        return commitLog.checkMessageAndReturnSize(byteBuffer, z, z2, z3);
    }

    /**
     * Reports the recorded state machine version.
     *
     * @return the value of the {@code stateMachineVersion} field
     */
    @Override
    public long getStateMachineVersion() {
        return stateMachineVersion;
    }

    /**
     * Records the state machine version.
     *
     * @param j new value of the {@code stateMachineVersion} field
     */
    public void setStateMachineVersion(long j) {
        stateMachineVersion = j;
    }

    /**
     * Exposes the broker statistics manager used by this store.
     *
     * @return the value of the {@code brokerStatsManager} field
     */
    @Override
    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }

    /**
     * Exposes the broker configuration used by this store.
     *
     * @return the value of the {@code brokerConfig} field
     */
    public BrokerConfig getBrokerConfig() {
        return brokerConfig;
    }

    /**
     * Reports how many buffers remain available in the transient store pool.
     *
     * @return the pool's available buffer count, or {@link Integer#MAX_VALUE} when the
     *         transient store pool is not enabled
     */
    @Override
    public int remainTransientStoreBufferNumbs() {
        if (!isTransientStorePoolEnable()) {
            return Integer.MAX_VALUE;
        }
        return this.transientStorePool.availableBufferNums();
    }

    /**
     * Tells whether the transient store pool has run out of buffers.
     *
     * @return {@code true} when {@link #remainTransientStoreBufferNumbs()} is zero
     */
    @Override
    public boolean isTransientStorePoolDeficient() {
        int remaining = remainTransientStoreBufferNumbs();
        return remaining == 0;
    }

    /**
     * Delegates to {@code commitLog.remainHowManyDataToCommit()}.
     *
     * @return the amount of data still to be committed, as reported by the commit log
     */
    @Override
    public long remainHowManyDataToCommit() {
        return commitLog.remainHowManyDataToCommit();
    }

    /**
     * Delegates to {@code commitLog.remainHowManyDataToFlush()}.
     *
     * @return the amount of data still to be flushed, as reported by the commit log
     */
    @Override
    public long remainHowManyDataToFlush() {
        return commitLog.remainHowManyDataToFlush();
    }

    /**
     * Returns the dispatcher list held by this store (see {@link MessageStore}).
     *
     * @return the {@code dispatcherList} field itself, not a copy, so changes made by the
     *         caller are visible to this store
     */
    @Override
    public LinkedList<CommitLogDispatcher> getDispatcherList() {
        return dispatcherList;
    }

    /**
     * Appends a dispatcher to {@code dispatcherList} (see {@link MessageStore}).
     *
     * @param commitLogDispatcher the dispatcher to add; no null or duplicate check is made here
     */
    @Override
    public void addDispatcher(CommitLogDispatcher commitLogDispatcher) {
        dispatcherList.add(commitLogDispatcher);
    }

    /**
     * Overwrites the {@code masterStoreInProcess} field (see {@link MessageStore}).
     *
     * @param messageStore the store reference to keep; stored without validation
     */
    @Override
    public void setMasterStoreInProcess(MessageStore messageStore) {
        masterStoreInProcess = messageStore;
    }

    /**
     * Returns the store reference last saved by {@link #setMasterStoreInProcess(MessageStore)}
     * (see {@link MessageStore}).
     *
     * @return the {@code masterStoreInProcess} field, as-is
     */
    @Override
    public MessageStore getMasterStoreInProcess() {
        return masterStoreInProcess;
    }

    /**
     * Delegates to {@code commitLog.getData} with the arguments unchanged (see {@link MessageStore}).
     *
     * @param j          first argument, passed through to the commit log
     * @param i          second argument, passed through to the commit log
     * @param byteBuffer buffer passed through to the commit log
     * @return the boolean returned by the commit log
     */
    @Override
    public boolean getData(long j, int i, ByteBuffer byteBuffer) {
        return commitLog.getData(j, i, byteBuffer);
    }

    /**
     * Looks up a consume queue in the table returned by {@code getConsumeQueueTable()}
     * (see {@link MessageStore}).
     *
     * @param str key used for the outer lookup in the consume queue table
     * @param i   key used for the inner lookup in the per-{@code str} map
     * @return the matching queue, or {@code null} when either lookup finds nothing
     */
    @Override
    public ConsumeQueueInterface getConsumeQueue(String str, int i) {
        ConcurrentMap<Integer, ConsumeQueueInterface> queuesForKey = getConsumeQueueTable().get(str);
        return queuesForKey != null ? queuesForKey.get(i) : null;
    }

    /**
     * Schedules a one-shot task on {@code scheduledExecutorService} that calls
     * {@code mappedFile.munlock()} after a 6-second delay (see {@link MessageStore}).
     *
     * @param mappedFile the mapped file whose {@code munlock()} is invoked by the scheduled task
     */
    @Override
    public void unlockMappedFile(final MappedFile mappedFile) {
        // Typed as Runnable so the schedule(Runnable, long, TimeUnit) overload is selected.
        final Runnable unlockTask = () -> mappedFile.munlock();
        scheduledExecutorService.schedule(unlockTask, 6L, TimeUnit.SECONDS);
    }

    /**
     * Returns the perf counter held by this store (see {@link MessageStore}).
     *
     * @return the {@code perfs} field, as-is
     */
    @Override
    public PerfCounter.Ticks getPerfCounter() {
        return perfs;
    }

    /**
     * Returns the consume queue store held by this store (see {@link MessageStore}).
     *
     * @return the {@code consumeQueueStore} field, as-is
     */
    @Override
    public ConsumeQueueStoreInterface getQueueStore() {
        return consumeQueueStore;
    }

    /**
     * Callback declared by {@link MessageStore}; this implementation has an empty body and
     * therefore does nothing with any of its arguments.
     *
     * @param messageExtBrokerInner unused here
     * @param appendMessageResult   unused here
     * @param mappedFile            unused here
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void onCommitLogAppend(MessageExtBrokerInner messageExtBrokerInner, AppendMessageResult appendMessageResult, MappedFile mappedFile) {
    }

    /**
     * Runs {@link #doDispatch(DispatchRequest)} for the request, but only when {@code z} is
     * true and {@code z3} is false (see {@link MessageStore}).
     *
     * @param dispatchRequest the request to dispatch
     * @param z               must be {@code true} for the dispatch to happen
     * @param mappedFile      not used by this implementation
     * @param z2              not used by this implementation
     * @param z3              must be {@code false} for the dispatch to happen
     * @throws RocksDBException propagated unchanged from {@code doDispatch}
     */
    @Override
    public void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean z, MappedFile mappedFile, boolean z2, boolean z3) throws RocksDBException {
        if (z && !z3) {
            doDispatch(dispatchRequest);
        }
    }

    /**
     * Tells whether the configured flush disk type is {@code FlushDiskType.SYNC_FLUSH}
     * (see {@link MessageStore}).
     *
     * @return {@code true} when {@code getMessageStoreConfig().getFlushDiskType()} is {@code SYNC_FLUSH}
     */
    @Override
    public boolean isSyncDiskFlush() {
        return getMessageStoreConfig().getFlushDiskType() == FlushDiskType.SYNC_FLUSH;
    }

    /**
     * Tells whether the configured broker role is {@code BrokerRole.SYNC_MASTER}
     * (see {@link MessageStore}).
     *
     * @return {@code true} when {@code getMessageStoreConfig().getBrokerRole()} is {@code SYNC_MASTER}
     */
    @Override
    public boolean isSyncMaster() {
        return getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER;
    }

    /**
     * Asks the consume queue store to assign a queue offset to the message, but only for
     * transaction values 0 and 8 (see {@link MessageStore}).
     *
     * @param messageExtBrokerInner the message whose sys flag is inspected and which is passed
     *                              to {@code consumeQueueStore.assignQueueOffset}
     * @throws RocksDBException propagated unchanged from the consume queue store
     */
    @Override
    public void assignOffset(MessageExtBrokerInner messageExtBrokerInner) throws RocksDBException {
        final int transactionType = MessageSysFlag.getTransactionValue(messageExtBrokerInner.getSysFlag());
        // NOTE(review): 0 and 8 presumably map to the "not transactional" and "commit"
        // transaction types in MessageSysFlag; confirm against that class.
        if (transactionType != 0 && transactionType != 8) {
            return;
        }
        consumeQueueStore.assignQueueOffset(messageExtBrokerInner);
    }

    /**
     * Asks the consume queue store to increase the queue offset for the message, but only for
     * transaction values 0 and 8 (see {@link MessageStore}).
     *
     * @param messageExtBrokerInner the message whose sys flag is inspected and which is passed
     *                              to {@code consumeQueueStore.increaseQueueOffset}
     * @param s                     passed through unchanged as the second argument
     */
    @Override
    public void increaseOffset(MessageExtBrokerInner messageExtBrokerInner, short s) {
        final int transactionType = MessageSysFlag.getTransactionValue(messageExtBrokerInner.getSysFlag());
        // NOTE(review): 0 and 8 presumably map to the "not transactional" and "commit"
        // transaction types in MessageSysFlag; confirm against that class.
        if (transactionType != 0 && transactionType != 8) {
            return;
        }
        consumeQueueStore.increaseQueueOffset(messageExtBrokerInner, s);
    }

    /**
     * Returns the topic config table held by this store.
     *
     * @return the {@code topicConfigTable} field itself (may be {@code null}; see the null
     *         check in {@link #getTopicConfig(String)})
     */
    public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
        return topicConfigTable;
    }

    /**
     * Looks up one entry of {@code topicConfigTable}.
     *
     * @param str the key to look up
     * @return an empty {@link Optional} when the table is {@code null} or has no entry for
     *         {@code str}; otherwise an {@link Optional} holding the entry
     */
    public Optional<TopicConfig> getTopicConfig(String str) {
        if (topicConfigTable == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(topicConfigTable.get(str));
    }

    /**
     * Builds a new {@link BrokerIdentity} from the broker configuration.
     *
     * <p>Cluster name, broker name and the in-container flag always come from
     * {@code brokerConfig}. When {@code messageStoreConfig.isEnableDLegerCommitLog()} is true the
     * id is parsed from the DLeger self id with its first character dropped; otherwise it is
     * {@code brokerConfig.getBrokerId()}.
     *
     * @return a freshly constructed identity on every call
     * @throws NumberFormatException if the DLeger self id, minus its first character, is not a
     *                               valid integer (thrown by {@link Integer#parseInt(String)})
     */
    public BrokerIdentity getBrokerIdentity() {
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            return new BrokerIdentity(
                brokerConfig.getBrokerClusterName(),
                brokerConfig.getBrokerName(),
                Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1)),
                brokerConfig.isInBrokerContainer());
        } else {
            return new BrokerIdentity(
                brokerConfig.getBrokerClusterName(),
                brokerConfig.getBrokerName(),
                brokerConfig.getBrokerId(),
                brokerConfig.isInBrokerContainer());
        }
    }

    /**
     * Notifies the arriving listener and the reput service about a dispatched message
     * (see {@link MessageStore}).
     *
     * <p>Nothing happens unless {@code brokerConfig.isLongPollingEnable()} is true and
     * {@code messageArrivingListener} is non-null. The listener receives the request's consume
     * queue offset plus one.
     *
     * @param dispatchRequest the request whose fields are forwarded to the listener and which
     *                        is passed to {@code reputMessageService.notifyMessageArrive4MultiQueue}
     */
    @Override
    public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
        if (brokerConfig.isLongPollingEnable() && messageArrivingListener != null) {
            messageArrivingListener.arriving(
                dispatchRequest.getTopic(),
                dispatchRequest.getQueueId(),
                dispatchRequest.getConsumeQueueOffset() + 1,
                dispatchRequest.getTagsCode(),
                dispatchRequest.getStoreTimestamp(),
                dispatchRequest.getBitMap(),
                dispatchRequest.getPropertiesMap());
            reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest);
        }
    }

    /**
     * Asks the HA service for its runtime info, passing the commit log's max offset
     * (see {@link MessageStore}).
     *
     * @return the HA service's runtime info, or {@code null} when {@code haService} is {@code null}
     */
    @Override
    public HARuntimeInfo getHARuntimeInfo() {
        if (haService == null) {
            return null;
        }
        return haService.getRuntimeInfo(commitLog.getMaxOffset());
    }

    /**
     * Creates a {@link RocksDBMessageStore}, stores it in the {@code rocksDBMessageStore} field,
     * calls its {@code loadAndStartConsumerServiceOnly()}, and registers its
     * {@code getDispatcherBuildRocksdbConsumeQueue()} dispatcher through
     * {@link #addDispatcher(CommitLogDispatcher)}.
     *
     * <p>Any {@link Exception} is logged and not rethrown. The field is assigned before the
     * load step, so it stays set even if a later step fails.
     */
    public void enableRocksdbCQWrite() {
        try {
            RocksDBMessageStore rocksStore = new RocksDBMessageStore(
                messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, topicConfigTable);
            rocksDBMessageStore = rocksStore;
            rocksStore.loadAndStartConsumerServiceOnly();
            addDispatcher(rocksStore.getDispatcherBuildRocksdbConsumeQueue());
        } catch (Exception e) {
            LOGGER.error("enableRocksdbCqWrite error", e);
        }
    }

    /**
     * Returns the maximum delay level stored in this object.
     *
     * @return the current value of the {@code maxDelayLevel} field
     */
    public int getMaxDelayLevel() {
        return maxDelayLevel;
    }

    /**
     * Adds the delay registered for level {@code i} in {@code delayLevelTable} to {@code j}.
     *
     * @param i the delay level used as the lookup key
     * @param j the base timestamp (NOTE(review): presumably milliseconds; confirm with callers)
     * @return {@code j} plus the table value for {@code i}, or {@code j + 1000} when the table
     *         has no value for that level
     */
    public long computeDeliverTimestamp(int i, long j) {
        Long levelDelay = delayLevelTable.get(i);
        if (levelDelay == null) {
            return j + 1000;
        }
        return levelDelay + j;
    }

    /**
     * Returns the put-message hook list held by this store (see {@link MessageStore}).
     *
     * @return the {@code putMessageHookList} field itself, not a copy
     */
    @Override
    public List<PutMessageHook> getPutMessageHookList() {
        return putMessageHookList;
    }

    /**
     * Overwrites the {@code sendMessageBackHook} field.
     *
     * @param sendMessageBackHook the hook to keep; stored without validation. The parameter
     *                            shadows the field, so the {@code this.} qualifier is required.
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void setSendMessageBackHook(SendMessageBackHook sendMessageBackHook) {
        this.sendMessageBackHook = sendMessageBackHook;
    }

    /**
     * Returns the hook last saved by {@link #setSendMessageBackHook(SendMessageBackHook)}
     * (see {@link MessageStore}).
     *
     * @return the {@code sendMessageBackHook} field, as-is
     */
    @Override
    public SendMessageBackHook getSendMessageBackHook() {
        return sendMessageBackHook;
    }

    /**
     * Returns the shutdown flag stored in this object (see {@link MessageStore}).
     *
     * @return the current value of the {@code shutdown} field
     */
    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * Estimates how many messages lie in the offset range {@code [j, j2)} of a consume queue
     * (see {@link MessageStore}).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>A negative {@code j} is treated as 0.</li>
     *   <li>An empty or inverted range yields 0.</li>
     *   <li>Without a filter the range width is returned directly.</li>
     *   <li>If {@code findConsumeQueue} returns {@code null}, the result is 0.</li>
     *   <li>If the start lies below the queue's minimum offset, the whole range is shifted up
     *       to start at that minimum, keeping its width.</li>
     *   <li>The queue is asked for an estimate; a result of {@code -1} falls back to the
     *       range width.</li>
     * </ol>
     *
     * @param str           first argument passed to {@code findConsumeQueue}
     * @param i             second argument passed to {@code findConsumeQueue}
     * @param j             start offset of the range
     * @param j2            end offset of the range
     * @param messageFilter filter handed to the queue; may be {@code null}
     * @return the estimated count as described above
     */
    @Override
    public long estimateMessageCount(String str, int i, long j, long j2, MessageFilter messageFilter) {
        long from = Math.max(j, 0L);
        long to = j2;
        if (from >= to) {
            return 0L;
        }
        if (messageFilter == null) {
            return to - from;
        }

        ConsumeQueueInterface queue = findConsumeQueue(str, i);
        if (queue == null) {
            return 0L;
        }

        // Shift the range so it starts at the queue's minimum offset, preserving its width.
        long minOffset = queue.getMinOffsetInQueue();
        if (from < minOffset) {
            long width = to - from;
            from = minOffset;
            to = minOffset + width;
        }

        long estimated = queue.estimateMessageCount(from, to, messageFilter);
        if (estimated == -1) {
            return to - from;
        }
        return estimated;
    }

    /**
     * Delegates to the static {@code DefaultStoreMetricsManager.getMetricsView()}
     * (see {@link MessageStore}).
     *
     * @return the list returned by the metrics manager
     */
    @Override
    public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
        final List<Pair<InstrumentSelector, ViewBuilder>> views = DefaultStoreMetricsManager.getMetricsView();
        return views;
    }

    /**
     * Delegates to the static {@code DefaultStoreMetricsManager.init}, passing this store as the
     * third argument (see {@link MessageStore}).
     *
     * @param meter    passed through unchanged
     * @param supplier passed through unchanged
     */
    @Override
    public void initMetrics(Meter meter, Supplier<AttributesBuilder> supplier) {
        final DefaultMessageStore self = this;
        DefaultStoreMetricsManager.init(meter, supplier, self);
    }

    /**
     * Tells whether the transient store pool is in effect for this store.
     *
     * @return {@code true} when {@code messageStoreConfig.isTransientStorePoolEnable()} is true
     *         and either {@code brokerConfig.isEnableControllerMode()} is true or the configured
     *         broker role is not {@code BrokerRole.SLAVE}
     */
    public boolean isTransientStorePoolEnable() {
        if (!messageStoreConfig.isTransientStorePoolEnable()) {
            return false;
        }
        if (brokerConfig.isEnableControllerMode()) {
            return true;
        }
        return messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE;
    }

    /**
     * Delegates to {@code reputMessageService.getReputFromOffset()}.
     *
     * @return the offset reported by the reput message service
     */
    public long getReputFromOffset() {
        return reputMessageService.getReputFromOffset();
    }

    /**
     * Returns the RocksDB message store held by this store.
     *
     * @return the {@code rocksDBMessageStore} field, as-is; within this section it is assigned
     *         only by {@link #enableRocksdbCQWrite()}
     */
    public RocksDBMessageStore getRocksDBMessageStore() {
        return rocksDBMessageStore;
    }

    /**
     * Returns the consume queue store held by this store; the same field that
     * {@link #getQueueStore()} returns.
     *
     * @return the {@code consumeQueueStore} field, as-is
     */
    public ConsumeQueueStoreInterface getConsumeQueueStore() {
        return consumeQueueStore;
    }
}
