package org.apache.rocketmq.mqtt.ds.notify;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.common.model.MessageEvent;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.meta.TopicNotExistException;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Fans consumed RocketMQ messages out to the connect nodes as {@link MessageEvent}s.
 *
 * <p>A push consumer is kept subscribed to every first-level topic reported by
 * {@link MetaPersistManager#getAllFirstTopics()} (refreshed every 5 seconds). Each consumed
 * batch is converted into a set of events and sent to every connect node through a
 * synchronous remoting call. A node that could not be notified gets the batch queued to the
 * event-notify retry topic instead.
 */
@Component
public class NotifyManager {
    private static final Logger logger = LoggerFactory.getLogger(NotifyManager.class);

    /** A node whose consecutive-failure counter exceeds this value is no longer notified directly. */
    private static final int NODE_FAIL_MAX_NUM = 3;

    /** Request code used for the message-event command sent to connect nodes. */
    private static final int NOTIFY_EVENT_REQUEST_CODE = 201;

    /** Response code that {@link #doNotify} treats as a successful notification. */
    private static final int NOTIFY_SUCCESS_RESPONSE_CODE = 1;

    private DefaultMQPushConsumer defaultMQPushConsumer;
    private ScheduledThreadPoolExecutor scheduler;
    private NettyRemotingClient remotingClient;
    private DefaultMQProducer defaultMQProducer;

    @Resource
    private ServiceConf serviceConf;

    @Resource
    private MetaPersistManager metaPersistManager;

    @Resource
    private FirstTopicManager firstTopicManager;

    private String dispatcherConsumerGroup = "CID_RMQ_SYS_mqtt_event";

    /** Topics the push consumer is currently subscribed to; only touched by {@link #refresh()}. */
    private Set<String> topics = new HashSet<>();

    /** Per-node failure counters, keyed by the node string from the connect node set. */
    private Map<String, AtomicInteger> nodeFail = new ConcurrentHashMap<>();

    /**
     * Consumer listener that turns each consumed batch into message events and hands them to
     * {@link NotifyManager#notifyMessage(Set)}.
     */
    class Dispatcher implements MessageListenerConcurrently {
        Dispatcher() {
        }

        /**
         * Builds one {@link MessageEvent} per message and notifies the connect nodes.
         *
         * @param list messages of the consumed batch
         * @param consumeConcurrentlyContext consume context; supplies the broker name of the queue
         * @return {@code CONSUME_SUCCESS} when notification completed without throwing,
         *         {@code RECONSUME_LATER} when any exception was raised
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            try {
                Set<MessageEvent> events = new HashSet<>();
                for (MessageExt messageExt : list) {
                    MessageEvent event = new MessageEvent();
                    event.setBrokerName(consumeConcurrentlyContext.getMessageQueue().getBrokerName());
                    NotifyManager.this.setPubTopic(event, messageExt);
                    event.setNamespace(messageExt.getUserProperty("namespace"));
                    events.add(event);
                }
                NotifyManager.this.notifyMessage(events);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                NotifyManager.logger.error("", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
    }

    /**
     * Builds and starts the consumer, the retry producer, the topic-refresh scheduler and the
     * remoting client.
     *
     * <p>A failure while starting the consumer or producer is logged and does not abort
     * initialization.
     *
     * @throws MQClientException declared by the original signature
     */
    @PostConstruct
    public void init() throws MQClientException {
        this.defaultMQPushConsumer = MqFactory.buildDefaultMQPushConsumer(
            this.dispatcherConsumerGroup, this.serviceConf.getProperties(), new Dispatcher());
        this.defaultMQPushConsumer.setPullInterval(1L);
        this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(64);
        this.defaultMQPushConsumer.setPullBatchSize(32);
        this.defaultMQPushConsumer.setConsumeThreadMin(32);
        this.defaultMQPushConsumer.setConsumeThreadMax(64);
        this.defaultMQProducer = MqFactory.buildDefaultMQProducer(
            "CID_RMQ_SYS_NotifyRetrySend", this.serviceConf.getProperties());
        try {
            this.defaultMQPushConsumer.start();
            this.defaultMQProducer.start();
        } catch (Exception e) {
            logger.error("", e);
        }

        ThreadFactory refreshThreadFactory = new ThreadFactoryImpl("Refresh_Notify_Topic_");
        this.scheduler = new ScheduledThreadPoolExecutor(1, refreshThreadFactory);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                refresh();
            } catch (Exception e) {
                // Swallow so that one failed refresh does not cancel the periodic task.
                logger.error("", e);
            }
        }, 0L, 5L, TimeUnit.SECONDS);

        this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
        this.remotingClient.start();
    }

    /**
     * Synchronizes the consumer's subscriptions with the current set of first-level topics.
     *
     * <p>The client retry topic is never subscribed. A topic that fails the
     * "created" check is logged and skipped. Nothing is changed when the reported topic set is
     * null or empty.
     *
     * @throws MQClientException if subscribing to a new topic fails
     */
    private void refresh() throws MQClientException {
        Set<String> allFirstTopics = this.metaPersistManager.getAllFirstTopics();
        if (allFirstTopics == null || allFirstTopics.isEmpty()) {
            return;
        }

        Set<String> liveTopics = new HashSet<>();
        for (String topic : allFirstTopics) {
            try {
                if (topic.equals(this.serviceConf.getClientRetryTopic())) {
                    continue;
                }
                this.firstTopicManager.checkFirstTopicIfCreated(topic);
                liveTopics.add(topic);
                if (!this.topics.contains(topic)) {
                    subscribe(topic);
                    this.topics.add(topic);
                }
            } catch (TopicNotExistException e) {
                logger.error("", e);
            }
        }

        // Drop subscriptions for topics that are no longer reported (or no longer pass the check).
        Iterator<String> subscribed = this.topics.iterator();
        while (subscribed.hasNext()) {
            String topic = subscribed.next();
            if (!liveTopics.contains(topic)) {
                subscribed.remove();
                unsubscribe(topic);
            }
        }
    }

    /**
     * Subscribes the push consumer to all tags of the given topic.
     *
     * @param topic topic to subscribe to
     * @throws MQClientException if the consumer rejects the subscription
     */
    private void subscribe(String topic) throws MQClientException {
        this.defaultMQPushConsumer.subscribe(topic, "*");
        logger.warn("subscribe:{}", topic);
    }

    /**
     * Unsubscribes the push consumer from the topic and removes the topic's entries from the
     * client's subscribe-info and publish-info tables. Any failure is logged, not propagated.
     *
     * @param topic topic to unsubscribe from
     */
    private void unsubscribe(String topic) {
        try {
            logger.warn("unsubscribe:{}", topic);
            this.defaultMQPushConsumer.unsubscribe(topic);
            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().getTopicSubscribeInfoTable().remove(topic);
            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().getDefaultMQProducerImpl().getTopicPublishInfoTable().remove(topic);
        } catch (Exception e) {
            logger.error("{}", topic, e);
        }
    }

    /**
     * Sets the event's publish topic from the message's user properties.
     *
     * <p>{@code originMqttTopic} wins when it is non-blank. Otherwise every comma-separated
     * entry of {@code INNER_MULTI_DISPATCH} that is not a wildcard and contains {@code %LMQ%}
     * is converted ({@code %LMQ%} removed, remaining {@code %} replaced by {@code /}); the last
     * such entry is the one that sticks. If neither property yields a topic the event is left
     * untouched.
     *
     * <p>Decompiler note: this method was originally {@code private}; it appears public here
     * only because the decompiler widened it for inner-class access.
     *
     * @param messageEvent event to populate
     * @param messageExt consumed message carrying the user properties
     */
    public void setPubTopic(MessageEvent messageEvent, MessageExt messageExt) {
        String originTopic = messageExt.getUserProperty("originMqttTopic");
        if (StringUtils.isNotBlank(originTopic)) {
            messageEvent.setPubTopic(originTopic);
            return;
        }

        String multiDispatch = messageExt.getUserProperty("INNER_MULTI_DISPATCH");
        if (StringUtils.isBlank(multiDispatch)) {
            return;
        }
        for (String queue : multiDispatch.split(",")) {
            if (!TopicUtils.isWildCard(queue) && queue.contains("%LMQ%")) {
                messageEvent.setPubTopic(StringUtils.replace(queue.replace("%LMQ%", ""), "%", "/"));
            }
        }
    }

    /**
     * Notifies every connect node about the given events.
     *
     * <p>For each node the events are sent directly via {@link #doNotify(String, Set)} unless
     * the node's failure counter already exceeds {@link #NODE_FAIL_MAX_NUM}. A successful
     * notification resets the counter, a failed one increments it. Whenever the events were
     * not delivered directly (node skipped, notification failed, or an exception was raised)
     * exactly one retry message is queued for that node.
     *
     * <p>NOTE(review): nothing in this class resets a counter once it has passed the threshold,
     * because {@code doNotify} is no longer invoked for that node from here. Confirm that the
     * retry consumer (or a restart) is the intended recovery path.
     *
     * @param set events to deliver
     * @throws RemotingException if there are no connect nodes, or if queuing a retry message fails
     * @throws MQBrokerException if queuing a retry message fails
     * @throws InterruptedException if queuing a retry message is interrupted
     * @throws MQClientException if queuing a retry message fails
     */
    public void notifyMessage(Set<MessageEvent> set) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        Set<String> connectNodes = this.metaPersistManager.getConnectNodeSet();
        if (connectNodes == null || connectNodes.isEmpty()) {
            throw new RemotingException("No Connect Nodes");
        }

        for (String node : connectNodes) {
            boolean delivered = false;
            try {
                AtomicInteger failCount = failCounterFor(node);
                if (failCount.get() > NODE_FAIL_MAX_NUM) {
                    // Node is considered unhealthy: skip the direct call. The finally block
                    // queues the single retry message for it.
                    continue;
                }
                delivered = doNotify(node, set);
                if (delivered) {
                    failCount.set(0);
                } else {
                    failCount.incrementAndGet();
                }
            } catch (Exception e) {
                logger.error("", e);
            } finally {
                if (!delivered) {
                    sendEventRetryMsg(set, 1, node, 0);
                }
            }
        }
    }

    /** Returns the failure counter for the node, creating it on first use. */
    private AtomicInteger failCounterFor(String node) {
        AtomicInteger counter = this.nodeFail.get(node);
        if (counter == null) {
            counter = this.nodeFail.computeIfAbsent(node, key -> new AtomicInteger());
        }
        return counter;
    }

    /**
     * Sends the events to one connect node with a synchronous remoting call (1 second timeout).
     *
     * <p>Decompiler note: this method was originally {@code protected}.
     *
     * @param str node to notify; the RPC address is {@code str + ":" + csRpcPort}
     * @param set events to send
     * @return {@code true} if the node answered with the success code, or if the node is no
     *         longer part of the connect node set (nothing to deliver); {@code false} if there
     *         are no connect nodes at all, the response code differs, or the call threw
     */
    public boolean doNotify(String str, Set<MessageEvent> set) {
        Set<String> connectNodes = this.metaPersistManager.getConnectNodeSet();
        if (connectNodes == null || connectNodes.isEmpty()) {
            return false;
        }
        if (!connectNodes.contains(str)) {
            return true;
        }
        try {
            String address = str + ":" + this.serviceConf.getCsRpcPort();
            RemotingCommand response = this.remotingClient.invokeSync(address, createMsgEventCommand(set), 1000L);
            return response.getCode() == NOTIFY_SUCCESS_RESPONSE_CODE;
        } catch (Exception e) {
            logger.error("fail notify {}", str, e);
            return false;
        }
    }

    /**
     * Builds the remoting request that carries the events as a JSON body.
     *
     * @param events events to serialize
     * @return request command with code {@link #NOTIFY_EVENT_REQUEST_CODE} and the JSON body
     */
    private RemotingCommand createMsgEventCommand(Set<MessageEvent> events) {
        RemotingCommand request = RemotingCommand.createRequestCommand(NOTIFY_EVENT_REQUEST_CODE, (CommandCustomHeader) null);
        request.setBody(JSON.toJSONBytes(events));
        return request;
    }

    /**
     * Queues the events to the event-notify retry topic for a later delivery attempt.
     *
     * <p>Nothing is sent once {@code i2} has reached the configured maximum retry count.
     *
     * <p>Decompiler note: this method was originally {@code protected}.
     *
     * @param set events to retry; serialized to JSON as the message body
     * @param i delay time level applied to the retry message
     * @param str node the retry is meant for; stored in the {@code retryNode} user property
     * @param i2 number of retries already performed; {@code i2 + 1} is stored in the
     *           {@code retryTime} user property
     * @throws InterruptedException if the send is interrupted
     * @throws RemotingException if the send fails at the remoting layer
     * @throws MQClientException if the producer rejects the send
     * @throws MQBrokerException if the broker rejects the message
     */
    public void sendEventRetryMsg(Set<MessageEvent> set, int i, String str, int i2) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if (i2 >= this.serviceConf.getEventNotifyRetryMaxTime()) {
            return;
        }
        Message retryMessage = new Message();
        retryMessage.setTopic(this.serviceConf.getEventNotifyRetryTopic());
        retryMessage.setBody(JSON.toJSONBytes(set));
        retryMessage.setDelayTimeLevel(i);
        retryMessage.putUserProperty("retryNode", str);
        retryMessage.putUserProperty("retryTime", String.valueOf(i2 + 1));
        this.defaultMQProducer.send(retryMessage);
    }
}
