package org.apache.rocketmq.mqtt.meta.raft.processor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.mqtt.common.model.Trie;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.class */
/**
 * State processor for the {@code "retainedMsg"} group category.
 *
 * <p>Two kinds of entries are kept in the RocksDB engine of the state machine that owns the
 * request's group:
 * <ul>
 *   <li>{@code topic -> payload bytes}: the retained message of a single topic;</li>
 *   <li>{@code "$" + firstTopic + "$" -> JSON-serialized Trie}: the index of topics that currently
 *       hold a retained message under that first topic.</li>
 * </ul>
 * The deserialized tries are additionally cached in memory, keyed by the wrapped first topic.
 */
public class RetainedMsgStateProcessor extends StateProcessor {
    private static final Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcessor.class);

    /** In-memory cache of the per-first-topic tries, keyed by {@link #wrapTrieFirstTopic(String)}. */
    private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>();
    private final MqttRaftServer server;
    private final int maxRetainedTopicNum;

    /**
     * @param mqttRaftServer server used to look up the state machine of a request's group
     * @param i              maximum number of entries allowed in one first topic's trie node path
     */
    public RetainedMsgStateProcessor(MqttRaftServer mqttRaftServer, int i) {
        this.server = mqttRaftServer;
        this.maxRetainedTopicNum = i;
    }

    /**
     * Serves a retained-message read.
     *
     * <p>When the operation is {@code "topic"} the stored value of the {@code topic} entry is
     * returned through the inherited {@code get}. For any other operation the trie of
     * {@code firstTopic} is queried with {@code topic} and the stored value of every returned path
     * is added to the response data list.
     *
     * @param readRequest request carrying {@code topic} and {@code firstTopic} in its ext-data map
     * @return the response; {@code null} when no state machine exists for the request's group; a
     *         failure response when an exception is raised while processing
     */
    @Override // org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor
    public Response onReadRequest(ReadRequest readRequest) {
        try {
            MqttStateMachine stateMachine = this.server.getMqttStateMachine(readRequest.getGroup());
            if (stateMachine == null) {
                logger.error("Fail to process RetainedMsg ReadRequest , Not Found SM for {}", readRequest.getGroup());
                return null;
            }
            String topic = readRequest.getExtDataMap().get("topic");
            String firstTopic = readRequest.getExtDataMap().get("firstTopic");
            String operation = readRequest.getOperation();
            logger.info("FirstTopic:{} Topic:{} Operation:{}", firstTopic, topic, operation);

            RocksDBEngine engine = stateMachine.getRocksDBEngine();
            if (operation.equals("topic")) {
                return get(engine, topic.getBytes(StandardCharsets.UTF_8));
            }

            // A trie that is not cached yet is loaded from RocksDB and then queried right away;
            // previously the first read after a cache miss always answered with an empty result,
            // even when the persisted trie contained matching topics.
            Trie<String, String> trie = loadTrie(engine, wrapTrieFirstTopic(firstTopic));
            Set<String> matchedPaths = trie.getAllPath(topic);
            ArrayList<ByteString> messages = new ArrayList<>();
            for (String path : matchedPaths) {
                byte[] value = getRdb(engine, path.getBytes(StandardCharsets.UTF_8));
                if (value != null) {
                    messages.add(ByteString.copyFrom(value));
                }
            }
            return Response.newBuilder().setSuccess(true).addAllDatalist(messages).build();
        } catch (Exception e) {
            logger.error("", e);
            return failure(e);
        }
    }

    /**
     * Stores or deletes the retained message of one topic and persists the updated trie.
     *
     * @param rocksDBEngine engine that holds the message and trie entries
     * @param str           first topic; selects the trie
     * @param str2          full topic; key of the message entry and of the trie node
     * @param z             {@code true} to delete the topic's retained message, {@code false} to store {@code bArr}
     * @param bArr          message bytes to store (not used when deleting)
     * @return {@code true} when the change was applied; {@code false} when storing was refused
     *         because the trie already holds {@code maxRetainedTopicNum} or more paths
     * @throws Exception propagated from the trie load or the storage calls
     */
    boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String str, String str2, boolean z, byte[] bArr) throws Exception {
        String trieKey = wrapTrieFirstTopic(str);
        // Start from the persisted trie on a cache miss. Starting from an empty one would
        // serialize it back over the stored index below and drop every previously indexed topic.
        Trie<String, String> trie = loadTrie(rocksDBEngine, trieKey);

        if (z) {
            logger.info("Delete the topic {} retained message", str2);
            delete(rocksDBEngine, str2.getBytes(StandardCharsets.UTF_8));
            trie.deleteTrieNode(str2, "");
            put(rocksDBEngine, trieKey.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
            return true;
        }

        // NOTE(review): this also refuses an update of a topic that is already indexed once the
        // limit is reached — confirm whether overwriting an existing retained message should pass.
        if (trie.getNodePath().size() >= this.maxRetainedTopicNum) {
            return false;
        }
        put(rocksDBEngine, str2.getBytes(StandardCharsets.UTF_8), bArr);
        trie.addNode(str2, "", "");
        put(rocksDBEngine, trieKey.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
        return true;
    }

    /**
     * Returns the trie cached under {@code trieKey}; on a cache miss the trie is read from
     * RocksDB (an empty trie is used when nothing usable is stored) and placed in the cache.
     *
     * @param rocksDBEngine engine to read the serialized trie from
     * @param trieKey       wrapped first topic, see {@link #wrapTrieFirstTopic(String)}
     * @return the cached or freshly loaded trie, never {@code null}
     * @throws Exception propagated from the storage read or the JSON parsing
     */
    private Trie<String, String> loadTrie(RocksDBEngine rocksDBEngine, String trieKey) throws Exception {
        Trie<String, String> cached = this.retainedMsgTopicTrie.get(trieKey);
        if (cached != null) {
            return cached;
        }
        Trie<String, String> loaded = null;
        byte[] persisted = getRdb(rocksDBEngine, trieKey.getBytes(StandardCharsets.UTF_8));
        if (persisted != null) {
            @SuppressWarnings("unchecked") // the class literal cannot carry the type arguments
            Trie<String, String> parsed = JSON.parseObject(new String(persisted, StandardCharsets.UTF_8), Trie.class);
            loaded = parsed;
        }
        if (loaded == null) {
            loaded = new Trie<>();
        }
        // Keep whichever instance reached the cache first so all users share one trie.
        Trie<String, String> existing = this.retainedMsgTopicTrie.putIfAbsent(trieKey, loaded);
        return existing != null ? existing : loaded;
    }

    /**
     * Builds a failure response for {@code e}. The exception's string form is used when it has no
     * message, because the protobuf-generated builder rejects a {@code null} error message.
     */
    private static Response failure(Exception e) {
        String message = e.getMessage();
        return Response.newBuilder()
                .setSuccess(false)
                .setErrMsg(message != null ? message : e.toString())
                .build();
    }

    /** Returns the key under which the trie of the given first topic is cached and stored. */
    private String wrapTrieFirstTopic(String str) {
        return "$" + str + "$";
    }

    /**
     * Applies a retained-message write: stores the request data as the topic's retained message,
     * or deletes it when the {@code isEmpty} ext-data entry parses to {@code true}.
     *
     * @param writeRequest request carrying {@code firstTopic}, {@code topic} and {@code isEmpty}
     *                     in its ext-data map and the message bytes as data
     * @return the response; {@code null} when no state machine exists for the request's group; a
     *         failure response when the topic limit is exceeded or an exception is raised
     */
    @Override // org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor
    public Response onWriteRequest(WriteRequest writeRequest) {
        try {
            MqttStateMachine stateMachine = this.server.getMqttStateMachine(writeRequest.getGroup());
            if (stateMachine == null) {
                logger.error("Fail to process RetainedMsg WriteRequest , Not Found SM for {}", writeRequest.getGroup());
                return null;
            }
            String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic"));
            String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic"));
            boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty"));
            byte[] payload = writeRequest.getData().toByteArray();

            if (setRetainedMsg(stateMachine.getRocksDBEngine(), firstTopic, topic, isEmpty, payload)) {
                logger.info("Put the topic {} retained message success!", topic);
                return Response.newBuilder()
                        .setSuccess(true)
                        .setData(ByteString.copyFrom(JSON.toJSONBytes(topic)))
                        .build();
            }
            logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.", topic);
            return Response.newBuilder().setSuccess(false).setErrMsg("Exceeded maximum number of reserved topics limit.").build();
        } catch (Exception e) {
            logger.error("Put the retained message error!", e);
            return failure(e);
        }
    }

    @Override // org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor
    public String groupCategory() {
        return "retainedMsg";
    }
}
