package org.apache.rocketmq.store.queue;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

/* loaded from: input_file:org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.class */
public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
    private static final Logger ERROR_LOG = LoggerFactory.getLogger("RocketmqStoreError");
    private static final Logger ROCKSDB_LOG = LoggerFactory.getLogger("RocketmqRocksDB");
    public static final byte CTRL_0 = 0;
    public static final byte CTRL_1 = 1;
    public static final byte CTRL_2 = 2;
    private static final int BATCH_SIZE = 16;
    public static final int MAX_KEY_LEN = 300;
    private final ScheduledExecutorService scheduledExecutorService;
    private final String storePath;
    private final ConsumeQueueRocksDBStorage rocksDBStorage;
    private final RocksDBConsumeQueueTable rocksDBConsumeQueueTable;
    private final RocksDBConsumeQueueOffsetTable rocksDBConsumeQueueOffsetTable;
    private final WriteBatch writeBatch;
    private final List<DispatchRequest> bufferDRList;
    private final List<Pair<ByteBuffer, ByteBuffer>> cqBBPairList;
    private final List<Pair<ByteBuffer, ByteBuffer>> offsetBBPairList;
    private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap;
    private volatile boolean isCQError;

    public RocksDBConsumeQueueStore(DefaultMessageStore defaultMessageStore) {
        super(defaultMessageStore);
        this.isCQError = false;
        this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        this.rocksDBStorage = new ConsumeQueueRocksDBStorage(defaultMessageStore, this.storePath, 4);
        this.rocksDBConsumeQueueTable = new RocksDBConsumeQueueTable(this.rocksDBStorage, defaultMessageStore);
        this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(this.rocksDBConsumeQueueTable, this.rocksDBStorage, defaultMessageStore);
        this.writeBatch = new WriteBatch();
        this.bufferDRList = new ArrayList(16);
        this.cqBBPairList = new ArrayList(16);
        this.offsetBBPairList = new ArrayList(16);
        for (int i = 0; i < 16; i++) {
            this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
            this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
        }
        this.tempTopicQueueMaxOffsetMap = new HashMap();
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RocksDBConsumeQueueStoreScheduledThread", defaultMessageStore.getBrokerIdentity()));
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void start() {
        log.info("RocksDB ConsumeQueueStore start!");
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            this.rocksDBStorage.statRocksdb(ROCKSDB_LOG);
        }, 10L, this.messageStoreConfig.getStatRocksDBCQIntervalSec(), TimeUnit.SECONDS);
        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
            cleanDirty(this.messageStore.getTopicConfigs().keySet());
        }, 10L, this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(), TimeUnit.MINUTES);
    }

    private void cleanDirty(Set<String> set) {
        try {
            for (Map.Entry<String, Set<Integer>> entry : this.rocksDBConsumeQueueOffsetTable.iterateOffsetTable2FindDirty(set).entrySet()) {
                String key = entry.getKey();
                Iterator<Integer> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    destroy(new RocksDBConsumeQueue(key, it.next().intValue()));
                }
            }
        } catch (Exception e) {
            log.error("cleanUnusedTopic Failed.", e);
        }
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean load() {
        boolean start = this.rocksDBStorage.start();
        this.rocksDBConsumeQueueTable.load();
        this.rocksDBConsumeQueueOffsetTable.load();
        log.info("load rocksdb consume queue {}.", start ? "OK" : "Failed");
        return start;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean loadAfterDestroy() {
        return load();
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void recover() {
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean recoverConcurrently() {
        return true;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean shutdown() {
        this.scheduledExecutorService.shutdown();
        return shutdownInner();
    }

    private boolean shutdownInner() {
        return this.rocksDBStorage.shutdown();
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) throws RocksDBException {
        if (dispatchRequest == null || this.bufferDRList.size() >= 16) {
            putMessagePosition();
        }
        if (dispatchRequest != null) {
            this.bufferDRList.add(dispatchRequest);
        }
    }

    public void putMessagePosition() throws RocksDBException {
        for (int i = 0; i < 30; i++) {
            if (putMessagePosition0()) {
                if (this.isCQError) {
                    this.messageStore.getRunningFlags().clearLogicsQueueError();
                    this.isCQError = false;
                    return;
                }
                return;
            }
            ERROR_LOG.warn("{} put cq Failed. retryTime: {}", Integer.valueOf(i));
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }
        if (!this.isCQError) {
            ERROR_LOG.error("[BUG] put CQ Failed.");
            this.messageStore.getRunningFlags().makeLogicsQueueError();
            this.isCQError = true;
        }
        throw new RocksDBException("put CQ Failed");
    }

    private boolean putMessagePosition0() {
        if (!this.rocksDBStorage.hold()) {
            return false;
        }
        Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> map = this.tempTopicQueueMaxOffsetMap;
        try {
            try {
                List<DispatchRequest> list = this.bufferDRList;
                int size = list.size();
                if (size == 0) {
                    return true;
                }
                List<Pair<ByteBuffer, ByteBuffer>> list2 = this.cqBBPairList;
                List<Pair<ByteBuffer, ByteBuffer>> list3 = this.offsetBBPairList;
                WriteBatch writeBatch = this.writeBatch;
                long j = 0;
                for (int i = size - 1; i >= 0; i--) {
                    DispatchRequest dispatchRequest = list.get(i);
                    byte[] bytes = dispatchRequest.getTopic().getBytes(DataConverter.CHARSET_UTF8);
                    this.rocksDBConsumeQueueTable.buildAndPutCQByteBuffer(list2.get(i), bytes, dispatchRequest, writeBatch);
                    this.rocksDBConsumeQueueOffsetTable.updateTempTopicQueueMaxOffset(list3.get(i), bytes, dispatchRequest, map);
                    int msgSize = dispatchRequest.getMsgSize();
                    long commitLogOffset = dispatchRequest.getCommitLogOffset();
                    if (commitLogOffset + msgSize >= j) {
                        j = commitLogOffset + msgSize;
                    }
                }
                this.rocksDBConsumeQueueOffsetTable.putMaxPhyAndCqOffset(map, writeBatch, j);
                this.rocksDBStorage.batchPut(writeBatch);
                this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(map);
                long storeTimestamp = list.get(size - 1).getStoreTimestamp();
                if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                    this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
                notifyMessageArriveAndClear();
                map.clear();
                this.rocksDBStorage.release();
                return true;
            } catch (Exception e) {
                ERROR_LOG.error("putMessagePosition0 Failed.", e);
                map.clear();
                this.rocksDBStorage.release();
                return false;
            }
        } finally {
            map.clear();
            this.rocksDBStorage.release();
        }
    }

    private void notifyMessageArriveAndClear() {
        List<DispatchRequest> list = this.bufferDRList;
        try {
            try {
                Iterator<DispatchRequest> it = list.iterator();
                while (it.hasNext()) {
                    this.messageStore.notifyMessageArriveIfNecessary(it.next());
                }
                list.clear();
            } catch (Exception e) {
                ERROR_LOG.error("notifyMessageArriveAndClear Failed.", e);
                list.clear();
            }
        } catch (Throwable th) {
            list.clear();
            throw th;
        }
    }

    public Statistics getStatistics() {
        return this.rocksDBStorage.getStatistics();
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public List<ByteBuffer> rangeQuery(String str, int i, long j, int i2) throws RocksDBException {
        return this.rocksDBConsumeQueueTable.rangeQuery(str, i, j, i2);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public ByteBuffer get(String str, int i, long j) throws RocksDBException {
        return this.rocksDBConsumeQueueTable.getCQInKV(str, i, j);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void recoverOffsetTable(long j) {
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void destroy() {
        try {
            shutdownInner();
            FileUtils.deleteDirectory(new File(this.storePath));
        } catch (Exception e) {
            ERROR_LOG.error("destroy cq Failed. {}", this.storePath, e);
        }
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void destroy(ConsumeQueueInterface consumeQueueInterface) throws RocksDBException {
        String topic = consumeQueueInterface.getTopic();
        int queueId = consumeQueueInterface.getQueueId();
        if (StringUtils.isEmpty(topic) || queueId < 0 || !this.rocksDBStorage.hold()) {
            return;
        }
        WriteBatch writeBatch = new WriteBatch();
        try {
            try {
                this.rocksDBConsumeQueueTable.destroyCQ(topic, queueId, writeBatch);
                this.rocksDBConsumeQueueOffsetTable.destroyOffset(topic, queueId, writeBatch);
                this.rocksDBStorage.batchPut(writeBatch);
                writeBatch.close();
                this.rocksDBStorage.release();
            } catch (RocksDBException e) {
                ERROR_LOG.error("kv deleteTopic {} Failed.", topic, e);
                throw e;
            }
        } catch (Throwable th) {
            writeBatch.close();
            this.rocksDBStorage.release();
            throw th;
        }
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean flush(ConsumeQueueInterface consumeQueueInterface, int i) {
        try {
            this.rocksDBStorage.flushWAL();
            return true;
        } catch (Exception e) {
            return true;
        }
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void checkSelf() {
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public int deleteExpiredFile(ConsumeQueueInterface consumeQueueInterface, long j) {
        return 0;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void truncateDirty(long j) throws RocksDBException {
        if (j >= getMaxPhyOffsetInConsumeQueue()) {
            return;
        }
        this.rocksDBConsumeQueueOffsetTable.truncateDirty(j);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public void cleanExpired(long j) {
        this.rocksDBStorage.manualCompaction(j);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public long getOffsetInQueueByTime(String str, int i, long j, BoundaryType boundaryType) throws RocksDBException {
        long minPhyOffset = this.messageStore.getMinPhyOffset();
        long minCqOffset = this.rocksDBConsumeQueueOffsetTable.getMinCqOffset(str, i);
        Long maxCqOffset = this.rocksDBConsumeQueueOffsetTable.getMaxCqOffset(str, i);
        if (maxCqOffset == null || maxCqOffset.longValue() == -1) {
            return 0L;
        }
        return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(str, i, maxCqOffset.longValue(), minCqOffset, j, minPhyOffset, boundaryType);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public long getMaxOffsetInQueue(String str, int i) throws RocksDBException {
        Long maxCqOffset = this.rocksDBConsumeQueueOffsetTable.getMaxCqOffset(str, i);
        if (maxCqOffset != null) {
            return maxCqOffset.longValue() + 1;
        }
        return 0L;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public long getMinOffsetInQueue(String str, int i) throws RocksDBException {
        return this.rocksDBConsumeQueueOffsetTable.getMinCqOffset(str, i);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public Long getMaxPhyOffsetInConsumeQueue(String str, int i) {
        return this.rocksDBConsumeQueueOffsetTable.getMaxPhyOffset(str, i);
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public long getMaxPhyOffsetInConsumeQueue() throws RocksDBException {
        return this.rocksDBConsumeQueueOffsetTable.getMaxPhyOffset();
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public ConsumeQueueInterface findOrCreateConsumeQueue(String str, int i) {
        ConcurrentMap<Integer, ConsumeQueueInterface> concurrentMap = this.consumeQueueTable.get(str);
        if (null == concurrentMap) {
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(128);
            ConcurrentMap<Integer, ConsumeQueueInterface> putIfAbsent = this.consumeQueueTable.putIfAbsent(str, concurrentHashMap);
            concurrentMap = putIfAbsent != null ? putIfAbsent : concurrentHashMap;
        }
        ConsumeQueueInterface consumeQueueInterface = concurrentMap.get(Integer.valueOf(i));
        if (consumeQueueInterface != null) {
            return consumeQueueInterface;
        }
        RocksDBConsumeQueue rocksDBConsumeQueue = new RocksDBConsumeQueue(this.messageStore, str, i);
        ConsumeQueueInterface putIfAbsent2 = concurrentMap.putIfAbsent(Integer.valueOf(i), rocksDBConsumeQueue);
        return putIfAbsent2 != null ? putIfAbsent2 : rocksDBConsumeQueue;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public long rollNextFile(ConsumeQueueInterface consumeQueueInterface, long j) {
        return 0L;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean isFirstFileExist(ConsumeQueueInterface consumeQueueInterface) {
        return true;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueueInterface) {
        return true;
    }

    @Override // org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface
    public long getTotalSize() {
        return 0L;
    }
}
