package org.apache.tubemq.server.broker.msgstore;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.TubeBroker;
import org.apache.tubemq.server.broker.exception.StartupException;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/server/broker/msgstore/MessageStoreManager.class */
public class MessageStoreManager implements StoreService {
    private static final Logger logger = LoggerFactory.getLogger(MessageStoreManager.class);
    private final BrokerConfig tubeConfig;
    private final TubeBroker tubeBroker;
    private final MetadataManager metadataManager;
    private final ScheduledExecutorService logClearScheduler;
    private final ScheduledExecutorService unFlushDiskScheduler;
    private final ScheduledExecutorService unFlushMemScheduler;
    private int maxMsgTransferSize;
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>> dataStores = new ConcurrentHashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private AtomicBoolean isRemovingTopic = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tubemq/server/broker/msgstore/MessageStoreManager$DiskUnFlushRunner.class */
    public class DiskUnFlushRunner implements Runnable {
        public DiskUnFlushRunner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            StringBuilder sb = new StringBuilder(256);
            for (Map map : MessageStoreManager.this.dataStores.values()) {
                if (map != null && !map.isEmpty()) {
                    for (MessageStore messageStore : map.values()) {
                        if (messageStore != null) {
                            try {
                                messageStore.flushFile();
                            } catch (Throwable th) {
                                MessageStoreManager.logger.error(sb.append("[Store Manager] Try to flush ").append(messageStore.getStoreKey()).append("'s file-store failed : ").toString(), th);
                                sb.delete(0, sb.length());
                            }
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tubemq/server/broker/msgstore/MessageStoreManager$LogClearRunner.class */
    public class LogClearRunner implements Runnable {
        public LogClearRunner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Map map;
            StringBuilder sb = new StringBuilder(256);
            long currentTimeMillis = System.currentTimeMillis();
            Set<String> expiredTopicSet = getExpiredTopicSet(sb);
            if (!expiredTopicSet.isEmpty()) {
                MessageStoreManager.logger.info(sb.append("Found ").append(expiredTopicSet.size()).append(" files expired, start delete files!").toString());
                sb.delete(0, sb.length());
                for (String str : expiredTopicSet) {
                    if (str != null && (map = (Map) MessageStoreManager.this.dataStores.get(str)) != null && !map.isEmpty()) {
                        for (Map.Entry entry : map.entrySet()) {
                            if (entry.getValue() != null) {
                                try {
                                    ((MessageStore) entry.getValue()).runClearupPolicy(false);
                                } catch (Throwable th) {
                                    MessageStoreManager.logger.error(sb.append("Try to run delete policy with ").append(((MessageStore) entry.getValue()).getStoreKey()).append("'s log file  failed").toString(), th);
                                    sb.delete(0, sb.length());
                                }
                            }
                        }
                    }
                }
                MessageStoreManager.logger.info("Log Clear Scheduler finished file delete!");
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (currentTimeMillis2 >= MessageStoreManager.this.tubeConfig.getLogClearupDurationMs()) {
                MessageStoreManager.logger.warn(sb.append("Log Clear up task continue over the clearup duration, ").append("used ").append(currentTimeMillis2).append(", configure value is ").append(MessageStoreManager.this.tubeConfig.getLogClearupDurationMs()).toString());
                sb.delete(0, sb.length());
            }
        }

        private Set<String> getExpiredTopicSet(StringBuilder sb) {
            HashSet hashSet = new HashSet();
            for (Map map : MessageStoreManager.this.dataStores.values()) {
                if (map != null && !map.isEmpty()) {
                    for (MessageStore messageStore : map.values()) {
                        if (messageStore != null) {
                            try {
                                if (messageStore.runClearupPolicy(true)) {
                                    hashSet.add(messageStore.getTopic());
                                }
                            } catch (Throwable th) {
                                MessageStoreManager.logger.error(sb.append("Try to run delete policy with ").append(messageStore.getStoreKey()).append("'s log file failed").toString(), th);
                                sb.delete(0, sb.length());
                            }
                        }
                    }
                }
            }
            return hashSet;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tubemq/server/broker/msgstore/MessageStoreManager$MemUnFlushRunner.class */
    public class MemUnFlushRunner implements Runnable {
        public MemUnFlushRunner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            StringBuilder sb = new StringBuilder(256);
            for (Map map : MessageStoreManager.this.dataStores.values()) {
                if (map != null && !map.isEmpty()) {
                    for (MessageStore messageStore : map.values()) {
                        if (messageStore != null) {
                            try {
                                messageStore.flushMemCacheData();
                            } catch (Throwable th) {
                                MessageStoreManager.logger.error(sb.append("[Store Manager] Try to flush ").append(messageStore.getStoreKey()).append("'s mem-store failed : ").toString(), th);
                                sb.delete(0, sb.length());
                            }
                        }
                    }
                }
            }
        }
    }

    public MessageStoreManager(TubeBroker tubeBroker, BrokerConfig brokerConfig) throws IOException {
        this.tubeConfig = brokerConfig;
        this.tubeBroker = tubeBroker;
        this.metadataManager = this.tubeBroker.getMetadataManager();
        this.isRemovingTopic.set(false);
        this.maxMsgTransferSize = Math.min(brokerConfig.getTransferSize(), DataStoreUtils.MAX_MSG_TRANSFER_SIZE);
        this.metadataManager.addPropertyChangeListener("topicConfigMap", new PropertyChangeListener() { // from class: org.apache.tubemq.server.broker.msgstore.MessageStoreManager.1
            @Override // java.beans.PropertyChangeListener
            public void propertyChange(PropertyChangeEvent propertyChangeEvent) {
                MessageStoreManager.this.refreshMessageStoresHoldVals((Map) propertyChangeEvent.getOldValue(), (Map) propertyChangeEvent.getNewValue());
            }
        });
        this.logClearScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: org.apache.tubemq.server.broker.msgstore.MessageStoreManager.2
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "Broker Log Clear Thread");
            }
        });
        this.unFlushDiskScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: org.apache.tubemq.server.broker.msgstore.MessageStoreManager.3
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "Broker Log Disk Flush Thread");
            }
        });
        this.unFlushMemScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: org.apache.tubemq.server.broker.msgstore.MessageStoreManager.4
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "Broker Log Mem Flush Thread");
            }
        });
    }

    @Override // org.apache.tubemq.server.broker.msgstore.StoreService
    public void start() {
        try {
            loadMessageStores(this.tubeConfig);
        } catch (IOException e) {
            logger.error("[Store Manager] load message stores failed", e);
            throw new StartupException("Initialize message store manager failed", e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        this.logClearScheduler.scheduleWithFixedDelay(new LogClearRunner(), this.tubeConfig.getLogClearupDurationMs(), this.tubeConfig.getLogClearupDurationMs(), TimeUnit.MILLISECONDS);
        this.unFlushDiskScheduler.scheduleWithFixedDelay(new DiskUnFlushRunner(), this.tubeConfig.getLogFlushDiskDurMs(), this.tubeConfig.getLogFlushDiskDurMs(), TimeUnit.MILLISECONDS);
        this.unFlushMemScheduler.scheduleWithFixedDelay(new MemUnFlushRunner(), this.tubeConfig.getLogFlushMemDurMs(), this.tubeConfig.getLogFlushMemDurMs(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.tubemq.server.broker.msgstore.StoreService
    public void close() {
        if (!this.stopped.get() && this.stopped.compareAndSet(false, true)) {
            logger.info("[Store Manager] begin close store manager......");
            this.logClearScheduler.shutdownNow();
            this.unFlushDiskScheduler.shutdownNow();
            this.unFlushMemScheduler.shutdownNow();
            for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : this.dataStores.entrySet()) {
                if (entry.getValue() != null) {
                    for (Map.Entry<Integer, MessageStore> entry2 : entry.getValue().entrySet()) {
                        if (entry2.getValue() != null) {
                            try {
                                entry2.getValue().close();
                            } catch (Throwable th) {
                                logger.error(new StringBuilder(512).append("[Store Manager] Try to run close  ").append(entry2.getValue().getStoreKey()).append(" failed").toString(), th);
                            }
                        }
                    }
                }
            }
            this.dataStores.clear();
            logger.info("[Store Manager] Store Manager stopped!");
        }
    }

    @Override // org.apache.tubemq.server.broker.msgstore.StoreService
    public List<String> removeTopicStore() {
        if (this.isRemovingTopic.get() || !this.isRemovingTopic.compareAndSet(false, true)) {
            return null;
        }
        try {
            ArrayList arrayList = new ArrayList();
            Map<String, TopicMetadata> removedTopicConfigMap = this.metadataManager.getRemovedTopicConfigMap();
            if (removedTopicConfigMap.isEmpty()) {
                return arrayList;
            }
            HashSet<String> hashSet = new HashSet();
            for (Map.Entry<String, TopicMetadata> entry : removedTopicConfigMap.entrySet()) {
                if (entry.getKey() != null && entry.getValue() != null) {
                    if (entry.getValue().getStatusId() == 2) {
                        hashSet.add(entry.getKey());
                    }
                }
            }
            if (hashSet.isEmpty()) {
                this.isRemovingTopic.set(false);
                return arrayList;
            }
            for (String str : hashSet) {
                ConcurrentHashMap<Integer, MessageStore> concurrentHashMap = this.dataStores.get(str);
                if (concurrentHashMap != null) {
                    for (Integer num : concurrentHashMap.keySet()) {
                        try {
                            concurrentHashMap.remove(num).close();
                            if (concurrentHashMap.isEmpty()) {
                                this.dataStores.remove(str);
                            }
                        } catch (Throwable th) {
                            logger.error(new StringBuilder(512).append("[Remove Topic] Close removed store failure, storeKey=").append(str).append("-").append(num).toString(), th);
                        }
                    }
                }
                TopicMetadata topicMetadata = removedTopicConfigMap.get(str);
                if (topicMetadata != null) {
                    StringBuilder sb = new StringBuilder(512);
                    for (int i = 0; i < topicMetadata.getNumTopicStores(); i++) {
                        String sb2 = sb.append(topicMetadata.getDataPath()).append(File.separator).append(str).append("-").append(i).toString();
                        sb.delete(0, sb.length());
                        try {
                            delTopicFiles(sb2);
                        } catch (Throwable th2) {
                            logger.error("[Remove Topic] Remove topic data error : ", th2);
                        }
                        ThreadUtils.sleep(50L);
                    }
                    topicMetadata.setStatusId(3);
                    arrayList.add(str);
                }
                ThreadUtils.sleep(100L);
            }
            this.isRemovingTopic.set(false);
            return arrayList;
        } finally {
            this.isRemovingTopic.set(false);
        }
    }

    @Override // org.apache.tubemq.server.broker.msgstore.StoreService
    public Collection<MessageStore> getMessageStoresByTopic(String str) {
        ConcurrentHashMap<Integer, MessageStore> concurrentHashMap = this.dataStores.get(str);
        return concurrentHashMap == null ? Collections.emptyList() : concurrentHashMap.values();
    }

    @Override // org.apache.tubemq.server.broker.msgstore.StoreService
    public MessageStore getOrCreateMessageStore(String str, int i) throws Throwable {
        StringBuilder sb = new StringBuilder(512);
        int i2 = i < 10000 ? 0 : i / 10000;
        int i3 = i < 10000 ? i : i % 10000;
        String sb2 = sb.append("tube_store_manager_").append(str).toString();
        sb.delete(0, sb.length());
        if (i3 < 0 || i3 >= this.metadataManager.getNumPartitions(str)) {
            throw new IllegalArgumentException(sb.append("Wrong partition value ").append(i).append(",valid partitions in (0,").append(this.metadataManager.getNumPartitions(str) - 1).append(")").toString());
        }
        ConcurrentHashMap<Integer, MessageStore> concurrentHashMap = this.dataStores.get(str);
        if (concurrentHashMap == null) {
            ConcurrentHashMap<Integer, MessageStore> concurrentHashMap2 = new ConcurrentHashMap<>();
            concurrentHashMap = this.dataStores.putIfAbsent(str, concurrentHashMap2);
            if (concurrentHashMap == null) {
                concurrentHashMap = concurrentHashMap2;
            }
        }
        MessageStore messageStore = concurrentHashMap.get(Integer.valueOf(i2));
        if (messageStore == null) {
            synchronized (sb2.intern()) {
                messageStore = concurrentHashMap.get(Integer.valueOf(i2));
                if (messageStore == null) {
                    MessageStore messageStore2 = new MessageStore(this, this.metadataManager.getTopicMetadata(str), i2, this.tubeConfig, 0L, this.maxMsgTransferSize);
                    messageStore = concurrentHashMap.putIfAbsent(Integer.valueOf(i2), messageStore2);
                    if (messageStore == null) {
                        messageStore = messageStore2;
                        logger.info(sb.append("[Store Manager] Created a new message storage, storeKey=").append(str).append("-").append(i2).toString());
                    } else {
                        messageStore2.close();
                    }
                }
            }
        }
        return messageStore;
    }

    public TubeBroker getTubeBroker() {
        return this.tubeBroker;
    }

    public GetMessageResult getMessages(MessageStore messageStore, String str, int i, int i2, Set<String> set) throws IOException {
        long j = 0;
        try {
            long indexMaxOffset = messageStore.getIndexMaxOffset();
            ConsumerNodeInfo consumerNodeInfo = new ConsumerNodeInfo(this.tubeBroker.getStoreManager(), "visit", set, "", System.currentTimeMillis(), "");
            int partitionNum = (i2 + 1) * 28 * messageStore.getPartitionNum();
            if (set != null && !set.isEmpty()) {
                partitionNum *= 5;
            }
            j = indexMaxOffset - ((long) partitionNum) < 0 ? 0L : indexMaxOffset - partitionNum;
            return messageStore.getMessages(303, j, i, consumerNodeInfo, str, this.maxMsgTransferSize);
        } catch (Throwable th) {
            return new GetMessageResult(false, 500, j, 0, "Get message failure, errMsg=" + th.getMessage());
        }
    }

    public MetadataManager getMetadataManager() {
        return this.tubeBroker.getMetadataManager();
    }

    public int getMaxMsgTransferSize() {
        return this.maxMsgTransferSize;
    }

    public Map<String, ConcurrentHashMap<Integer, MessageStore>> getMessageStores() {
        return Collections.unmodifiableMap(this.dataStores);
    }

    @Override // org.apache.tubemq.server.broker.msgstore.StoreService
    public Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(Set<String> set) {
        ConcurrentHashMap<Integer, MessageStore> concurrentHashMap;
        HashSet<String> hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        Map<String, TopicMetadata> topicConfigMap = this.metadataManager.getTopicConfigMap();
        if (set == null || set.isEmpty()) {
            hashSet.addAll(topicConfigMap.keySet());
        } else {
            for (String str : set) {
                if (topicConfigMap.containsKey(str)) {
                    hashSet.add(str);
                }
            }
        }
        if (hashSet.isEmpty()) {
            return hashMap;
        }
        for (String str2 : hashSet) {
            TopicMetadata topicMetadata = topicConfigMap.get(str2);
            if (topicMetadata != null && (concurrentHashMap = this.dataStores.get(str2)) != null) {
                HashMap hashMap2 = new HashMap();
                for (Map.Entry<Integer, MessageStore> entry : concurrentHashMap.entrySet()) {
                    if (entry != null && entry.getKey() != null && entry.getValue() != null) {
                        MessageStore value = entry.getValue();
                        for (Integer num : topicMetadata.getPartIdsByStoreId(entry.getKey().intValue())) {
                            hashMap2.put(num, new TopicPubStoreInfo(str2, entry.getKey().intValue(), num.intValue(), value.getIndexMinOffset(), value.getIndexMaxOffset(), value.getDataMinOffset(), value.getDataMaxOffset()));
                        }
                    }
                }
                hashMap.put(str2, hashMap2);
            }
        }
        return hashMap;
    }

    private Set<File> getLogDirSet(BrokerConfig brokerConfig) throws IOException {
        HashSet hashSet = new HashSet();
        hashSet.add(brokerConfig.getPrimaryPath());
        Iterator<String> it = this.metadataManager.getTopics().iterator();
        while (it.hasNext()) {
            TopicMetadata topicMetadata = this.metadataManager.getTopicMetadata(it.next());
            if (topicMetadata != null && TStringUtils.isNotBlank(topicMetadata.getDataPath())) {
                hashSet.add(topicMetadata.getDataPath());
            }
        }
        HashSet hashSet2 = new HashSet();
        Iterator it2 = hashSet.iterator();
        while (it2.hasNext()) {
            File file = new File((String) it2.next());
            if (!file.exists() && !file.mkdirs()) {
                throw new IOException(new StringBuilder(512).append("Could not make Log directory ").append(file.getAbsolutePath()).toString());
            }
            if (!file.isDirectory() || !file.canRead()) {
                throw new IOException(new StringBuilder(512).append("Log path ").append(file.getAbsolutePath()).append(" is not a readable directory").toString());
            }
            hashSet2.add(file);
        }
        return hashSet2;
    }

    private void loadMessageStores(final BrokerConfig brokerConfig) throws IOException, InterruptedException {
        File[] listFiles;
        StringBuilder sb = new StringBuilder(512);
        logger.info(sb.append("[Store Manager] Begin to load message stores from path ").append(brokerConfig.getPrimaryPath()).toString());
        sb.delete(0, sb.length());
        long currentTimeMillis = System.currentTimeMillis();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        ArrayList arrayList = new ArrayList();
        for (File file : getLogDirSet(brokerConfig)) {
            if (file != null && (listFiles = file.listFiles()) != null) {
                for (final File file2 : listFiles) {
                    if (file2 != null && file2.isDirectory()) {
                        String name = file2.getName();
                        int lastIndexOf = name.lastIndexOf(45);
                        if (lastIndexOf < 0) {
                            logger.warn(sb.append("[Store Manager] Ignore invalid directory:").append(file2.getAbsolutePath()).toString());
                            sb.delete(0, sb.length());
                        } else {
                            String substring = name.substring(0, lastIndexOf);
                            final TopicMetadata topicMetadata = this.metadataManager.getTopicMetadata(substring);
                            if (topicMetadata == null) {
                                logger.warn(sb.append("[Store Manager] No valid topic config for topic data directories:").append(substring).toString());
                                sb.delete(0, sb.length());
                            } else {
                                final int parseInt = Integer.parseInt(name.substring(lastIndexOf + 1));
                                arrayList.add(new Callable<MessageStore>() { // from class: org.apache.tubemq.server.broker.msgstore.MessageStoreManager.5
                                    /* JADX WARN: Can't rename method to resolve collision */
                                    @Override // java.util.concurrent.Callable
                                    public MessageStore call() throws Exception {
                                        try {
                                            try {
                                                MessageStore messageStore = new MessageStore(this, topicMetadata, parseInt, brokerConfig, MessageStoreManager.this.maxMsgTransferSize);
                                                ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) MessageStoreManager.this.dataStores.get(messageStore.getTopic());
                                                if (concurrentHashMap == null) {
                                                    concurrentHashMap = new ConcurrentHashMap();
                                                    ConcurrentHashMap concurrentHashMap2 = (ConcurrentHashMap) MessageStoreManager.this.dataStores.putIfAbsent(messageStore.getTopic(), concurrentHashMap);
                                                    if (concurrentHashMap2 != null) {
                                                        concurrentHashMap = concurrentHashMap2;
                                                    }
                                                }
                                                if (((MessageStore) concurrentHashMap.putIfAbsent(Integer.valueOf(messageStore.getStoreId()), messageStore)) != null) {
                                                    try {
                                                        messageStore.close();
                                                        MessageStoreManager.logger.info(new StringBuilder(512).append("[Store Manager] Close duplicated messageStore ").append(messageStore.getStoreKey()).toString());
                                                    } catch (Throwable th) {
                                                        MessageStoreManager.logger.info("[Store Manager] Close duplicated messageStore failure", th);
                                                    }
                                                }
                                                atomicInteger2.incrementAndGet();
                                                return null;
                                            } catch (Throwable th2) {
                                                atomicInteger.incrementAndGet();
                                                MessageStoreManager.logger.error(new StringBuilder(512).append("[Store Manager] Loaded ").append(file2.getAbsolutePath()).append("message store failure:").toString(), th2);
                                                atomicInteger2.incrementAndGet();
                                                return null;
                                            }
                                        } catch (Throwable th3) {
                                            atomicInteger2.incrementAndGet();
                                            throw th3;
                                        }
                                    }
                                });
                            }
                        }
                    }
                }
            }
        }
        loadStoresInParallel(arrayList);
        arrayList.clear();
        if (atomicInteger.get() > 0) {
            throw new RuntimeException("[Store Manager] failure to load message stores, please check load logger and fix first!");
        }
        logger.info(sb.append("[Store Manager] End to load message stores in ").append((System.currentTimeMillis() - currentTimeMillis) / 1000).append(" secs").toString());
    }

    private void loadStoresInParallel(List<Callable<MessageStore>> list) throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(newFixedThreadPool);
        Iterator<Callable<MessageStore>> it = list.iterator();
        while (it.hasNext()) {
            executorCompletionService.submit(it.next());
        }
        for (int i = 0; i < list.size(); i++) {
            try {
                executorCompletionService.take().get();
            } catch (Throwable th) {
            }
        }
        newFixedThreadPool.shutdown();
    }

    private void delTopicFiles(String str) throws IOException {
        File[] listFiles;
        File file = new File(str);
        if (file.exists()) {
            if (file.isFile()) {
                file.delete();
            } else if (file.isDirectory() && (listFiles = file.listFiles()) != null) {
                for (File file2 : listFiles) {
                    delTopicFiles(file2.getAbsolutePath());
                }
            }
            file.delete();
        }
    }

    public void refreshMessageStoresHoldVals(Map<String, TopicMetadata> map, Map<String, TopicMetadata> map2) {
        ConcurrentHashMap<Integer, MessageStore> concurrentHashMap;
        if (map2 == null || map2.isEmpty() || map == null || map.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder(512);
        for (TopicMetadata topicMetadata : map2.values()) {
            TopicMetadata topicMetadata2 = map.get(topicMetadata.getTopic());
            if (topicMetadata2 != null && !topicMetadata2.isPropertyEquals(topicMetadata) && (concurrentHashMap = this.dataStores.get(topicMetadata.getTopic())) != null && !concurrentHashMap.isEmpty()) {
                for (Map.Entry<Integer, MessageStore> entry : concurrentHashMap.entrySet()) {
                    if (entry.getValue() != null) {
                        try {
                            entry.getValue().refreshUnflushThreshold(topicMetadata);
                        } catch (Throwable th) {
                            logger.error(sb.append("[Store Manager] refresh ").append(entry.getValue().getStoreKey()).append("'s parameter error,").toString(), th);
                            sb.delete(0, sb.length());
                        }
                    }
                }
            }
        }
    }
}
