package org.apache.rocketmq.tieredstore.index;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.index.IndexFile;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/index/IndexStoreService.class */
/**
 * Service thread that owns the set of index files, keyed by their timestamp.
 *
 * <p>Index files are kept in {@link #timeStoreTable}. The background loop in {@link #run()}
 * repeatedly drops expired uploaded files and compacts and uploads the next file that is
 * older than the newest one; writers go through {@link #putKey} and readers through
 * {@link #queryAsync}.
 *
 * <p>Structural changes to the table are guarded by {@link #readWriteLock}.
 */
public class IndexStoreService extends ServiceThread implements IndexService {
    private static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
    public static final String FILE_DIRECTORY_NAME = "tiered_index_file";
    public static final String FILE_COMPACTED_DIRECTORY_NAME = "compacting";

    /** Number of times {@link #putKey} tries to append before giving up. */
    private static final int PUT_KEY_MAX_ATTEMPTS = 3;
    /** Pause, in milliseconds, used by the polling and throttling loops of this class. */
    private static final long POLL_INTERVAL_MILLIS = 50L;

    private final MessageStoreConfig storeConfig;
    /** All known index files, ordered by their timestamp. */
    private final ConcurrentSkipListMap<Long, IndexFile> timeStoreTable;
    private final ReadWriteLock readWriteLock;
    /** Files with a timestamp strictly greater than this value are candidates for compaction. */
    private final AtomicLong compactTimestamp;
    private final String filePath;
    private final FlatFileFactory fileAllocator;
    private final boolean autoCreateNewFile;
    private volatile IndexFile currentWriteFile;
    private volatile FlatAppendFile flatAppendFile;

    /**
     * Creates a service that automatically creates a new index file when none exists.
     *
     * @param flatFileFactory factory providing the store configuration and the flat file
     * @param filePath        path handed to the factory when the flat file is created
     */
    public IndexStoreService(FlatFileFactory flatFileFactory, String filePath) {
        this(flatFileFactory, filePath, true);
    }

    /**
     * @param flatFileFactory   factory providing the store configuration and the flat file
     * @param filePath          path handed to the factory when the flat file is created
     * @param autoCreateNewFile whether recovery creates a first file when none was found and
     *                          whether the service force-uploads on shutdown
     */
    public IndexStoreService(FlatFileFactory flatFileFactory, String filePath, boolean autoCreateNewFile) {
        this.storeConfig = flatFileFactory.getStoreConfig();
        this.filePath = filePath;
        this.fileAllocator = flatFileFactory;
        this.timeStoreTable = new ConcurrentSkipListMap<>();
        this.compactTimestamp = new AtomicLong(0L);
        this.readWriteLock = new ReentrantReadWriteLock();
        this.autoCreateNewFile = autoCreateNewFile;
    }

    /** Recovers the local and remote index files, then starts the background thread. */
    @Override // org.apache.rocketmq.tieredstore.index.IndexService
    public void start() {
        recover();
        super.start();
    }

