package org.apache.rocketmq.tieredstore.file;

import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.QueueMetadata;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/file/FlatMessageFile.class */
public class FlatMessageFile implements FlatFileInterface {
    protected static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
    protected volatile boolean closed;
    protected TopicMetadata topicMetadata;
    protected QueueMetadata queueMetadata;
    protected final String filePath;
    protected final ReentrantLock fileLock;
    protected final Semaphore commitLock;
    protected final MessageStoreConfig storeConfig;
    protected final MetadataStore metadataStore;
    protected final FlatCommitLogFile commitLog;
    protected final FlatConsumeQueueFile consumeQueue;
    protected final AtomicLong lastDestroyTime;
    protected final ConcurrentMap<String, CompletableFuture<?>> inFlightRequestMap;

    public FlatMessageFile(FlatFileFactory flatFileFactory, String str, int i) {
        this(flatFileFactory, MessageStoreUtil.toFilePath(new MessageQueue(str, flatFileFactory.getStoreConfig().getBrokerName(), i)));
        this.topicMetadata = recoverTopicMetadata(str);
        this.queueMetadata = recoverQueueMetadata(str, i);
    }

    public FlatMessageFile(FlatFileFactory flatFileFactory, String str) {
        this.closed = false;
        this.commitLock = new Semaphore(1);
        this.filePath = str;
        this.fileLock = new ReentrantLock(false);
        this.storeConfig = flatFileFactory.getStoreConfig();
        this.metadataStore = flatFileFactory.getMetadataStore();
        this.commitLog = flatFileFactory.createFlatFileForCommitLog(str);
        this.consumeQueue = flatFileFactory.createFlatFileForConsumeQueue(str);
        this.lastDestroyTime = new AtomicLong();
        this.inFlightRequestMap = new ConcurrentHashMap();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getTopicId() {
        return this.topicMetadata.getTopicId();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public MessageQueue getMessageQueue() {
        if (this.queueMetadata != null) {
            return this.queueMetadata.getQueue();
        }
        return null;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public boolean isFlatFileInit() {
        return !this.consumeQueue.fileSegmentTable.isEmpty();
    }

    public TopicMetadata recoverTopicMetadata(String str) {
        TopicMetadata topic = this.metadataStore.getTopic(str);
        if (topic == null) {
            topic = this.metadataStore.addTopic(str, -1L);
        }
        return topic;
    }

    public QueueMetadata recoverQueueMetadata(String str, int i) {
        MessageQueue messageQueue = new MessageQueue(str, this.storeConfig.getBrokerName(), i);
        QueueMetadata queue = this.metadataStore.getQueue(messageQueue);
        if (queue == null) {
            queue = this.metadataStore.addQueue(messageQueue, -1L);
        }
        return queue;
    }

    public void flushMetadata() {
        if (this.queueMetadata != null) {
            this.queueMetadata.setMinOffset(getConsumeQueueMinOffset());
            this.queueMetadata.setMaxOffset(getConsumeQueueCommitOffset());
            this.queueMetadata.setUpdateTimestamp(System.currentTimeMillis());
            this.metadataStore.updateQueue(this.queueMetadata);
        }
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public Lock getFileLock() {
        return this.fileLock;
    }

    @VisibleForTesting
    public Semaphore getCommitLock() {
        return this.commitLock;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public boolean rollingFile(long j) {
        return this.commitLog.tryRollingFile(j);
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public void initOffset(long j) {
        this.fileLock.lock();
        try {
            this.commitLog.initOffset(0L);
            this.consumeQueue.initOffset(j * 20);
        } finally {
            this.fileLock.unlock();
        }
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public AppendResult appendCommitLog(ByteBuffer byteBuffer) {
        return this.closed ? AppendResult.FILE_CLOSED : this.commitLog.append(byteBuffer, MessageFormatUtil.getStoreTimeStamp(byteBuffer));
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public AppendResult appendCommitLog(SelectMappedBufferResult selectMappedBufferResult) {
        return this.closed ? AppendResult.FILE_CLOSED : appendCommitLog(selectMappedBufferResult.getByteBuffer());
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public AppendResult appendConsumeQueue(DispatchRequest dispatchRequest) {
        if (this.closed) {
            return AppendResult.FILE_CLOSED;
        }
        ByteBuffer allocate = ByteBuffer.allocate(20);
        allocate.putLong(dispatchRequest.getCommitLogOffset());
        allocate.putInt(dispatchRequest.getMsgSize());
        allocate.putLong(dispatchRequest.getTagsCode());
        allocate.flip();
        return this.consumeQueue.append(allocate, dispatchRequest.getStoreTimestamp());
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public void release() {
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getMinStoreTimestamp() {
        long j = -1;
        if (Long.MAX_VALUE != this.commitLog.getMinTimestamp()) {
            j = Math.max(-1L, this.commitLog.getMinTimestamp());
        }
        if (Long.MAX_VALUE != this.consumeQueue.getMinTimestamp()) {
            j = Math.max(j, this.consumeQueue.getMinTimestamp());
        }
        return j;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getMaxStoreTimestamp() {
        return this.commitLog.getMaxTimestamp();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getFirstMessageOffset() {
        return this.commitLog.getMinOffsetFromFile();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getCommitLogMinOffset() {
        return this.commitLog.getMinOffset();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getCommitLogMaxOffset() {
        return this.commitLog.getAppendOffset();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getCommitLogCommitOffset() {
        return this.commitLog.getCommitOffset();
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getConsumeQueueMinOffset() {
        return Math.max(this.consumeQueue.getMinOffset() / 20, this.commitLog.getMinOffsetFromFile());
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getConsumeQueueMaxOffset() {
        return this.consumeQueue.getAppendOffset() / 20;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public long getConsumeQueueCommitOffset() {
        return this.consumeQueue.getCommitOffset() / 20;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public CompletableFuture<Boolean> commitAsync() {
        return this.commitLock.drainPermits() <= 0 ? CompletableFuture.completedFuture(false) : this.commitLog.commitAsync().thenCompose(bool -> {
            return bool.booleanValue() ? this.consumeQueue.commitAsync() : CompletableFuture.completedFuture(false);
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (bool2, th) -> {
            this.commitLock.release();
        });
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public CompletableFuture<ByteBuffer> getMessageAsync(long j) {
        return getConsumeQueueAsync(j).thenCompose(byteBuffer -> {
            return getCommitLogAsync(MessageFormatUtil.getCommitLogOffsetFromItem(byteBuffer), MessageFormatUtil.getSizeFromItem(byteBuffer));
        });
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public CompletableFuture<ByteBuffer> getCommitLogAsync(long j, int i) {
        return this.commitLog.readAsync(j, i);
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public CompletableFuture<ByteBuffer> getConsumeQueueAsync(long j) {
        return getConsumeQueueAsync(j, 1);
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public CompletableFuture<ByteBuffer> getConsumeQueueAsync(long j, int i) {
        return this.consumeQueue.readAsync(j * 20, i * 20);
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public CompletableFuture<Long> getQueueOffsetByTimeAsync(long j, BoundaryType boundaryType) {
        long consumeQueueMinOffset = getConsumeQueueMinOffset();
        long consumeQueueCommitOffset = getConsumeQueueCommitOffset() - 1;
        if (consumeQueueCommitOffset == -1 || consumeQueueCommitOffset < consumeQueueMinOffset) {
            return CompletableFuture.completedFuture(Long.valueOf(consumeQueueMinOffset));
        }
        if (MessageFormatUtil.getStoreTimeStamp(getMessageAsync(consumeQueueCommitOffset).join()) < j) {
            log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded maximum time, filePath={}, timestamp={}, result={}", new Object[]{this.filePath, Long.valueOf(j), Long.valueOf(consumeQueueCommitOffset + 1)});
            return CompletableFuture.completedFuture(Long.valueOf(consumeQueueCommitOffset + 1));
        }
        if (MessageFormatUtil.getStoreTimeStamp(getMessageAsync(consumeQueueMinOffset).join()) > j) {
            log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than minimum time, filePath={}, timestamp={}, result={}", new Object[]{this.filePath, Long.valueOf(j), Long.valueOf(consumeQueueMinOffset)});
            return CompletableFuture.completedFuture(Long.valueOf(consumeQueueMinOffset));
        }
        long j2 = consumeQueueMinOffset;
        long j3 = consumeQueueCommitOffset;
        ArrayList arrayList = new ArrayList();
        while (j2 < j3) {
            long j4 = j2 + ((j3 - j2) / 2);
            long storeTimeStamp = MessageFormatUtil.getStoreTimeStamp(getMessageAsync(j4).join());
            arrayList.add(String.format("(range=%d-%d, middle=%d, timestamp=%d, diff=%dms)", Long.valueOf(j2), Long.valueOf(j3), Long.valueOf(j4), Long.valueOf(storeTimeStamp), Long.valueOf(j - storeTimeStamp)));
            if (storeTimeStamp < j) {
                j2 = j4 + 1;
            } else {
                j3 = j4;
            }
        }
        long j5 = j2;
        if (boundaryType == BoundaryType.UPPER) {
            while (true) {
                long j6 = j5 + 1;
                if (j6 > consumeQueueCommitOffset || MessageFormatUtil.getStoreTimeStamp(getMessageAsync(j6).join()) != j) {
                    break;
                }
                j5 = j6;
            }
        }
        log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={}, timestamp={}, result={}, log={}", new Object[]{this.filePath, Long.valueOf(j), Long.valueOf(j5), JSON.toJSONString(arrayList)});
        return CompletableFuture.completedFuture(Long.valueOf(j5));
    }

    public int hashCode() {
        return this.filePath.hashCode();
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj != null && getClass() == obj.getClass()) {
            return StringUtils.equals(this.filePath, ((FlatMessageFile) obj).filePath);
        }
        return false;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public boolean isClosed() {
        return this.closed;
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public void shutdown() {
        this.closed = true;
        this.fileLock.lock();
        try {
            this.commitLog.shutdown();
            this.consumeQueue.shutdown();
        } finally {
            this.fileLock.unlock();
        }
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public void destroyExpiredFile(long j) {
        this.fileLock.lock();
        try {
            this.commitLog.destroyExpiredFile(j);
            this.consumeQueue.destroyExpiredFile(j);
        } finally {
            this.fileLock.unlock();
        }
    }

    @Override // org.apache.rocketmq.tieredstore.file.FlatFileInterface
    public void destroy() {
        shutdown();
        this.fileLock.lock();
        try {
            this.commitLog.destroyExpiredFile(Long.MAX_VALUE);
            this.consumeQueue.destroyExpiredFile(Long.MAX_VALUE);
            if (this.queueMetadata != null) {
                this.metadataStore.deleteQueue(this.queueMetadata.getQueue());
            }
        } finally {
            this.fileLock.unlock();
        }
    }

    public long getFileReservedHours() {
        return this.topicMetadata.getReserveTime() > 0 ? this.topicMetadata.getReserveTime() : this.storeConfig.getTieredStoreFileReservedTime();
    }
}
