package org.apache.rocketmq.tieredstore.file;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/file/FlatFileStore.class */
public class FlatFileStore {
    private static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
    private final MetadataStore metadataStore;
    private final MessageStoreConfig storeConfig;
    private final MessageStoreExecutor executor;
    private final FlatFileFactory flatFileFactory;
    private final ConcurrentMap<MessageQueue, FlatMessageFile> flatFileConcurrentMap = new ConcurrentHashMap();

    public FlatFileStore(MessageStoreConfig messageStoreConfig, MetadataStore metadataStore, MessageStoreExecutor messageStoreExecutor) {
        this.storeConfig = messageStoreConfig;
        this.metadataStore = metadataStore;
        this.executor = messageStoreExecutor;
        this.flatFileFactory = new FlatFileFactory(metadataStore, messageStoreConfig, messageStoreExecutor);
    }

    public boolean load() {
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            this.flatFileConcurrentMap.clear();
            recover();
            log.info("FlatFileStore recover finished, total cost={}ms", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
            return true;
        } catch (Exception e) {
            long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
            log.info("FlatFileStore recover error, total cost={}ms", Long.valueOf(elapsed));
            LoggerFactory.getLogger("RocketmqBroker").error("FlatFileStore recover error, total cost={}ms", Long.valueOf(elapsed), e);
            return false;
        }
    }

    public void recover() {
        Semaphore semaphore = new Semaphore(this.storeConfig.getTieredStoreMaxPendingLimit() / 4);
        ArrayList arrayList = new ArrayList();
        this.metadataStore.iterateTopic(topicMetadata -> {
            semaphore.acquireUninterruptibly();
            arrayList.add(recoverAsync(topicMetadata).whenComplete((r7, th) -> {
                if (th != null) {
                    log.error("FlatFileStore recover file error, topic={}", topicMetadata.getTopic(), th);
                }
                semaphore.release();
            }));
        });
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).join();
    }

    public CompletableFuture<Void> recoverAsync(TopicMetadata topicMetadata) {
        return CompletableFuture.runAsync(() -> {
            Stopwatch createStarted = Stopwatch.createStarted();
            AtomicLong atomicLong = new AtomicLong();
            this.metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> {
                FlatMessageFile computeIfAbsent = computeIfAbsent(new MessageQueue(topicMetadata.getTopic(), this.storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
                atomicLong.incrementAndGet();
                log.debug("FlatFileStore recover file, topicId={}, topic={}, queueId={}, cost={}ms", new Object[]{Long.valueOf(computeIfAbsent.getTopicId()), computeIfAbsent.getMessageQueue().getTopic(), Integer.valueOf(computeIfAbsent.getMessageQueue().getQueueId()), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))});
            });
            log.info("FlatFileStore recover file, topic={}, total={}, cost={}ms", new Object[]{topicMetadata.getTopic(), Long.valueOf(atomicLong.get()), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))});
        }, this.executor.bufferCommitExecutor);
    }

    public void scheduleDeleteExpireFile() {
        if (this.storeConfig.isTieredStoreDeleteFileEnable()) {
            Stopwatch createStarted = Stopwatch.createStarted();
            ImmutableList<FlatMessageFile> deepCopyFlatFileToList = deepCopyFlatFileToList();
            UnmodifiableIterator it = deepCopyFlatFileToList.iterator();
            while (it.hasNext()) {
                FlatMessageFile flatMessageFile = (FlatMessageFile) it.next();
                flatMessageFile.getFileLock().lock();
                try {
                    try {
                        flatMessageFile.destroyExpiredFile(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(flatMessageFile.getFileReservedHours()));
                        flatMessageFile.getFileLock().unlock();
                    } catch (Exception e) {
                        log.error("FlatFileStore delete expire file error", e);
                        flatMessageFile.getFileLock().unlock();
                    }
                } catch (Throwable th) {
                    flatMessageFile.getFileLock().unlock();
                    throw th;
                }
            }
            log.info("FlatFileStore schedule delete expired file, count={}, cost={}ms", Integer.valueOf(deepCopyFlatFileToList.size()), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
        }
    }

    public MetadataStore getMetadataStore() {
        return this.metadataStore;
    }

    public MessageStoreConfig getStoreConfig() {
        return this.storeConfig;
    }

    public FlatFileFactory getFlatFileFactory() {
        return this.flatFileFactory;
    }

    public FlatMessageFile computeIfAbsent(MessageQueue messageQueue) {
        return this.flatFileConcurrentMap.computeIfAbsent(messageQueue, messageQueue2 -> {
            return new FlatMessageFile(this.flatFileFactory, messageQueue2.getTopic(), messageQueue2.getQueueId());
        });
    }

    public FlatMessageFile getFlatFile(MessageQueue messageQueue) {
        return this.flatFileConcurrentMap.get(messageQueue);
    }

    public ImmutableList<FlatMessageFile> deepCopyFlatFileToList() {
        return ImmutableList.copyOf(this.flatFileConcurrentMap.values());
    }

    public void shutdown() {
        this.flatFileConcurrentMap.values().forEach((v0) -> {
            v0.shutdown();
        });
    }

    public void destroyFile(MessageQueue messageQueue) {
        if (messageQueue == null) {
            return;
        }
        FlatMessageFile remove = this.flatFileConcurrentMap.remove(messageQueue);
        if (remove != null) {
            remove.shutdown();
            remove.destroy();
        }
        log.info("FlatFileStore destroy file, topic={}, queueId={}", messageQueue.getTopic(), Integer.valueOf(messageQueue.getQueueId()));
    }

    public void destroy() {
        shutdown();
        this.flatFileConcurrentMap.values().forEach((v0) -> {
            v0.destroy();
        });
        this.flatFileConcurrentMap.clear();
    }
}
