package org.apache.rocketmq.mqtt.ds.store;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.StoreResult;
import org.apache.rocketmq.mqtt.common.util.NamespaceUtil;
import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * {@link LmqQueueStore} implementation backed by a RocketMQ pull consumer and producer.
 *
 * <p>Writes go through {@link DefaultMQProducer} as a single message carrying the
 * {@code INNER_MULTI_DISPATCH} property (one LMQ name per target queue). Reads bypass the
 * regular consumer API and issue a pull request directly against the broker, using the LMQ
 * name as the request topic.
 *
 * <p>An MQTT queue name is turned into an LMQ name by prefixing {@code %LMQ%} and replacing
 * every {@code /} with {@code %}; see {@link #toLmqName(String)}.
 */
@Component
public class LmqQueueStoreManager implements LmqQueueStore {
    private static final Logger logger = LoggerFactory.getLogger(LmqQueueStoreManager.class);

    /** Prefix that marks a topic name as a light message queue. */
    private static final String LMQ_NAME_PREFIX = "%LMQ%";

    private PullAPIWrapper pullAPIWrapper;
    private DefaultMQPullConsumer defaultMQPullConsumer;
    private DefaultMQProducer defaultMQProducer;
    private String consumerGroup = "CID_RMQ_SYS_LMQ_PULL";
    private Map<String, Set<String>> topic2Brokers = new ConcurrentHashMap<>();

    @Resource
    private ServiceConf serviceConf;

    @Resource
    private FirstTopicManager firstTopicManager;

    /**
     * Builds and starts the pull consumer and the producer used by this store.
     *
     * @throws MQClientException if either client fails to start
     */
    @PostConstruct
    public void init() throws MQClientException {
        this.defaultMQPullConsumer = MqFactory.buildDefaultMQPullConsumer(this.consumerGroup, this.serviceConf.getProperties());
        this.defaultMQPullConsumer.setConsumerPullTimeoutMillis(2000L);
        this.defaultMQPullConsumer.start();
        this.pullAPIWrapper = this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
        this.defaultMQProducer = MqFactory.buildDefaultMQProducer("GID_LMQ_SEND", this.serviceConf.getProperties());
        // Async sends are not retried by the client; a failure is reported straight to the callback.
        this.defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
        this.defaultMQProducer.start();
    }

    /**
     * Encodes an MQTT queue name as an LMQ name: {@code %LMQ%} + the name with {@code /} replaced by {@code %}.
     */
    private static String toLmqName(String queueName) {
        return LMQ_NAME_PREFIX + StringUtils.replace(queueName, "/", "%");
    }

    /**
     * Converts an MQTT-layer message into a RocketMQ message.
     *
     * <p>Works on a copy so that clearing the client-id property does not mutate the caller's message.
     */
    private Message toMQMessage(org.apache.rocketmq.mqtt.common.model.Message message) {
        org.apache.rocketmq.mqtt.common.model.Message copy = message.copy();
        Message mqMessage = new Message(message.getFirstTopic(), copy.getPayload());
        MessageAccessor.putProperty(mqMessage, "UNIQ_KEY", copy.getMsgId());
        mqMessage.putUserProperty("originMqttTopic", copy.getOriginTopic());
        mqMessage.putUserProperty("IS_EMPTY_MSG", String.valueOf(copy.isEmpty()));

        String qos = copy.getUserProperty(org.apache.rocketmq.mqtt.common.model.Message.extPropertyQoS);
        if (qos != null) {
            mqMessage.putUserProperty("qosLevel", qos);
        }
        String cleanSession = copy.getUserProperty(org.apache.rocketmq.mqtt.common.model.Message.extPropertyCleanSessionFlag);
        if (cleanSession != null) {
            mqMessage.putUserProperty("cleanSessionFlag", cleanSession);
        }
        String clientId = copy.getUserProperty(org.apache.rocketmq.mqtt.common.model.Message.extPropertyClientId);
        if (clientId != null) {
            mqMessage.putUserProperty("clientId", NamespaceUtil.decodeOriginResource(clientId));
            // Removed from the copy so it is not repeated inside the serialized "extData" below.
            copy.clearUserProperty(org.apache.rocketmq.mqtt.common.model.Message.extPropertyClientId);
        }
        mqMessage.putUserProperty("retryTimes", String.valueOf(copy.getRetry()));

        Map<?, ?> remaining = copy.getUserProperties();
        if (remaining != null && !remaining.isEmpty()) {
            mqMessage.putUserProperty("extData", JSONObject.toJSONString(remaining));
        }
        return mqMessage;
    }

    /**
     * Converts a pulled RocketMQ message back into an MQTT-layer message for the given queue.
     */
    private org.apache.rocketmq.mqtt.common.model.Message toLmqMessage(Queue queue, MessageExt messageExt) {
        org.apache.rocketmq.mqtt.common.model.Message message = new org.apache.rocketmq.mqtt.common.model.Message();
        message.setMsgId(messageExt.getMsgId());
        message.setOffset(parseLmqOffset(queue, messageExt));
        message.setEmpty(Boolean.parseBoolean(messageExt.getUserProperty("IS_EMPTY_MSG")));

        String originTopic = messageExt.getUserProperty("originMqttTopic");
        String multiDispatch = messageExt.getUserProperty("INNER_MULTI_DISPATCH");
        if (StringUtils.isNotBlank(originTopic)) {
            message.setOriginTopic(originTopic);
        } else if (StringUtils.isNotBlank(multiDispatch)) {
            // No explicit origin topic: derive it from the dispatch list.
            // When several non-wildcard entries exist, the last one wins.
            for (String lmqName : multiDispatch.split(",")) {
                if (!TopicUtils.isWildCard(lmqName)) {
                    message.setOriginTopic(StringUtils.replace(lmqName.replace("%LMQ%", ""), "%", "/"));
                }
            }
        }

        message.setFirstTopic(messageExt.getTopic());
        message.setPayload(messageExt.getBody());
        message.setStoreTimestamp(messageExt.getStoreTimestamp());
        message.setBornTimestamp(messageExt.getBornTimestamp());

        String retryTimes = messageExt.getUserProperty("retryTimes");
        if (StringUtils.isNotBlank(retryTimes)) {
            message.setRetry(Integer.parseInt(retryTimes));
        }
        String extData = messageExt.getUserProperty("extData");
        if (StringUtils.isNotBlank(extData)) {
            Map<String, String> extProperties = JSONObject.parseObject(extData, new TypeReference<Map<String, String>>() {
            });
            // parseObject yields null for a JSON "null" payload; putAll(null) would throw.
            if (extProperties != null) {
                message.getUserProperties().putAll(extProperties);
            }
        }
        return message;
    }

    /**
     * Resolves the offset of {@code messageExt} within the given LMQ.
     *
     * <p>{@code INNER_MULTI_DISPATCH} and {@code INNER_MULTI_QUEUE_OFFSET} are read as parallel
     * comma-separated lists. The offset at the position of this queue's LMQ name is returned.
     * If either property is blank, the queue is not listed, or the offset list is too short to
     * contain the matching position, the message's own queue offset is returned instead.
     *
     * @throws NumberFormatException if the matching offset entry is not a valid long
     */
    private long parseLmqOffset(Queue queue, MessageExt messageExt) {
        String dispatchProperty = messageExt.getProperty("INNER_MULTI_DISPATCH");
        if (StringUtils.isBlank(dispatchProperty)) {
            return messageExt.getQueueOffset();
        }
        String offsetProperty = messageExt.getProperty("INNER_MULTI_QUEUE_OFFSET");
        if (StringUtils.isBlank(offsetProperty)) {
            return messageExt.getQueueOffset();
        }
        String[] lmqNames = dispatchProperty.split(",");
        String[] lmqOffsets = offsetProperty.split(",");
        String wanted = toLmqName(queue.getQueueName());
        // Only positions present in BOTH lists can be resolved; a shorter offset list must not
        // trigger an ArrayIndexOutOfBoundsException.
        int resolvable = Math.min(lmqNames.length, lmqOffsets.length);
        for (int i = 0; i < resolvable; i++) {
            if (wanted.equals(lmqNames[i])) {
                return Long.parseLong(lmqOffsets[i]);
            }
        }
        return messageExt.getQueueOffset();
    }

    /**
     * Asynchronously stores {@code message}, tagging it for dispatch to every queue in {@code set}.
     *
     * @param set     MQTT queue names the message should be dispatched to
     * @param message message to store
     * @return a future completed with the store result, or exceptionally if the send fails
     */
    public CompletableFuture<StoreResult> putMessage(Set<String> set, org.apache.rocketmq.mqtt.common.model.Message message) {
        final CompletableFuture<StoreResult> completableFuture = new CompletableFuture<>();
        Message mQMessage = toMQMessage(message);
        mQMessage.setTags("MQTT_COMMON");
        Set<String> lmqNames = set.stream().map(LmqQueueStoreManager::toLmqName).collect(Collectors.toSet());
        mQMessage.putUserProperty("INNER_MULTI_DISPATCH", StringUtils.join(lmqNames, ","));
        try {
            final long currentTimeMillis = System.currentTimeMillis();
            this.defaultMQProducer.send(mQMessage, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    completableFuture.complete(toStoreResult(sendResult));
                    long elapsed = System.currentTimeMillis() - currentTimeMillis;
                    StatUtil.addInvoke("lmqWrite", elapsed);
                    collectLmqReadWriteMatchActionRt("lmqWrite", elapsed, true);
                }

                @Override
                public void onException(Throwable th) {
                    logger.error("lmqWrite failed", th);
                    completableFuture.completeExceptionally(th);
                    long elapsed = System.currentTimeMillis() - currentTimeMillis;
                    StatUtil.addInvoke("lmqWrite", elapsed, false);
                    collectLmqReadWriteMatchActionRt("lmqWrite", elapsed, false);
                }
            });
        } catch (Throwable th) {
            // The send could not even be submitted; surface it through the future like async failures.
            logger.error("lmqWrite could not be submitted", th);
            completableFuture.completeExceptionally(th);
        }
        return completableFuture;
    }

    /**
     * Reports a read/write round-trip time to the metrics collector; collector failures are logged, not thrown.
     *
     * @param str action label (e.g. {@code lmqWrite}, {@code lmqPull})
     * @param j   elapsed time in milliseconds
     * @param z   whether the action succeeded
     */
    public void collectLmqReadWriteMatchActionRt(String str, long j, boolean z) {
        try {
            MqttMetricsCollector.collectLmqReadWriteMatchActionRt(j, new String[]{str, String.valueOf(z)});
        } catch (PrometheusException e) {
            logger.error("collect lmq read/write rt metric failed", e);
        }
    }

    /**
     * Asynchronously pulls messages from the LMQ that corresponds to {@code queue}.
     *
     * @param str         first topic, used as the topic of the RocketMQ message queue
     * @param queue       queue to read (its broker name, queue id and queue name are used)
     * @param queueOffset offset to start pulling from
     * @param j           maximum number of messages to pull (narrowed to {@code int})
     * @return a future completed with the converted pull result, or exceptionally on failure
     */
    public CompletableFuture<PullResult> pullMessage(String str, final Queue queue, QueueOffset queueOffset, long j) {
        final CompletableFuture<PullResult> completableFuture = new CompletableFuture<>();
        try {
            MessageQueue messageQueue = new MessageQueue(str, queue.getBrokerName(), (int) queue.getQueueId());
            final long currentTimeMillis = System.currentTimeMillis();
            pull(toLmqName(queue.getQueueName()), messageQueue, queueOffset.getOffset(), (int) j, new PullCallback() {
                @Override
                public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
                    completableFuture.complete(toLmqPullResult(queue, pullResult));
                    long elapsed = System.currentTimeMillis() - currentTimeMillis;
                    StatUtil.addInvoke("lmqPull", elapsed);
                    collectLmqReadWriteMatchActionRt("lmqPull", elapsed, true);
                    StatUtil.addPv(pullResult.getPullStatus().name(), 1L);
                    try {
                        MqttMetricsCollector.collectPullStatusTps(1L, new String[]{pullResult.getPullStatus().name()});
                    } catch (Throwable th) {
                        logger.error("collect prometheus error", th);
                    }
                }

                @Override
                public void onException(Throwable th) {
                    logger.error("lmqPull failed", th);
                    completableFuture.completeExceptionally(th);
                    long elapsed = System.currentTimeMillis() - currentTimeMillis;
                    StatUtil.addInvoke("lmqPull", elapsed, false);
                    collectLmqReadWriteMatchActionRt("lmqPull", elapsed, false);
                }
            });
        } catch (Throwable th) {
            // The pull could not even be submitted; surface it through the future like async failures.
            logger.error("lmqPull could not be submitted", th);
            completableFuture.completeExceptionally(th);
        }
        return completableFuture;
    }

    /**
     * Pulls up to the last {@code j} messages of {@code queue}: queries the max offset, steps back
     * {@code j} positions (clamped at 0) and pulls from there.
     *
     * @param str   first topic
     * @param queue queue to read
     * @param j     number of trailing messages requested
     */
    public CompletableFuture<PullResult> pullLastMessages(String str, Queue queue, long j) {
        return queryQueueMaxOffset(queue).thenCompose(maxOffset -> {
            QueueOffset startOffset = new QueueOffset();
            startOffset.setOffset(Math.max(0L, maxOffset.longValue() - j));
            return pullMessage(str, queue, startOffset, j);
        });
    }

    /**
     * Asynchronously queries the broker for the max offset of the LMQ that corresponds to {@code queue}.
     *
     * <p>NOTE(review): the lookup is a blocking broker call run via {@code supplyAsync} without an
     * explicit executor — confirm the default pool is acceptable for this workload.
     *
     * @return a future holding the max offset; fails with a {@link RuntimeException} wrapping the
     *         {@link MQClientException} when the lookup fails
     */
    public CompletableFuture<Long> queryQueueMaxOffset(Queue queue) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Long.valueOf(maxOffset(queue));
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
        });
    }

    /** Delegates to {@link FirstTopicManager#getReadableBrokers(String)}. */
    public Set<String> getReadableBrokers(String str) {
        return this.firstTopicManager.getReadableBrokers(str);
    }

    /** Returns the client retry topic from the service configuration. */
    public String getClientRetryTopic() {
        return this.serviceConf.getClientRetryTopic();
    }

    /** Returns the client P2P topic from the service configuration. */
    public String getClientP2pTopic() {
        return this.serviceConf.getClientP2pTopic();
    }

    /**
     * Converts a RocketMQ send result into a {@link StoreResult} carrying the target queue id,
     * broker name and message id.
     */
    public StoreResult toStoreResult(SendResult sendResult) {
        MessageQueue target = sendResult.getMessageQueue();
        Queue queue = new Queue();
        queue.setQueueId(target.getQueueId());
        queue.setBrokerName(target.getBrokerName());

        StoreResult storeResult = new StoreResult();
        storeResult.setQueue(queue);
        storeResult.setMsgId(sendResult.getMsgId());
        return storeResult;
    }

    /**
     * Converts a RocketMQ pull result into the MQTT-layer {@link PullResult}.
     *
     * <p>{@code OFFSET_ILLEGAL} maps to code 302 together with the broker-suggested next offset;
     * every other status maps to code 301. Found messages, if any, are converted one by one.
     */
    public PullResult toLmqPullResult(Queue queue, org.apache.rocketmq.client.consumer.PullResult pullResult) {
        PullResult lmqResult = new PullResult();
        if (PullStatus.OFFSET_ILLEGAL.equals(pullResult.getPullStatus())) {
            lmqResult.setCode(302);
            QueueOffset nextOffset = new QueueOffset();
            nextOffset.setOffset(pullResult.getNextBeginOffset());
            lmqResult.setNextQueueOffset(nextOffset);
        } else {
            lmqResult.setCode(301);
        }
        List<MessageExt> found = pullResult.getMsgFoundList();
        if (found != null && !found.isEmpty()) {
            List<org.apache.rocketmq.mqtt.common.model.Message> converted = new ArrayList<>(found.size());
            for (MessageExt messageExt : found) {
                converted.add(toLmqMessage(queue, messageExt));
            }
            lmqResult.setMessageList(converted);
        }
        return lmqResult;
    }

    /**
     * Issues an asynchronous pull for {@code lmqTopic} and hands the processed result to {@code pullCallback}.
     *
     * <p>Uses subscription {@code *} with expression type {@code TAG}, sub-version 0, commit offset 0,
     * a 5000 ms broker suspend timeout and a 3000 ms request timeout.
     */
    private void pull(String lmqTopic, final MessageQueue messageQueue, long offset, int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
        try {
            int sysFlag = PullSysFlag.buildSysFlag(false, false, true, false);
            pullKernelImpl(lmqTopic, messageQueue, "*", "TAG", 0L, offset, maxNums, sysFlag, 0L, 5000L, 3000L, CommunicationMode.ASYNC, new PullCallback() {
                @Override
                public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
                    pullCallback.onSuccess(pullAPIWrapper.processPullResult(messageQueue, pullResult, new SubscriptionData()));
                }

                @Override
                public void onException(Throwable th) {
                    pullCallback.onException(th);
                }
            });
        } catch (MQBrokerException e) {
            throw new MQClientException("pullAsync unknow exception", e);
        }
    }

    /**
     * Builds a pull request header and sends it to the broker that hosts {@code messageQueue}.
     *
     * <p>The broker address is looked up by the queue's broker name; on a miss the route of the
     * queue's topic is refreshed from the name server and the lookup is retried once.
     *
     * @param str               topic placed in the request header (the LMQ name for LMQ pulls)
     * @param messageQueue      queue whose broker name, topic (for route refresh) and queue id are used
     * @param str2              subscription expression
     * @param str3              expression type; non-tag types require a broker version of at least V4_1_0_SNAPSHOT
     * @param j                 subscription version
     * @param j2                queue offset to pull from
     * @param i                 maximum number of messages
     * @param i2                pull system flag; the commit-offset flag is cleared when the broker is a slave
     * @param j3                commit offset
     * @param j4                broker suspend timeout in milliseconds
     * @param j5                request timeout in milliseconds
     * @param communicationMode communication mode passed to the client API
     * @param pullCallback      callback passed to the client API
     * @return whatever the underlying client API returns for the given communication mode
     * @throws MQClientException if the broker cannot be found or does not support the expression type
     */
    public org.apache.rocketmq.client.consumer.PullResult pullKernelImpl(String str, MessageQueue messageQueue, String str2, String str3, long j, long j2, int i, int i2, long j3, long j4, long j5, CommunicationMode communicationMode, PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        MQClientInstance clientInstance = this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
        FindBrokerResult broker = clientInstance.findBrokerAddressInSubscribe(messageQueue.getBrokerName(), this.pullAPIWrapper.recalculatePullFromWhichNode(messageQueue), false);
        if (broker == null) {
            clientInstance.updateTopicRouteInfoFromNameServer(messageQueue.getTopic());
            broker = clientInstance.findBrokerAddressInSubscribe(messageQueue.getBrokerName(), this.pullAPIWrapper.recalculatePullFromWhichNode(messageQueue), false);
        }
        if (broker == null) {
            throw new MQClientException("The broker[" + messageQueue.getBrokerName() + "] not exist", null);
        }
        if (!ExpressionType.isTagType(str3) && broker.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
            throw new MQClientException("The broker[" + messageQueue.getBrokerName() + ", " + broker.getBrokerVersion() + "] does not upgrade to support for filter message by " + str3, null);
        }

        int sysFlag = broker.isSlave() ? PullSysFlag.clearCommitOffsetFlag(i2) : i2;

        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(str);
        requestHeader.setQueueId(Integer.valueOf(messageQueue.getQueueId()));
        requestHeader.setQueueOffset(Long.valueOf(j2));
        requestHeader.setMaxMsgNums(Integer.valueOf(i));
        requestHeader.setSysFlag(Integer.valueOf(sysFlag));
        requestHeader.setCommitOffset(Long.valueOf(j3));
        requestHeader.setSuspendTimeoutMillis(Long.valueOf(j4));
        requestHeader.setSubscription(str2);
        requestHeader.setSubVersion(Long.valueOf(j));
        requestHeader.setExpressionType(str3);
        return clientInstance.getMQClientAPIImpl().pullMessage(broker.getBrokerAddr(), requestHeader, j5, communicationMode, pullCallback);
    }

    /**
     * Synchronously asks the broker for the max offset of the LMQ that corresponds to {@code queue}
     * (3000 ms timeout). On a broker-address miss the route of the queue's first topic is refreshed
     * and the lookup is retried once.
     *
     * @throws MQClientException if the broker cannot be found or the broker call fails
     */
    private long maxOffset(Queue queue) throws MQClientException {
        String lmqName = toLmqName(queue.getQueueName());
        MQClientInstance clientInstance = this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
        String brokerAddr = clientInstance.findBrokerAddressInPublish(queue.getBrokerName());
        if (brokerAddr == null) {
            clientInstance.updateTopicRouteInfoFromNameServer(queue.toFirstTopic());
            brokerAddr = clientInstance.findBrokerAddressInPublish(queue.getBrokerName());
        }
        if (brokerAddr == null) {
            throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null);
        }
        try {
            return clientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, lmqName, (int) queue.getQueueId(), 3000L);
        } catch (Exception e) {
            throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
        }
    }
}
