package org.apache.rocketmq.proxy.service.metadata;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.route.TopicRouteHelper;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

/* loaded from: input_file:org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.class */
/**
 * {@link MetadataService} implementation that looks up topic and subscription-group
 * configuration from a broker and keeps the results in two Guava {@link LoadingCache}s.
 *
 * <p>Both caches are size-bounded and configured with {@code refreshAfterWrite}; their
 * loaders are handed a shared executor ({@link #cacheRefreshExecutor}) that is shut down
 * together with this service.
 */
public class ClusterMetadataService extends AbstractStartAndShutdown implements MetadataService {
    protected static final Logger log = LoggerFactory.getLogger("RocketmqProxy");

    /** Sentinel cached when no broker could be found for a topic. */
    protected static final TopicConfigAndQueueMapping EMPTY_TOPIC_CONFIG = new TopicConfigAndQueueMapping();

    /** Sentinel cached when no broker could be found for the configured cluster. */
    protected static final SubscriptionGroupConfig EMPTY_SUBSCRIPTION_GROUP_CONFIG = new SubscriptionGroupConfig();

    /** Timeout passed to the broker client calls (presumably milliseconds; confirm against the client API). */
    private static final long DEFAULT_TIMEOUT = 3000;

    private final TopicRouteService topicRouteService;
    private final MQClientAPIFactory mqClientAPIFactory;

    /** Executor handed to both cache loaders; registered for shutdown in {@link #init()}. */
    protected final ThreadPoolExecutor cacheRefreshExecutor;

    /** Topic name -> topic configuration (or {@link #EMPTY_TOPIC_CONFIG}). */
    protected final LoadingCache<String, TopicConfigAndQueueMapping> topicConfigCache;

    /** Consumer group -> subscription-group configuration (or {@link #EMPTY_SUBSCRIPTION_GROUP_CONFIG}). */
    protected final LoadingCache<String, SubscriptionGroupConfig> subscriptionGroupConfigCache;

    /**
     * Creates the service, its refresh executor and both caches from the current
     * {@link ProxyConfig}, then calls {@link #init()}.
     *
     * @param topicRouteService  route service used to locate a broker
     * @param mQClientAPIFactory factory providing the client used for broker requests
     */
    public ClusterMetadataService(TopicRouteService topicRouteService, MQClientAPIFactory mQClientAPIFactory) {
        this.mqClientAPIFactory = mQClientAPIFactory;
        this.topicRouteService = topicRouteService;

        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();

        // The executor must be assigned before the loaders are constructed: their
        // constructors read this field.
        this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
            proxyConfig.getMetadataThreadPoolNums(),
            proxyConfig.getMetadataThreadPoolNums(),
            60000L,
            TimeUnit.MILLISECONDS,
            "MetadataCacheRefresh",
            proxyConfig.getMetadataThreadPoolQueueCapacity());

        this.topicConfigCache = CacheBuilder.newBuilder()
            .maximumSize(proxyConfig.getTopicConfigCacheMaxNum())
            .refreshAfterWrite(proxyConfig.getTopicConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
            .build(new ClusterTopicConfigCacheLoader());

        this.subscriptionGroupConfigCache = CacheBuilder.newBuilder()
            .maximumSize(proxyConfig.getSubscriptionGroupConfigCacheMaxNum())
            .refreshAfterWrite(proxyConfig.getSubscriptionGroupConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
            .build(new ClusterSubscriptionGroupConfigCacheLoader());

        init();
    }

    /**
     * Registers the refresh executor's {@code shutdown} as a shutdown hook of this service.
     * Invoked from the constructor, after all fields have been assigned.
     */
    protected void init() {
        appendShutdown(this.cacheRefreshExecutor::shutdown);
    }

    /**
     * Returns the message type configured for a topic.
     *
     * @param proxyContext request context (not used by this implementation)
     * @param str          topic name
     * @return the topic's message type, or {@link TopicMessageType#UNSPECIFIED} when the
     *         cached entry equals {@link #EMPTY_TOPIC_CONFIG} or any exception occurs
     */
    @Override
    public TopicMessageType getTopicMessageType(ProxyContext proxyContext, String str) {
        try {
            TopicConfigAndQueueMapping cachedConfig = this.topicConfigCache.get(str);
            // NOTE(review): this compares with equals() while getSubscriptionGroupConfig
            // uses identity (==) for its sentinel; confirm both are intended.
            if (cachedConfig.equals(EMPTY_TOPIC_CONFIG)) {
                return TopicMessageType.UNSPECIFIED;
            }
            return cachedConfig.getTopicMessageType();
        } catch (Exception ignored) {
            // Any failure while loading or inspecting the entry degrades to UNSPECIFIED.
            return TopicMessageType.UNSPECIFIED;
        }
    }

    /**
     * Returns the subscription-group configuration for a consumer group.
     *
     * @param proxyContext request context (not used by this implementation)
     * @param str          consumer group name
     * @return the configuration, or {@code null} when the cached entry is the
     *         {@link #EMPTY_SUBSCRIPTION_GROUP_CONFIG} sentinel or the cache lookup fails
     */
    @Override
    public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext proxyContext, String str) {
        SubscriptionGroupConfig cachedConfig;
        try {
            cachedConfig = this.subscriptionGroupConfigCache.get(str);
        } catch (Exception ignored) {
            // A failed cache lookup is reported to the caller as "no configuration".
            return null;
        }
        return cachedConfig == EMPTY_SUBSCRIPTION_GROUP_CONFIG ? null : cachedConfig;
    }