    /**
     * Converts a file using the old naming scheme, if it exists.
     *
     * <p>The long stored at byte offset 4 of the file decides what happens: when it is not
     * positive the file is destroyed, otherwise the file is renamed to that value inside the
     * same directory and the mapping is shut down. Any failure is logged and swallowed.
     *
     * @param path path of the old format file
     */
    private void doConvertOldFormatFile(String path) {
        try {
            File oldFile = new File(path);
            if (!oldFile.exists()) {
                return;
            }
            DefaultMappedFile mappedFile = new DefaultMappedFile(oldFile.getPath(), (int) oldFile.length());
            long timestamp = mappedFile.getMappedByteBuffer().getLong(4);
            if (timestamp <= 0) {
                mappedFile.destroy(TimeUnit.SECONDS.toMillis(10L));
                return;
            }
            File renamed = new File(oldFile.getParent(), String.valueOf(timestamp));
            mappedFile.renameTo(renamed.getPath());
            mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10L));
        } catch (Exception e) {
            log.error("IndexStoreService do convert old format error, file: {}", path, e);
        }
    }

    /**
     * Rebuilds {@link #timeStoreTable} from the local index directory and from the segments
     * of the flat file, and initialises the current write file and the compact timestamp.
     */
    private void recover() {
        Stopwatch stopwatch = Stopwatch.createStarted();

        // The "compacting" sub directory is always removed on start
        // (presumably temporary compaction output - confirm with IndexStoreFile).
        String rootDir = this.storeConfig.getStorePathRootDir();
        UtilAll.deleteFile(new File(Paths.get(rootDir, FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString()));

        File indexDir = new File(Paths.get(rootDir, FILE_DIRECTORY_NAME).toString());
        doConvertOldFormatFile(Paths.get(indexDir.getPath(), "0000").toString());
        doConvertOldFormatFile(Paths.get(indexDir.getPath(), "1111").toString());

        // Load local files; only plain files with a purely numeric name are considered.
        File[] localFiles = indexDir.listFiles();
        if (localFiles != null) {
            Arrays.sort(localFiles, Comparator.comparing(File::getName));
            for (File localFile : localFiles) {
                if (localFile.isDirectory() || !StringUtils.isNumeric(localFile.getName())) {
                    continue;
                }
                try {
                    IndexFile loaded = new IndexStoreFile(this.storeConfig, Long.parseLong(localFile.getName()));
                    this.timeStoreTable.put(loaded.getTimestamp(), loaded);
                    log.info("IndexStoreService recover load local file, timestamp: {}", loaded.getTimestamp());
                } catch (Exception e) {
                    log.error("IndexStoreService recover, load local file error", e);
                }
            }
        }

        if (this.autoCreateNewFile && this.timeStoreTable.isEmpty()) {
            createNewIndexFile(System.currentTimeMillis());
        }
        if (!this.timeStoreTable.isEmpty()) {
            this.currentWriteFile = this.timeStoreTable.lastEntry().getValue();
            // Start just below the oldest file so that it is the first compaction candidate.
            setCompactTimestamp(this.timeStoreTable.firstKey() - 1);
        }

        // Load remote segments; a segment replaces (and destroys) a local file with the same timestamp.
        this.flatAppendFile = this.fileAllocator.createFlatFileForIndexFile(this.filePath);
        for (FileSegment segment : this.flatAppendFile.getFileSegmentList()) {
            IndexFile remoteFile = new IndexStoreFile(this.storeConfig, segment);
            IndexFile localDuplicate = this.timeStoreTable.get(remoteFile.getTimestamp());
            if (localDuplicate != null) {
                localDuplicate.destroy();
            }
            this.timeStoreTable.put(remoteFile.getTimestamp(), remoteFile);
            log.info("IndexStoreService recover load remote file, timestamp: {}, end timestamp: {}",
                remoteFile.getTimestamp(), remoteFile.getEndTimestamp());
        }

        log.info("IndexStoreService recover finished, total: {}, cost: {}ms, directory: {}",
            this.timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS), indexDir.getAbsolutePath());
    }

    /**
     * Creates a new index file for the given timestamp and makes it the current write file.
     *
     * <p>Nothing happens when a file with that timestamp already exists or when the current
     * write file is still {@code UNSEALED}. Failures are logged and swallowed.
     *
     * @param timestamp timestamp used as the key of the new file
     */
    public void createNewIndexFile(long timestamp) {
        this.readWriteLock.writeLock().lock();
        try {
            IndexFile writeFile = this.currentWriteFile;
            if (this.timeStoreTable.containsKey(timestamp)
                || (writeFile != null && IndexFile.IndexStatusEnum.UNSEALED.equals(writeFile.getFileStatus()))) {
                return;
            }
            IndexFile newFile = new IndexStoreFile(this.storeConfig, timestamp);
            this.timeStoreTable.put(timestamp, newFile);
            this.currentWriteFile = newFile;
            log.info("IndexStoreService construct next file, timestamp: {}", timestamp);
        } catch (Exception e) {
            log.error("IndexStoreService construct next file, timestamp: {}", timestamp, e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    @VisibleForTesting
    public ConcurrentSkipListMap<Long, IndexFile> getTimeStoreTable() {
        return this.timeStoreTable;
    }

    /**
     * Appends the keys to the current write file, rolling to a new file when it is full.
     *
     * <p>All arguments are forwarded unchanged to {@code IndexFile#putKey}.
     *
     * @return {@code UNKNOWN_ERROR} when {@code topic} is blank, otherwise {@code SUCCESS}.
     *         NOTE(review): {@code SUCCESS} is also returned after all attempts failed (only an
     *         error is logged); this looks deliberate so that index problems do not fail the
     *         caller - confirm before changing.
     */
    @Override // org.apache.rocketmq.tieredstore.index.IndexService
    public AppendResult putKey(String topic, int topicId, int queueId, Set<String> keySet, long offset, int size, long timestamp) {
        if (StringUtils.isBlank(topic)) {
            return AppendResult.UNKNOWN_ERROR;
        }
        if (keySet == null || keySet.isEmpty()) {
            return AppendResult.SUCCESS;
        }
        for (int attempt = 0; attempt < PUT_KEY_MAX_ATTEMPTS; attempt++) {
            AppendResult result = this.currentWriteFile.putKey(topic, topicId, queueId, keySet, offset, size, timestamp);
            if (AppendResult.SUCCESS.equals(result)) {
                return AppendResult.SUCCESS;
            }
            if (AppendResult.FILE_FULL.equals(result)) {
                // Roll over, the next attempt writes into the new file.
                createNewIndexFile(System.currentTimeMillis());
            }
        }
        log.error("IndexStoreService put key three times return error, topic: {}, topicId: {}, queueId: {}, keySize: {}, timestamp: {}",
            topic, topicId, queueId, keySet.size(), timestamp);
        return AppendResult.SUCCESS;
    }

    /**
     * Queries every index file whose timestamp lies in {@code [beginTime, endTime]}, newest
     * first, and merges the results.
     *
     * <p>Items are de-duplicated by "queueId-offset" and at most {@code maxCount} items are
     * returned. The future only fails when a file query failed and no item was collected at
     * all; partial results are preferred over an error.
     *
     * @param topic     forwarded to each file query
     * @param key       forwarded to each file query
     * @param maxCount  maximum number of items to return; values below one yield an empty list
     * @param beginTime inclusive lower bound of the file timestamps
     * @param endTime   inclusive upper bound of the file timestamps
     * @return future holding the merged items
     */
    @Override // org.apache.rocketmq.tieredstore.index.IndexService
    public CompletableFuture<List<IndexItem>> queryAsync(String topic, String key, int maxCount, long beginTime, long endTime) {
        CompletableFuture<List<IndexItem>> future = new CompletableFuture<>();
        this.readWriteLock.readLock().lock();
        try {
            ConcurrentNavigableMap<Long, IndexFile> candidates = this.timeStoreTable.subMap(beginTime, true, endTime, true);
            List<CompletableFuture<Void>> pending = new ArrayList<>(candidates.size());
            // Keyed by "queueId-offset" so the same message found in several files is kept once.
            ConcurrentHashMap<String, IndexItem> merged = new ConcurrentHashMap<>();

            for (Map.Entry<Long, IndexFile> entry : candidates.descendingMap().entrySet()) {
                CompletableFuture<Void> fileQuery = entry.getValue()
                    .queryAsync(topic, key, maxCount, beginTime, endTime)
                    .thenAccept(items -> items.forEach(item -> {
                        if (merged.size() < maxCount) {
                            merged.put(String.format("%d-%d", item.getQueueId(), item.getOffset()), item);
                        }
                    }));
                pending.add(fileQuery);
            }

            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).whenComplete((ignored, error) -> {
                if (merged.isEmpty() && error != null) {
                    future.completeExceptionally(error);
                    return;
                }
                List<IndexItem> items = new ArrayList<>(merged.values());
                // Clamp at zero: a negative maxCount would make subList throw inside this
                // callback and leave the returned future incomplete forever.
                int limit = Math.max(0, Math.min(items.size(), maxCount));
                future.complete(items.subList(0, limit));
            });
        } catch (Exception e) {
            future.completeExceptionally(e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }
        return future;
    }

    /**
     * Compacts and uploads every file newer than the compact timestamp, including the newest
     * one, while holding the write lock. A file whose upload fails is retried until it succeeds.
     *
     * @throws RuntimeException wrapping any failure, including an interrupt
     */
    @Override // org.apache.rocketmq.tieredstore.index.IndexService
    public void forceUpload() {
        this.readWriteLock.writeLock().lock();
        try {
            while (true) {
                Map.Entry<Long, IndexFile> entry = this.timeStoreTable.higherEntry(this.compactTimestamp.get());
                if (entry == null) {
                    break;
                }
                if (doCompactThenUploadFile(entry.getValue())) {
                    setCompactTimestamp(entry.getValue().getTimestamp());
                }
                // Pause after every attempt: throttles the uploads and keeps a failing
                // upload from being retried in a hot loop.
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("IndexStoreService force upload error", e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            log.error("IndexStoreService force upload error", e);
            throw new RuntimeException(e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Compacts the given file, appends the result to the flat file, commits it and replaces
     * the table entry with a file backed by the uploaded segment.
     *
     * <p>A new segment is only appended when the flat file has nothing pending (commit offset
     * equals append offset); otherwise only the pending data is committed.
     *
     * @param indexFile file to compact and upload
     * @return {@code true} when the file needs no further handling (it was uploaded, or it was
     *         already in {@code UPLOAD} status and has been destroyed), {@code false} when the
     *         compaction or the upload did not succeed and should be retried
     */
    public boolean doCompactThenUploadFile(IndexFile indexFile) {
        if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
            log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
                indexFile.getTimestamp(), indexFile.getFileStatus());
            indexFile.destroy();
            return true;
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        if (this.flatAppendFile.getCommitOffset() == this.flatAppendFile.getAppendOffset()) {
            ByteBuffer compacted = indexFile.doCompaction();
            if (compacted == null) {
                log.error("IndexStoreService found compaction buffer is null, timestamp: {}", indexFile.getTimestamp());
                return false;
            }
            this.flatAppendFile.rollingNewFile(Math.max(0L, this.flatAppendFile.getAppendOffset()));
            this.flatAppendFile.append(compacted, indexFile.getTimestamp());
            this.flatAppendFile.getFileToWrite().setMinTimestamp(indexFile.getTimestamp());
            this.flatAppendFile.getFileToWrite().setMaxTimestamp(indexFile.getEndTimestamp());
        }

        boolean committed = this.flatAppendFile.commitAsync().join();

        // The uploaded data must be the last segment and must belong to this file.
        List<FileSegment> segments = this.flatAppendFile.getFileSegmentList();
        FileSegment lastSegment = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (!committed || lastSegment == null || lastSegment.getMinTimestamp() != indexFile.getTimestamp()) {
            log.warn("IndexStoreService upload compacted file error, timestamp: {}", indexFile.getTimestamp());
            return false;
        }
        log.info("IndexStoreService upload compacted file success, timestamp: {}", indexFile.getTimestamp());

        this.readWriteLock.writeLock().lock();
        try {
            IndexFile uploadedFile = new IndexStoreFile(this.storeConfig, lastSegment);
            this.timeStoreTable.put(uploadedFile.getTimestamp(), uploadedFile);
            indexFile.destroy();
        } catch (Exception e) {
            log.error("IndexStoreService rolling file error, timestamp: {}, cost: {}ms",
                indexFile.getTimestamp(), stopwatch.elapsed(TimeUnit.MILLISECONDS), e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
        // The data is uploaded at this point, so the file counts as handled even if the swap failed.
        return true;
    }

    /**
     * Removes every uploaded file whose timestamp is below the given value from the table and
     * asks the flat file to destroy its expired data.
     *
     * @param expireTimestamp files with a smaller timestamp are considered expired
     */
    public void destroyExpiredFile(long expireTimestamp) {
        this.readWriteLock.writeLock().lock();
        try {
            this.timeStoreTable.entrySet().removeIf(entry ->
                entry.getKey() < expireTimestamp
                    && IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()));
            this.flatAppendFile.destroyExpiredFile(expireTimestamp);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Destroys every file that is not in {@code UPLOAD} status and then the flat file itself.
     * Failures are logged and swallowed.
     */
    @Override // org.apache.rocketmq.tieredstore.index.IndexService
    public void destroy() {
        this.readWriteLock.writeLock().lock();
        try {
            for (IndexFile file : this.timeStoreTable.values()) {
                if (!IndexFile.IndexStatusEnum.UPLOAD.equals(file.getFileStatus())) {
                    file.destroy();
                }
            }
            if (this.flatAppendFile != null) {
                this.flatAppendFile.destroy();
            }
        } catch (Exception e) {
            log.error("IndexStoreService destroy all file error", e);
        } finally {
            // Single unlock: releasing in the catch block as well would throw
            // IllegalMonitorStateException on the second release.
            this.readWriteLock.writeLock().unlock();
        }
    }

    public String getServiceName() {
        return IndexStoreService.class.getSimpleName();
    }

    /**
     * Sets the compaction watermark; only files with a greater timestamp are picked up next.
     *
     * @param timestamp new watermark
     */
    public void setCompactTimestamp(long timestamp) {
        this.compactTimestamp.set(timestamp);
        log.debug("IndexStoreService set compact timestamp to: {}", timestamp);
    }

    /**
     * Returns the oldest file above the compact timestamp, unless it is the newest file in
     * the table.
     *
     * @return the next file to compact, or {@code null} when there is none
     */
    protected IndexFile getNextSealedFile() {
        Map.Entry<Long, IndexFile> candidate = this.timeStoreTable.higherEntry(this.compactTimestamp.get());
        if (candidate == null || candidate.getKey() >= this.timeStoreTable.lastKey()) {
            return null;
        }
        return candidate.getValue();
    }

    /**
     * Stops the background thread and waits until it has emptied the file table.
     *
     * @throws RuntimeException when the waiting thread is interrupted
     */
    @Override // org.apache.rocketmq.tieredstore.index.IndexService
    public void shutdown() {
        super.shutdown();
        // run() clears the table as its very last step, so an empty table means it is done.
        while (!this.timeStoreTable.isEmpty()) {
            try {
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Background loop: drops expired files and compacts and uploads the next eligible file until
     * the service is stopped, then optionally force-uploads the remaining files, shuts all
     * files down and clears the table.
     */
    public void run() {
        while (!isStopped()) {
            long reservedMillis = TimeUnit.HOURS.toMillis(this.storeConfig.getTieredStoreFileReservedTime());
            destroyExpiredFile(System.currentTimeMillis() - reservedMillis);

            IndexFile sealedFile = getNextSealedFile();
            if (sealedFile != null && doCompactThenUploadFile(sealedFile)) {
                setCompactTimestamp(sealedFile.getTimestamp());
            } else {
                waitForRunning(TimeUnit.SECONDS.toMillis(10L));
            }
        }

        this.readWriteLock.writeLock().lock();
        try {
            if (this.autoCreateNewFile) {
                try {
                    forceUpload();
                } catch (Exception e) {
                    log.error("IndexStoreService shutdown error", e);
                }
            }
            // Each file is shut down on its own so that one failure does not skip the others.
            for (IndexFile file : this.timeStoreTable.values()) {
                try {
                    file.shutdown();
                } catch (Exception e) {
                    log.error("IndexStoreService shutdown error", e);
                }
            }
        } finally {
            // Always clear: shutdown() polls the table and would otherwise wait forever.
            this.timeStoreTable.clear();
            this.readWriteLock.writeLock().unlock();
        }
        log.info(getServiceName() + " service shutdown");
    }
}
