package org.apache.rocketmq.mqtt.ds.meta;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Tracks which RocketMQ "first topics" exist and keeps per-topic route
 * information (broker addresses and readable brokers) in memory.
 *
 * <p>A single-threaded scheduler re-reads the route of every known first topic
 * every five seconds; {@link #checkFirstTopicIfCreated(String)} performs an
 * on-demand lookup whose positive and negative results are each cached for one
 * minute.
 */
@Component
public class FirstTopicManager {
    private static final Logger logger = LoggerFactory.getLogger(FirstTopicManager.class);

    /** Response code treated as "topic does not exist" (RocketMQ's {@code ResponseCode.TOPIC_NOT_EXIST}). */
    private static final int TOPIC_NOT_EXIST_CODE = 17;

    /** Broker id whose address is recorded per broker name (RocketMQ's master id, {@code MixAll.MASTER_ID}). */
    private static final long MASTER_BROKER_ID = 0L;

    /** Topics confirmed to exist, with the route observed at confirmation time. */
    private Cache<String, TopicRouteData> topicExistCache;

    /** Topics confirmed to be missing; the value is only a presence marker. */
    private Cache<String, Object> topicNotExistCache;

    private DefaultMQAdminExt defaultMQAdminExt;

    /** topic -> (broker name -> address registered under {@link #MASTER_BROKER_ID}). */
    private final Map<String, Map<String, String>> brokerAddressMap = new ConcurrentHashMap<>();

    /** topic -> names of brokers that own at least one readable queue of that topic. */
    private final Map<String, Set<String>> readableBrokers = new ConcurrentHashMap<>();

    private ScheduledThreadPoolExecutor scheduler;

    @Resource
    private ServiceConf serviceConf;

    @Resource
    private MetaPersistManager metaPersistManager;

    /**
     * Builds the caches, starts the admin client and schedules the periodic
     * route refresh (first run immediately, then five seconds after each run
     * completes).
     *
     * @throws MQClientException if the admin client cannot be started
     */
    @PostConstruct
    public void init() throws MQClientException {
        this.topicExistCache = Caffeine.newBuilder().maximumSize(1000L).expireAfterWrite(1L, TimeUnit.MINUTES).build();
        this.topicNotExistCache = Caffeine.newBuilder().maximumSize(1000L).expireAfterWrite(1L, TimeUnit.MINUTES).build();
        initMQAdminExt();
        this.scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("refreshStoreBroker"));
        this.scheduler.scheduleWithFixedDelay(this::refreshAllTopicRoutes, 0L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Creates and starts the admin client used for route lookups.
     *
     * @throws MQClientException if the admin client fails to start
     */
    public void initMQAdminExt() throws MQClientException {
        this.defaultMQAdminExt = MqFactory.buildDefaultMQAdminExt("TopicCheck", this.serviceConf.getProperties());
        this.defaultMQAdminExt.start();
    }

    /**
     * Verifies that the given first topic exists.
     *
     * <p>Returns normally when the topic is known to exist, or when the lookup
     * failed for a reason other than "topic does not exist" (the failure is
     * logged and nothing is cached, so the next call retries).
     *
     * @param str the first-topic name to check
     * @throws TopicNotExistException if the topic has no route or the lookup
     *         reports that the topic does not exist
     */
    public void checkFirstTopicIfCreated(String str) {
        if (this.topicExistCache.getIfPresent(str) != null) {
            return;
        }
        if (this.topicNotExistCache.getIfPresent(str) != null) {
            throw new TopicNotExistException(str + " NotExist");
        }
        boolean topicMissing = false;
        try {
            TopicRouteData route = this.defaultMQAdminExt.examineTopicRouteInfo(str);
            topicMissing = route == null || route.getBrokerDatas() == null || route.getBrokerDatas().isEmpty();
            if (!topicMissing) {
                updateTopicRoute(str, route);
                this.topicExistCache.put(str, route);
            }
        } catch (MQClientException e) {
            // The more specific exception must be caught before the generic one.
            if (e.getResponseCode() != TOPIC_NOT_EXIST_CODE) {
                logger.warn("check topic {} failed, responseCode={}", str, e.getResponseCode(), e);
                return;
            }
            topicMissing = true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the calling thread.
                Thread.currentThread().interrupt();
            }
            logger.error("check topic {} exception", str, e);
            return;
        }
        if (topicMissing) {
            // Thrown outside the try block so the generic catch above cannot swallow it.
            this.topicNotExistCache.put(str, new Object());
            throw new TopicNotExistException(str + " NotExist");
        }
    }

    /**
     * Body of the scheduled task: refreshes the route of the retry topic, the
     * P2P topic and every persisted first topic.
     *
     * <p>Nothing may escape from here, because a scheduled task that throws is
     * never run again by the executor.
     */
    private void refreshAllTopicRoutes() {
        try {
            Set<String> topics = new HashSet<>();
            topics.add(this.serviceConf.getClientRetryTopic());
            topics.add(this.serviceConf.getClientP2pTopic());
            Set<String> firstTopics = this.metaPersistManager.getAllFirstTopics();
            if (firstTopics != null) {
                topics.addAll(firstTopics);
            }
            for (String topic : topics) {
                updateTopicRoute(topic);
            }
        } catch (Throwable th) {
            logger.error("refresh topic routes exception", th);
        }
    }

    /**
     * Re-reads the route of one topic and stores it; blank names are ignored.
     * When the lookup reports that the topic does not exist, its cached route
     * data is dropped.
     */
    private void updateTopicRoute(String str) {
        if (StringUtils.isBlank(str)) {
            return;
        }
        try {
            updateTopicRoute(str, this.defaultMQAdminExt.examineTopicRouteInfo(str));
        } catch (MQClientException e) {
            // Any other client error leaves the previously stored route untouched.
            if (e.getResponseCode() == TOPIC_NOT_EXIST_CODE) {
                this.brokerAddressMap.remove(str);
                this.readableBrokers.remove(str);
            }
        } catch (Throwable th) {
            logger.error("update topic route {} exception", str, th);
        }
    }

    /**
     * Replaces the stored broker addresses and readable-broker set of a topic
     * with the content of {@code topicRouteData}. Does nothing when either
     * argument is {@code null}.
     */
    private void updateTopicRoute(String str, TopicRouteData topicRouteData) {
        if (topicRouteData == null || str == null) {
            return;
        }
        Map<String, String> addresses = new ConcurrentHashMap<>();
        if (topicRouteData.getBrokerDatas() != null) {
            for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                Map<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
                String address = brokerAddrs == null ? null : brokerAddrs.get(MASTER_BROKER_ID);
                // ConcurrentHashMap rejects null keys and values; skipping an
                // incomplete broker keeps the rest of the topic's route usable.
                if (brokerData.getBrokerName() != null && address != null) {
                    addresses.put(brokerData.getBrokerName(), address);
                }
            }
        }
        this.brokerAddressMap.put(str, addresses);

        Set<String> readable = new HashSet<>();
        if (topicRouteData.getQueueDatas() != null) {
            for (QueueData queueData : topicRouteData.getQueueDatas()) {
                if (PermName.isReadable(queueData.getPerm())) {
                    readable.add(queueData.getBrokerName());
                }
            }
        }
        this.readableBrokers.put(str, readable);
    }

    /**
     * Returns a copy of the broker-name to address mapping stored for a topic.
     *
     * @param str the topic name
     * @return a new map owned by the caller; empty when the topic is unknown or {@code null}
     */
    public Map<String, String> getBrokerAddressMap(String str) {
        Map<String, String> copy = new ConcurrentHashMap<>();
        Map<String, String> stored = str == null ? null : this.brokerAddressMap.get(str);
        if (stored != null) {
            copy.putAll(stored);
        }
        return copy;
    }

    /**
     * Returns a copy of the readable broker names stored for a topic.
     *
     * @param str the topic name
     * @return a new set owned by the caller; empty when the topic is unknown or {@code null}
     */
    public Set<String> getReadableBrokers(String str) {
        Set<String> copy = new HashSet<>();
        Set<String> stored = str == null ? null : this.readableBrokers.get(str);
        if (stored != null) {
            copy.addAll(stored);
        }
        return copy;
    }
}
