package org.apache.rocketmq.store.queue;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/store/queue/ConsumeQueueStore.class */
/**
 * Registry of the consume queues that belong to one {@link DefaultMessageStore}.
 *
 * <p>Queues are kept in a two-level concurrent map (topic name, then queue id). Most
 * per-queue methods do not act on the instance handed in directly: they look up the queue
 * registered under the same topic and queue id (creating one if none is registered yet) and
 * invoke the matching {@link FileQueueLifeCycle} operation on that registered instance.
 */
public class ConsumeQueueStore {
    private static final Logger log = LoggerFactory.getLogger("RocketmqStore");

    protected final DefaultMessageStore messageStore;
    protected final MessageStoreConfig messageStoreConfig;
    protected final QueueOffsetOperator queueOffsetOperator = new QueueOffsetOperator();
    /** Topic name -> (queue id -> consume queue). */
    protected final ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable = new ConcurrentHashMap<>(32);

    /**
     * Creates an empty store bound to the given message store and configuration.
     *
     * @param defaultMessageStore message store handed to every consume queue created here
     * @param messageStoreConfig configuration supplying storage paths and mapped-file sizes
     */
    public ConsumeQueueStore(DefaultMessageStore defaultMessageStore, MessageStoreConfig messageStoreConfig) {
        this.messageStore = defaultMessageStore;
        this.messageStoreConfig = messageStoreConfig;
    }

