package org.apache.rocketmq.proxy.service.transaction;

import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;

/* loaded from: input_file:org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.class */
public class ClusterTransactionService extends AbstractTransactionService {
    /** Logger shared by this service and its nested heartbeat thread. */
    private static final Logger log = LoggerFactory.getLogger("RocketmqProxy");
    /** Client id stamped on every {@code HeartbeatData} built by this service. */
    private static final String TRANS_HEARTBEAT_CLIENT_ID = "rmq-proxy-producer-client";
    /** Supplies the client used to send heartbeats to brokers. */
    private final MQClientAPIFactory mqClientAPIFactory;
    /** Used to look up broker route data for a topic or cluster name. */
    private final TopicRouteService topicRouteService;
    /** Consulted to decide whether a producer group is still online. */
    private final ProducerManager producerManager;
    /** Pool running the heartbeat send tasks; assigned in {@code start()}. */
    private ThreadPoolExecutor heartbeatExecutors;
    /** Producer group -> clusters that should receive transaction heartbeats for that group. */
    private final Map<String, Set<ClusterData>> groupClusterData = new ConcurrentHashMap<>();
    /**
     * Latest broker address -> broker name mapping collected while sending heartbeats.
     * Holds {@code null} until a heartbeat round publishes a map.
     */
    private final AtomicReference<Map<String, String>> brokerAddrNameMapRef = new AtomicReference<>();
    /** Periodic heartbeat trigger; assigned in {@code start()}. */
    private TxHeartbeatServiceThread txHeartbeatServiceThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService$ClusterData.class */
    /**
     * Identifies a broker cluster by name. Equality and hashing are based solely on
     * the cluster name, so instances can be de-duplicated in a {@code Set}.
     */
    public static class ClusterData {
        private final String cluster;

        /**
         * @param str the cluster name
         */
        public ClusterData(String str) {
            cluster = str;
        }

        /**
         * @return the cluster name given at construction
         */
        public String getCluster() {
            return cluster;
        }