    /**
     * Picks any broker from the route data of the given topic.
     *
     * @param str topic name whose route is queried (the subscription-group loader passes
     *            the configured cluster name here)
     * @return any broker found in the route data, or an empty {@link Optional} when the
     *         route has no brokers or the lookup failed with a "topic not exist" error
     * @throws Exception any other failure raised while resolving the route
     */
    protected Optional<BrokerData> findOneBroker(String str) throws Exception {
        try {
            ProxyContext innerContext = ProxyContext.createForInner(getClass());
            return this.topicRouteService
                .getAllMessageQueueView(innerContext, str)
                .getTopicRouteData()
                .getBrokerDatas()
                .stream()
                .findAny();
        } catch (Exception e) {
            if (!TopicRouteHelper.isTopicNotExistError(e)) {
                throw e;
            }
            return Optional.empty();
        }
    }

    /**
     * Cache loader for {@link #topicConfigCache}: fetches a topic's configuration from
     * one of the brokers listed in that topic's route.
     */
    protected class ClusterTopicConfigCacheLoader extends AbstractCacheLoader<String, TopicConfigAndQueueMapping> {
        public ClusterTopicConfigCacheLoader() {
            super(ClusterMetadataService.this.cacheRefreshExecutor);
        }

        /**
         * Loads the configuration of a topic from a broker.
         *
         * @param str topic name
         * @return the broker's answer, or {@link #EMPTY_TOPIC_CONFIG} when no broker was found
         * @throws Exception if the route lookup or the broker request fails
         */
        @Override
        public TopicConfigAndQueueMapping getDirectly(String str) throws Exception {
            Optional<BrokerData> candidate = findOneBroker(str);
            if (candidate.isPresent()) {
                BrokerData broker = candidate.get();
                return mqClientAPIFactory.getClient()
                    .getTopicConfig(broker.selectBrokerAddr(), str, DEFAULT_TIMEOUT);
            }
            return EMPTY_TOPIC_CONFIG;
        }

        /**
         * Logs a failed load.
         *
         * @param str topic name that failed to load
         * @param exc the failure
         */
        @Override
        public void onErr(String str, Exception exc) {
            log.error("load topic config failed. topic:{}", str, exc);
        }
    }

    /**
     * Cache loader for {@link #subscriptionGroupConfigCache}: fetches a consumer group's
     * configuration from a broker located via the configured cluster name.
     */
    protected class ClusterSubscriptionGroupConfigCacheLoader extends AbstractCacheLoader<String, SubscriptionGroupConfig> {
        public ClusterSubscriptionGroupConfigCacheLoader() {
            super(ClusterMetadataService.this.cacheRefreshExecutor);
        }

        /**
         * Loads the subscription-group configuration of a consumer group from a broker.
         *
         * @param str consumer group name
         * @return the broker's answer, or {@link #EMPTY_SUBSCRIPTION_GROUP_CONFIG} when no
         *         broker was found
         * @throws Exception if the route lookup or the broker request fails
         */
        @Override
        public SubscriptionGroupConfig getDirectly(String str) throws Exception {
            // The broker is located through the cluster name, not through the group name.
            String clusterName = ConfigurationManager.getProxyConfig().getRocketMQClusterName();
            Optional<BrokerData> candidate = findOneBroker(clusterName);
            if (candidate.isPresent()) {
                BrokerData broker = candidate.get();
                return mqClientAPIFactory.getClient()
                    .getSubscriptionGroupConfig(broker.selectBrokerAddr(), str, DEFAULT_TIMEOUT);
            }
            return EMPTY_SUBSCRIPTION_GROUP_CONFIG;
        }

        /**
         * Logs a failed load.
         *
         * @param str consumer group that failed to load
         * @param exc the failure
         */
        @Override
        public void onErr(String str, Exception exc) {
            log.error("load subscription config failed. consumerGroup:{}", str, exc);
        }
    }
}