    /**
     * Returns the registered queue for the topic/queue id (creating it if absent), viewed
     * through its {@link FileQueueLifeCycle} interface.
     */
    private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
        return findOrCreateConsumeQueue(topic, queueId);
    }

    /**
     * Calls {@code rollNextFile(j)} on the registered queue matching the argument's topic and
     * queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @param j value forwarded unchanged to {@code rollNextFile}
     * @return whatever the queue's {@code rollNextFile} returns
     */
    public long rollNextFile(ConsumeQueueInterface consumeQueueInterface, long j) {
        FileQueueLifeCycle lifeCycle = getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId());
        return lifeCycle.rollNextFile(j);
    }

    /**
     * Calls {@code correctMinOffset(j)} directly on the given queue (no registry lookup).
     *
     * @param consumeQueueInterface queue to correct
     * @param j value forwarded unchanged to {@code correctMinOffset}
     */
    public void correctMinOffset(ConsumeQueueInterface consumeQueueInterface, long j) {
        consumeQueueInterface.correctMinOffset(j);
    }

    /**
     * Hands the dispatch request directly to the given queue (no registry lookup).
     *
     * @param consumeQueueInterface queue that receives the request
     * @param dispatchRequest request forwarded to {@code putMessagePositionInfoWrapper}
     */
    public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueueInterface, DispatchRequest dispatchRequest) {
        consumeQueueInterface.putMessagePositionInfoWrapper(dispatchRequest);
    }

    /**
     * Hands the dispatch request to the queue registered for the request's topic and queue id,
     * creating that queue if it does not exist yet.
     *
     * @param dispatchRequest request to dispatch
     */
    public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) {
        ConsumeQueueInterface target = findOrCreateConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        putMessagePositionInfoWrapper(target, dispatchRequest);
    }

    /**
     * Calls {@code load()} on the registered queue matching the argument's topic and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @return the result of the queue's {@code load()}
     */
    public boolean load(ConsumeQueueInterface consumeQueueInterface) {
        FileQueueLifeCycle lifeCycle = getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId());
        return lifeCycle.load();
    }

    /**
     * Loads the simple consume queues and then, only if that succeeded, the batch consume
     * queues found under the configured store root directory.
     *
     * @return {@code true} if both passes succeeded
     */
    public boolean load() {
        String rootDir = this.messageStoreConfig.getStorePathRootDir();
        if (!loadConsumeQueues(StorePathConfigHelper.getStorePathConsumeQueue(rootDir), CQType.SimpleCQ)) {
            return false;
        }
        return loadConsumeQueues(StorePathConfigHelper.getStorePathBatchConsumeQueue(rootDir), CQType.BatchCQ);
    }

    /**
     * Scans {@code storePath} for {@code <topic>/<queueId>} directories, registers a queue of
     * the given type for each one and loads it.
     *
     * <p>Entries whose name is not a valid integer are skipped. Scanning stops at the first
     * queue that fails to load.
     *
     * @param storePath directory whose children are treated as topic directories
     * @param cqType queue type to create; every topic found must be configured with this type
     * @return {@code false} as soon as one queue fails to load, {@code true} otherwise
     * @throws RuntimeException if a topic's configured queue type differs from {@code cqType}
     */
    private boolean loadConsumeQueues(String storePath, CQType cqType) {
        File[] topicDirs = new File(storePath).listFiles();
        if (topicDirs != null) {
            for (File topicDir : topicDirs) {
                String topic = topicDir.getName();
                File[] queueDirs = topicDir.listFiles();
                if (queueDirs == null) {
                    continue;
                }
                for (File queueDir : queueDirs) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(queueDir.getName());
                    } catch (NumberFormatException ignored) {
                        // Not a queue directory: skip it instead of loading a stale/unset queue.
                        continue;
                    }
                    queueTypeShouldBe(topic, cqType);
                    ConsumeQueueInterface consumeQueue = createConsumeQueueByType(cqType, topic, queueId, storePath);
                    putConsumeQueue(topic, queueId, consumeQueue);
                    if (!load(consumeQueue)) {
                        return false;
                    }
                }
            }
        }
        log.info("load {} all over, OK", cqType);
        return true;
    }

    /**
     * Instantiates a {@link ConsumeQueue} or {@link BatchConsumeQueue} for the given type.
     *
     * @throws RuntimeException if {@code cqType} is neither {@code SimpleCQ} nor {@code BatchCQ}
     */
    private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String topic, int queueId, String storePath) {
        if (Objects.equals(CQType.SimpleCQ, cqType)) {
            return new ConsumeQueue(topic, queueId, storePath, this.messageStoreConfig.getMappedFileSizeConsumeQueue(), this.messageStore);
        }
        if (Objects.equals(CQType.BatchCQ, cqType)) {
            return new BatchConsumeQueue(topic, queueId, storePath, this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(), this.messageStore);
        }
        // Pass the type itself so a null type yields this message rather than an NPE.
        throw new RuntimeException(String.format("queue type %s is not supported.", cqType));
    }

    /**
     * Verifies that the topic's configured queue type equals the expected one.
     *
     * @throws RuntimeException if the types differ
     */
    private void queueTypeShouldBe(String topic, CQType expectedType) {
        CQType configuredType = QueueTypeUtils.getCQType(this.messageStore.getTopicConfig(topic));
        if (Objects.equals(expectedType, configuredType)) {
            return;
        }
        throw new RuntimeException(String.format("The queue type of topic: %s should be %s, but is %s", topic, expectedType, configuredType));
    }

    /**
     * Builds a fixed-size thread pool whose core and maximum size both come from the broker's
     * {@code recoverThreadPoolNums} setting.
     */
    private ExecutorService buildExecutorService(BlockingQueue<Runnable> workQueue, String threadNamePrefix) {
        int poolSize = this.messageStore.getBrokerConfig().getRecoverThreadPoolNums();
        int maxPoolSize = this.messageStore.getBrokerConfig().getRecoverThreadPoolNums();
        ThreadFactory threadFactory = new ThreadFactoryImpl(threadNamePrefix);
        return new ThreadPoolExecutor(poolSize, maxPoolSize, BrokerStatsManager.ACCOUNT_STAT_INVERTAL, TimeUnit.MILLISECONDS, workQueue, threadFactory);
    }

    /**
     * Calls {@code recover()} on the registered queue matching the argument's topic and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     */
    public void recover(ConsumeQueueInterface consumeQueueInterface) {
        getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId()).recover();
    }

    /** Recovers every registered queue sequentially on the calling thread. */
    public void recover() {
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface consumeQueue : queuesOfTopic.values()) {
                recover(consumeQueue);
            }
        }
    }

    /**
     * Recovers every registered queue on a dedicated thread pool and waits for all of them.
     *
     * <p>A failure in one queue is logged and does not stop the others; the pool is always
     * shut down before returning.
     *
     * @return {@code true} only if every queue recovered without throwing; {@code false} if any
     *         recovery failed or the wait was interrupted (the interrupt flag is restored)
     */
    public boolean recoverConcurrently() {
        // Snapshot once so the latch size always equals the number of submitted tasks.
        ArrayList<ConsumeQueueInterface> queues = new ArrayList<>();
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            queues.addAll(queuesOfTopic.values());
        }

        final CountDownLatch countDownLatch = new CountDownLatch(queues.size());
        final ExecutorService executor = buildExecutorService(new LinkedBlockingQueue<Runnable>(), "RecoverConsumeQueueThread_");
        ArrayList<FutureTask<Boolean>> tasks = new ArrayList<>(queues.size());
        try {
            for (final ConsumeQueueInterface consumeQueue : queues) {
                FutureTask<Boolean> task = new FutureTask<>(() -> {
                    boolean recovered = true;
                    try {
                        consumeQueue.recover();
                    } catch (Throwable t) {
                        recovered = false;
                        log.error("Exception occurs while recover consume queue concurrently, topic={}, queueId={}", consumeQueue.getTopic(), consumeQueue.getQueueId(), t);
                    } finally {
                        countDownLatch.countDown();
                    }
                    return recovered;
                });
                tasks.add(task);
                executor.submit(task);
            }

            countDownLatch.await();
            for (FutureTask<Boolean> task : tasks) {
                // The latch is released inside the callable, i.e. before the FutureTask is marked
                // done, so isDone() may still be false here. get() blocks until the result is set,
                // which guarantees no failed recovery is skipped.
                if (!task.get()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            log.error("Exception occurs while recover consume queue concurrently", e);
            return false;
        } catch (Exception e) {
            log.error("Exception occurs while recover consume queue concurrently", e);
            return false;
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Returns the largest {@code getMaxPhysicOffset()} over all registered queues.
     *
     * @return the maximum value, or {@code -1} if no queue reports a larger one
     */
    public long getMaxOffsetInConsumeQueue() {
        long maxPhysicOffset = -1;
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface consumeQueue : queuesOfTopic.values()) {
                // Read once so the comparison and the stored value cannot disagree.
                maxPhysicOffset = Math.max(maxPhysicOffset, consumeQueue.getMaxPhysicOffset());
            }
        }
        return maxPhysicOffset;
    }

    /**
     * Calls {@code checkSelf()} on the registered queue matching the argument's topic and
     * queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     */
    public void checkSelf(ConsumeQueueInterface consumeQueueInterface) {
        getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId()).checkSelf();
    }

    /** Runs {@link #checkSelf(ConsumeQueueInterface)} for every registered queue. */
    public void checkSelf() {
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface consumeQueue : queuesOfTopic.values()) {
                checkSelf(consumeQueue);
            }
        }
    }

    /**
     * Calls {@code flush(i)} on the registered queue matching the argument's topic and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @param i value forwarded unchanged to {@code flush}
     * @return the result of the queue's {@code flush}
     */
    public boolean flush(ConsumeQueueInterface consumeQueueInterface, int i) {
        FileQueueLifeCycle lifeCycle = getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId());
        return lifeCycle.flush(i);
    }

    /**
     * Calls {@code destroy()} on the registered queue matching the argument's topic and queue
     * id. The queue is not removed from the registry by this method.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     */
    public void destroy(ConsumeQueueInterface consumeQueueInterface) {
        getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId()).destroy();
    }

    /**
     * Calls {@code deleteExpiredFile(j)} on the registered queue matching the argument's topic
     * and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @param j value forwarded unchanged to {@code deleteExpiredFile}
     * @return the result of the queue's {@code deleteExpiredFile}
     */
    public int deleteExpiredFile(ConsumeQueueInterface consumeQueueInterface, long j) {
        FileQueueLifeCycle lifeCycle = getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId());
        return lifeCycle.deleteExpiredFile(j);
    }

    /**
     * Calls {@code truncateDirtyLogicFiles(j)} on the registered queue matching the argument's
     * topic and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @param j value forwarded unchanged to {@code truncateDirtyLogicFiles}
     */
    public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueueInterface, long j) {
        getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId()).truncateDirtyLogicFiles(j);
    }

    /**
     * Calls {@code swapMap(i, j, j2)} on the registered queue matching the argument's topic and
     * queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @param i first value forwarded unchanged to {@code swapMap}
     * @param j second value forwarded unchanged to {@code swapMap}
     * @param j2 third value forwarded unchanged to {@code swapMap}
     */
    public void swapMap(ConsumeQueueInterface consumeQueueInterface, int i, long j, long j2) {
        getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId()).swapMap(i, j, j2);
    }

    /**
     * Calls {@code cleanSwappedMap(j)} on the registered queue matching the argument's topic
     * and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     * @param j value forwarded unchanged to {@code cleanSwappedMap}
     */
    public void cleanSwappedMap(ConsumeQueueInterface consumeQueueInterface, long j) {
        getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId()).cleanSwappedMap(j);
    }

    /**
     * Returns {@code isFirstFileAvailable()} of the registered queue matching the argument's
     * topic and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     */
    public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueueInterface) {
        FileQueueLifeCycle lifeCycle = getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId());
        return lifeCycle.isFirstFileAvailable();
    }

    /**
     * Returns {@code isFirstFileExist()} of the registered queue matching the argument's topic
     * and queue id.
     *
     * @param consumeQueueInterface queue whose topic and queue id select the target
     */
    public boolean isFirstFileExist(ConsumeQueueInterface consumeQueueInterface) {
        FileQueueLifeCycle lifeCycle = getLifeCycle(consumeQueueInterface.getTopic(), consumeQueueInterface.getQueueId());
        return lifeCycle.isFirstFileExist();
    }

    /**
     * Returns the queue registered for the topic and queue id, creating and registering one if
     * none exists.
     *
     * @param str topic name
     * @param i queue id
     * @return the registered queue, never {@code null}
     */
    public ConsumeQueueInterface findOrCreateConsumeQueue(String str, int i) {
        return doFindOrCreateConsumeQueue(str, i);
    }

    /**
     * Lookup-or-create implementation behind {@link #findOrCreateConsumeQueue(String, int)}.
     *
     * <p>Both map levels are populated with {@code putIfAbsent}; when another thread registers
     * first, its instance is returned and the one built here is discarded. The queue type
     * (batch or simple) is taken from the topic's configuration.
     */
    private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic = this.consumeQueueTable.get(topic);
        if (queuesOfTopic == null) {
            ConcurrentMap<Integer, ConsumeQueueInterface> fresh = new ConcurrentHashMap<>(128);
            ConcurrentMap<Integer, ConsumeQueueInterface> raced = this.consumeQueueTable.putIfAbsent(topic, fresh);
            queuesOfTopic = raced != null ? raced : fresh;
        }

        ConsumeQueueInterface existing = queuesOfTopic.get(queueId);
        if (existing != null) {
            return existing;
        }

        String rootDir = this.messageStoreConfig.getStorePathRootDir();
        ConsumeQueueInterface created;
        if (Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(this.messageStore.getTopicConfig(topic)))) {
            created = new BatchConsumeQueue(topic, queueId, StorePathConfigHelper.getStorePathBatchConsumeQueue(rootDir), this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(), this.messageStore);
        } else {
            created = new ConsumeQueue(topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(rootDir), this.messageStoreConfig.getMappedFileSizeConsumeQueue(), this.messageStore);
        }
        ConsumeQueueInterface raced = queuesOfTopic.putIfAbsent(queueId, created);
        return raced != null ? raced : created;
    }

    /**
     * Returns the offset operator's current queue offset for the key {@code "<str>-<i>"}.
     *
     * @param str topic name
     * @param i queue id
     */
    public Long getMaxOffset(String str, int i) {
        String key = str + "-" + i;
        return Long.valueOf(this.queueOffsetOperator.currentQueueOffset(key));
    }

    /**
     * Installs the given map in the offset operator as both the topic queue table and the LMQ
     * topic queue table (the same instance is used for both).
     *
     * @param concurrentMap table to install
     */
    public void setTopicQueueTable(ConcurrentMap<String, Long> concurrentMap) {
        this.queueOffsetOperator.setTopicQueueTable(concurrentMap);
        this.queueOffsetOperator.setLmqTopicQueueTable(concurrentMap);
    }

    /** Returns the offset operator's topic queue table. */
    public ConcurrentMap getTopicQueueTable() {
        return this.queueOffsetOperator.getTopicQueueTable();
    }

    /**
     * Installs the given map in the offset operator as the batch topic queue table.
     *
     * @param concurrentMap table to install
     */
    public void setBatchTopicQueueTable(ConcurrentMap<String, Long> concurrentMap) {
        this.queueOffsetOperator.setBatchTopicQueueTable(concurrentMap);
    }

    /**
     * Asks the queue registered for the message's topic and queue id (created if absent) to
     * assign a queue offset to the message using this store's offset operator.
     *
     * @param messageExtBrokerInner message to assign an offset to
     */
    public void assignQueueOffset(MessageExtBrokerInner messageExtBrokerInner) {
        ConsumeQueueInterface target = findOrCreateConsumeQueue(messageExtBrokerInner.getTopic(), messageExtBrokerInner.getQueueId());
        target.assignQueueOffset(this.queueOffsetOperator, messageExtBrokerInner);
    }

    /**
     * Asks the queue registered for the message's topic and queue id (created if absent) to
     * increase its queue offset using this store's offset operator.
     *
     * @param messageExtBrokerInner message that identifies the queue
     * @param s value forwarded unchanged to the queue's {@code increaseQueueOffset}
     */
    public void increaseQueueOffset(MessageExtBrokerInner messageExtBrokerInner, short s) {
        ConsumeQueueInterface target = findOrCreateConsumeQueue(messageExtBrokerInner.getTopic(), messageExtBrokerInner.getQueueId());
        target.increaseQueueOffset(this.queueOffsetOperator, messageExtBrokerInner, s);
    }

    /**
     * Updates the offset operator's entry for the key {@code "<str>-<i>"}.
     *
     * @param str topic name
     * @param i queue id
     * @param j offset value passed to the operator
     */
    public void updateQueueOffset(String str, int i, long j) {
        String key = str + "-" + i;
        this.queueOffsetOperator.updateQueueOffset(key, j);
    }

    /**
     * Removes the offset operator's entry for the given topic and queue id.
     *
     * @param str topic name
     * @param num queue id
     */
    public void removeTopicQueueTable(String str, Integer num) {
        this.queueOffsetOperator.remove(str, num);
    }

    /** Returns the live topic -> (queue id -> queue) table; it is not copied. */
    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
        return this.consumeQueueTable;
    }

    /**
     * Registers the queue under the topic and queue id, replacing any queue already stored
     * for that pair.
     */
    private void putConsumeQueue(String topic, int queueId, ConsumeQueueInterface consumeQueue) {
        ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic = this.consumeQueueTable.get(topic);
        if (queuesOfTopic == null) {
            // Fill the new per-topic map before publishing it in the table.
            queuesOfTopic = new ConcurrentHashMap<>();
            queuesOfTopic.put(queueId, consumeQueue);
            this.consumeQueueTable.put(topic, queuesOfTopic);
            return;
        }
        queuesOfTopic.put(queueId, consumeQueue);
    }

    /* JADX WARN: Code restructure failed: missing block: B:54:0x0183, code lost:
    
        throw new java.lang.RuntimeException("Unknown magicCode: " + r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for a method the decompiler could not reconstruct.
     *
     * <p>The real body is missing from this file: the decompiler skipped a dump of 586
     * instructions and only recovered one fragment, a {@code RuntimeException} with the
     * message prefix {@code "Unknown magicCode: "}. As written here the method does nothing
     * except throw.
     *
     * @param r8 decompiler-generated parameter name; its meaning is not recoverable from this
     *           file
     * @throws UnsupportedOperationException always
     */
    public void recoverOffsetTable(long r8) {
        /*
            Method dump skipped, instructions count: 586
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.store.queue.ConsumeQueueStore.recoverOffsetTable(long):void");
    }

    /**
     * Runs {@link #destroy(ConsumeQueueInterface)} for every registered queue. The registry
     * itself is left untouched.
     */
    public void destroy() {
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface consumeQueue : queuesOfTopic.values()) {
                destroy(consumeQueue);
            }
        }
    }

    /**
     * Destroys and unregisters every non-system-topic queue whose {@code getLastOffset()} is
     * below {@code j}, and drops topics that end up with no queues.
     *
     * <p>A queue reporting a last offset of {@code -1} is only logged, not removed.
     *
     * @param j threshold compared against each queue's last offset (logged as
     *          {@code minCommitLogOffset})
     */
    public void cleanExpired(long j) {
        Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> topicIt = this.consumeQueueTable.entrySet().iterator();
        while (topicIt.hasNext()) {
            Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry = topicIt.next();
            String topic = topicEntry.getKey();
            if (TopicValidator.isSystemTopic(topic)) {
                continue;
            }

            ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic = topicEntry.getValue();
            Iterator<Map.Entry<Integer, ConsumeQueueInterface>> queueIt = queuesOfTopic.entrySet().iterator();
            while (queueIt.hasNext()) {
                Map.Entry<Integer, ConsumeQueueInterface> queueEntry = queueIt.next();
                ConsumeQueueInterface consumeQueue = queueEntry.getValue();
                long lastOffset = consumeQueue.getLastOffset();
                if (lastOffset == -1) {
                    log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", consumeQueue.getTopic(), consumeQueue.getQueueId(), consumeQueue.getMaxPhysicOffset(), consumeQueue.getMinLogicOffset());
                } else if (lastOffset < j) {
                    log.info("cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", topic, queueEntry.getKey(), j, lastOffset);
                    removeTopicQueueTable(consumeQueue.getTopic(), consumeQueue.getQueueId());
                    // Destroy while the queue is still registered: destroy() resolves it by
                    // topic/queue id and would otherwise create a fresh instance.
                    destroy(consumeQueue);
                    queueIt.remove();
                }
            }

            if (queuesOfTopic.isEmpty()) {
                log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
                topicIt.remove();
            }
        }
    }

    /**
     * Runs {@link #truncateDirtyLogicFiles(ConsumeQueueInterface, long)} with {@code j} for
     * every registered queue.
     *
     * @param j value forwarded unchanged to each queue
     */
    public void truncateDirty(long j) {
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface consumeQueue : queuesOfTopic.values()) {
                truncateDirtyLogicFiles(consumeQueue, j);
            }
        }
    }

    /**
     * Returns the sum of {@code getTotalSize()} over all registered queues.
     *
     * @return the accumulated size, {@code 0} when no queue is registered
     */
    public long getTotalSize() {
        long totalSize = 0;
        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface consumeQueue : queuesOfTopic.values()) {
                totalSize += consumeQueue.getTotalSize();
            }
        }
        return totalSize;
    }
}