        /**
         * Two instances are equal when their cluster names are equal; any other
         * object is compared with the inherited {@code equals}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof ClusterData) {
                ClusterData other = (ClusterData) obj;
                return cluster.equals(other.cluster);
            }
            return super.equals(obj);
        }

        /**
         * @return the hash code of the cluster name
         */
        @Override
        public int hashCode() {
            return cluster.hashCode();
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService$TxHeartbeatServiceThread.class */
    class TxHeartbeatServiceThread extends ServiceThread {
        TxHeartbeatServiceThread() {
        }

        public String getServiceName() {
            return TxHeartbeatServiceThread.class.getName();
        }

        public void run() {
            while (!isStopped()) {
                waitForRunning(TimeUnit.SECONDS.toMillis(ConfigurationManager.getProxyConfig().getTransactionHeartbeatPeriodSecond()));
            }
        }

        protected void onWaitEnd() {
            ClusterTransactionService.this.scanProducerHeartBeat();
        }
    }

    /**
     * Creates the service and stores its collaborators. No threads or executors are
     * created here; that happens in {@code start()}.
     *
     * @param topicRouteService  used to look up broker route data
     * @param producerManager    used to check whether a producer group is online
     * @param mQClientAPIFactory supplies the client used to send heartbeats
     */
    public ClusterTransactionService(TopicRouteService topicRouteService, ProducerManager producerManager, MQClientAPIFactory mQClientAPIFactory) {
        this.mqClientAPIFactory = mQClientAPIFactory;
        this.producerManager = producerManager;
        this.topicRouteService = topicRouteService;
    }

    /**
     * Registers every topic in {@code list} for the given producer group by
     * delegating to the single-topic overload, in list order.
     *
     * @param str  the producer group
     * @param list the topics to register; must not be {@code null}
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.TransactionService
    public void addTransactionSubscription(String str, List<String> list) {
        for (String topic : list) {
            addTransactionSubscription(str, topic);
        }
    }

    /**
     * Adds the clusters that serve topic {@code str2} to the cluster set tracked for
     * producer group {@code str}, creating the set if the group is not tracked yet.
     * Failures are logged and not propagated.
     *
     * @param str  the producer group
     * @param str2 the topic whose clusters should receive heartbeats for the group
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.TransactionService
    public void addTransactionSubscription(String str, String str2) {
        try {
            // Resolve the clusters BEFORE entering compute(): the remapping function runs
            // while the map may block other updates, so it should stay short and must not
            // perform a route lookup.
            Set<ClusterData> topicClusters = getClusterDataFromTopic(str2);
            this.groupClusterData.compute(str, (group, clusters) -> {
                Set<ClusterData> merged = clusters != null ? clusters : new HashSet<>();
                merged.addAll(topicClusters);
                return merged;
            });
        } catch (Exception e) {
            log.error("add producer group err in txHeartBeat. groupId: {}, err: {}", str, e);
        }
    }

    /**
     * Replaces the cluster set tracked for producer group {@code str} with the union
     * of the clusters serving the given topics. Any previously tracked clusters for
     * the group are discarded.
     *
     * @param str  the producer group
     * @param list the topics that now make up the group's subscription; must not be {@code null}
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.TransactionService
    public void replaceTransactionSubscription(String str, List<String> list) {
        Set<ClusterData> clusters = new HashSet<>();
        for (String topic : list) {
            clusters.addAll(getClusterDataFromTopic(topic));
        }
        this.groupClusterData.put(str, clusters);
    }

    /**
     * Looks up the brokers serving the given topic and returns the distinct clusters
     * they belong to.
     *
     * @param str the topic name
     * @return the clusters of the topic's brokers; an empty set when the route has no
     *         broker list or the lookup fails (the failure is logged, never thrown)
     */
    private Set<ClusterData> getClusterDataFromTopic(String str) {
        try {
            List<BrokerData> brokers = this.topicRouteService.getAllMessageQueueView(str).getTopicRouteData().getBrokerDatas();
            if (brokers == null) {
                return Collections.emptySet();
            }
            Set<ClusterData> clusters = new HashSet<>();
            for (BrokerData broker : brokers) {
                clusters.add(new ClusterData(broker.getCluster()));
            }
            return clusters;
        } catch (Throwable th) {
            log.error("get cluster data failed in txHeartBeat. topic: {}, err: {}", str, th);
            return Collections.emptySet();
        }
    }

    /**
     * Stops tracking the given producer group by dropping its cluster set.
     *
     * @param str the producer group to forget
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.TransactionService
    public void unSubscribeAllTransactionTopic(String str) {
        groupClusterData.remove(str);
    }

    /**
     * Runs one heartbeat round.
     *
     * <p>For every tracked producer group: a group whose cluster set is empty or that
     * the producer manager reports as not online is removed from tracking; otherwise
     * the group is added to a heartbeat batch for each of its clusters. Batches are
     * capped at the configured transaction heartbeat batch size; once the newest batch
     * of a cluster is full, a new one is started.
     *
     * <p>If at least one batch was built, the batches are sent cluster by cluster and
     * the broker address to name map collected during sending is published.
     */
    public void scanProducerHeartBeat() {
        Map<String, List<HeartbeatData>> heartbeatsByCluster = new HashMap<>();
        for (String group : this.groupClusterData.keySet()) {
            this.groupClusterData.computeIfPresent(group, (groupName, clusters) -> {
                // Returning null removes the group from the map.
                if (clusters.isEmpty() || !this.producerManager.groupOnline(groupName)) {
                    return null;
                }
                ProducerData producerData = new ProducerData();
                producerData.setGroupName(groupName);
                for (ClusterData clusterData : clusters) {
                    List<HeartbeatData> batches = heartbeatsByCluster.computeIfAbsent(clusterData.cluster, key -> new ArrayList<>());
                    // Reuse the newest batch unless there is none yet or it is already full.
                    HeartbeatData batch = batches.isEmpty() ? null : batches.get(batches.size() - 1);
                    if (batch == null || batch.getProducerDataSet().size() >= ConfigurationManager.getProxyConfig().getTransactionHeartbeatBatchNum()) {
                        batch = new HeartbeatData();
                        batch.setClientID(TRANS_HEARTBEAT_CLIENT_ID);
                        batches.add(batch);
                    }
                    batch.getProducerDataSet().add(producerData);
                }
                return clusters.isEmpty() ? null : clusters;
            });
        }
        if (heartbeatsByCluster.isEmpty()) {
            return;
        }
        Map<String, String> brokerAddrNameMap = new ConcurrentHashMap<>();
        for (Map.Entry<String, List<HeartbeatData>> clusterEntry : heartbeatsByCluster.entrySet()) {
            sendHeartBeatToCluster(clusterEntry.getKey(), clusterEntry.getValue(), brokerAddrNameMap);
        }
        this.brokerAddrNameMapRef.set(brokerAddrNameMap);
    }

    /**
     * Returns the live producer group to cluster set map. This is the internal map
     * itself, not a copy, so changes made through it affect this service.
     *
     * @return the producer group -> clusters map
     */
    public Map<String, Set<ClusterData>> getGroupClusterData() {
        return groupClusterData;
    }

    /**
     * Sends every heartbeat batch in {@code list} to the given cluster, then publishes
     * {@code map} as the current broker address to name mapping. Does nothing when
     * {@code list} is {@code null}.
     *
     * @param str  the cluster name
     * @param list the heartbeat batches to send; may be {@code null}
     * @param map  collects broker address -> broker name for the brokers contacted
     */
    protected void sendHeartBeatToCluster(String str, List<HeartbeatData> list, Map<String, String> map) {
        if (list != null) {
            for (HeartbeatData heartbeatData : list) {
                sendHeartBeatToCluster(str, heartbeatData, map);
            }
            this.brokerAddrNameMapRef.set(map);
        }
    }

    /**
     * Sends one heartbeat batch to every broker found in the route data looked up
     * under the cluster name. Each broker's address and name are recorded in
     * {@code map}, and the send itself is handed to the heartbeat executor. Failures
     * are logged and never propagated.
     *
     * @param str           the cluster name, used as the key for the route lookup
     * @param heartbeatData the heartbeat batch to send
     * @param map           receives broker address -> broker name for each broker contacted
     */
    protected void sendHeartBeatToCluster(String str, HeartbeatData heartbeatData, Map<String, String> map) {
        try {
            List<BrokerData> brokerDatas = this.topicRouteService.getAllMessageQueueView(str).getTopicRouteData().getBrokerDatas();
            if (brokerDatas == null) {
                return;
            }
            long timeoutMillis = Duration.ofSeconds(3L).toMillis();
            for (BrokerData brokerData : brokerDatas) {
                // Resolve the address exactly once so that the address recorded in the map
                // is the one the heartbeat is sent to, even if selectBrokerAddr() is not
                // stable across calls.
                String brokerAddr = brokerData.selectBrokerAddr();
                map.put(brokerAddr, brokerData.getBrokerName());
                this.heartbeatExecutors.submit(() -> {
                    try {
                        this.mqClientAPIFactory.getClient().sendHeartbeatOneway(brokerAddr, heartbeatData, timeoutMillis).exceptionally(th -> {
                            log.error("Send transactionHeartbeat to broker err. brokerAddr: {}", brokerAddr, th);
                            return null;
                        });
                    } catch (Exception e) {
                        // The Future returned by submit() is never inspected, so an exception
                        // thrown synchronously here would otherwise vanish without a trace.
                        log.error("Send transactionHeartbeat to broker err. brokerAddr: {}", brokerAddr, e);
                    }
                });
            }
        } catch (Exception e) {
            log.error("get broker add in cluster failed in tx. clusterName: {}", str, e);
        }
    }

    /**
     * Resolves a broker name from a broker address using the mapping published by the
     * most recent heartbeat round.
     *
     * @param str the broker address
     * @return the broker name, or {@code null} when the address is blank, no mapping
     *         has been published yet, or the address is unknown
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.AbstractTransactionService
    protected String getBrokerNameByAddr(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        // The reference is empty until a heartbeat round has published a map; treat that
        // as "unknown address" instead of failing with a NullPointerException.
        Map<String, String> brokerAddrNameMap = this.brokerAddrNameMapRef.get();
        if (brokerAddrNameMap == null) {
            return null;
        }
        return brokerAddrNameMap.get(str);
    }

    /**
     * Starts the parent service, creates the heartbeat executor and then starts the
     * periodic heartbeat thread.
     *
     * <p>The executor's two thread-count arguments are both the configured transaction
     * heartbeat pool size, and its queue capacity is the configured queue capacity.
     *
     * @throws Exception if the parent service fails to start
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.AbstractTransactionService
    public void start() throws Exception {
        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
        this.txHeartbeatServiceThread = new TxHeartbeatServiceThread();
        super.start();
        // Create the executor BEFORE starting the heartbeat thread: the thread submits
        // tasks to this non-volatile field, so it must be assigned before the thread is
        // started to be guaranteed visible to it.
        this.heartbeatExecutors = ThreadPoolMonitor.createAndMonitor(proxyConfig.getTransactionHeartbeatThreadPoolNums(), proxyConfig.getTransactionHeartbeatThreadPoolNums(), 0L, TimeUnit.MILLISECONDS, "TransactionHeartbeatRegisterThread", proxyConfig.getTransactionHeartbeatThreadPoolQueueCapacity());
        this.txHeartbeatServiceThread.start();
    }

    /**
     * Stops the heartbeat thread and the heartbeat executor, then shuts down the
     * parent service. Safe to call even if {@code start()} never ran or failed part
     * way through: components that were never created are skipped, so the parent
     * service is still shut down.
     *
     * @throws Exception if the parent service fails to shut down
     */
    @Override // org.apache.rocketmq.proxy.service.transaction.AbstractTransactionService
    public void shutdown() throws Exception {
        if (this.txHeartbeatServiceThread != null) {
            this.txHeartbeatServiceThread.shutdown();
        }
        if (this.heartbeatExecutors != null) {
            this.heartbeatExecutors.shutdown();
        }
        super.shutdown();
    }
}
