package org.apache.rocketmq.proxy.service.sysmessage;

import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;

/* loaded from: input_file:org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.class */
/**
 * Synchronises consumer heartbeats between proxy instances through system messages.
 *
 * <p>Outgoing side: {@link #onConsumerRegister} and {@link #onConsumerUnRegister} wrap a local
 * client channel into a {@link HeartbeatSyncerData} payload and hand it to
 * {@code sendSystemMessage} on {@link #threadPoolExecutor}.
 *
 * <p>Incoming side: {@link #consumeMessage} decodes payloads whose {@code localProxyId} differs
 * from this instance's id, and registers/unregisters the decoded {@link RemoteChannel} with the
 * {@link ConsumerManager}. Decoded channels are cached in {@link #remoteChannelMap} under the key
 * {@code group + "@" + channelLongId} and evicted when the consumer manager reports
 * {@link ConsumerGroupEvent#CLIENT_UNREGISTER} for them.
 */
public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
    /** Executor on which register/unregister payloads are built and sent. */
    protected ThreadPoolExecutor threadPoolExecutor;
    /** Consumer registry that receives the remote register/unregister calls. */
    protected ConsumerManager consumerManager;
    /** Remote channels seen in sync messages, keyed by {@code group + "@" + channelLongId}. */
    protected final Map<String, RemoteChannel> remoteChannelMap;
    /** Identifier of this proxy instance; messages carrying the same id are ignored on consume. */
    protected String localProxyId;

    /**
     * Creates the syncer, computes the local proxy id and runs {@link #init()}.
     *
     * <p>NOTE(review): {@code init()} is overridable and is invoked from the constructor, so a
     * subclass override runs before the subclass's own fields are initialised. Kept as-is because
     * subclasses may already depend on this ordering.
     */
    public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService, ConsumerManager consumerManager, MQClientAPIFactory mQClientAPIFactory) {
        super(topicRouteService, adminService, mQClientAPIFactory);
        this.remoteChannelMap = new ConcurrentHashMap<>();
        this.consumerManager = consumerManager;
        this.localProxyId = buildLocalProxyId();
        init();
    }

    /**
     * Creates the monitored thread pool from the proxy configuration and installs a
     * consumer-ids-change listener that evicts cached remote channels on client unregister.
     */
    protected void init() {
        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
        int poolSize = proxyConfig.getHeartbeatSyncerThreadPoolNums();
        this.threadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
            poolSize,
            poolSize,
            1L,
            TimeUnit.MINUTES,
            "HeartbeatSyncer",
            proxyConfig.getHeartbeatSyncerThreadPoolQueueCapacity());
        this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
            @Override
            public void handle(ConsumerGroupEvent consumerGroupEvent, String str, Object... objArr) {
                if (consumerGroupEvent != ConsumerGroupEvent.CLIENT_UNREGISTER) {
                    return;
                }
                if (objArr == null || objArr.length < 1 || !(objArr[0] instanceof ClientChannelInfo)) {
                    return;
                }
                ClientChannelInfo unregistered = (ClientChannelInfo) objArr[0];
                // The cache is populated in consumeMessage under "group@channelLongId"; removal
                // must use the very same key, otherwise entries are never evicted and the map
                // grows without bound. `str` is taken to be the consumer group of the event.
                String channelId = unregistered.getChannel().id().asLongText();
                HeartbeatSyncer.this.remoteChannelMap.remove(buildChannelKey(str, channelId));
            }

            @Override
            public void shutdown() {
                // nothing to release
            }
        });
    }

    /**
     * Stops accepting new sync tasks, then delegates to the parent shutdown.
     *
     * @throws Exception propagated from {@code super.shutdown()}
     */
    @Override // org.apache.rocketmq.proxy.service.sysmessage.AbstractSystemMessageSyncer
    public void shutdown() throws Exception {
        this.threadPoolExecutor.shutdown();
        super.shutdown();
    }

    /**
     * Asynchronously sends a {@link HeartbeatType#REGISTER} payload for a local consumer channel.
     *
     * <p>Nothing is sent when {@code clientChannelInfo} is {@code null}, when
     * {@link ChannelHelper#isRemote} reports the channel as remote, or when
     * {@link RemoteChannel#create} yields {@code null}. Failures (including task submission
     * failures) are logged and never propagated to the caller.
     *
     * @param str               consumer group name
     * @param clientChannelInfo channel/client information of the registering consumer
     * @param consumeType       consume type forwarded in the payload
     * @param messageModel      message model forwarded in the payload
     * @param consumeFromWhere  consume-from-where setting forwarded in the payload
     * @param set               subscription data forwarded in the payload
     */
    public void onConsumerRegister(String str, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, Set<SubscriptionData> set) {
        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
            return;
        }
        try {
            this.threadPoolExecutor.submit(() -> {
                try {
                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
                    if (remoteChannel == null) {
                        return;
                    }
                    HeartbeatSyncerData data = new HeartbeatSyncerData(
                        HeartbeatType.REGISTER,
                        clientChannelInfo.getClientId(),
                        clientChannelInfo.getLanguage(),
                        clientChannelInfo.getVersion(),
                        str,
                        consumeType,
                        messageModel,
                        consumeFromWhere,
                        this.localProxyId,
                        remoteChannel.encode());
                    data.setSubscriptionDataSet(set);
                    log.debug("sync register heart beat. topic:{}, data:{}", getBroadcastTopicName(), data);
                    sendSystemMessage(data);
                } catch (Throwable t) {
                    log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
                        str, clientChannelInfo, consumeType, messageModel, consumeFromWhere, set, t);
                }
            });
        } catch (Throwable t) {
            // e.g. the executor rejected the task; registration itself must not fail because of it
            log.error("heartbeat submit register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
                str, clientChannelInfo, consumeType, messageModel, consumeFromWhere, set, t);
        }
    }

    /**
     * Asynchronously sends a {@link HeartbeatType#UNREGISTER} payload for a local consumer channel.
     *
     * <p>Skips the same cases as {@link #onConsumerRegister}; failures are logged only.
     *
     * @param str               consumer group name
     * @param clientChannelInfo channel/client information of the unregistering consumer
     */
    public void onConsumerUnRegister(String str, ClientChannelInfo clientChannelInfo) {
        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
            return;
        }
        try {
            this.threadPoolExecutor.submit(() -> {
                try {
                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
                    if (remoteChannel == null) {
                        return;
                    }
                    HeartbeatSyncerData data = new HeartbeatSyncerData(
                        HeartbeatType.UNREGISTER,
                        clientChannelInfo.getClientId(),
                        clientChannelInfo.getLanguage(),
                        clientChannelInfo.getVersion(),
                        str,
                        null,
                        null,
                        null,
                        this.localProxyId,
                        remoteChannel.encode());
                    log.debug("sync unregister heart beat. topic:{}, data:{}", getBroadcastTopicName(), data);
                    sendSystemMessage(data);
                } catch (Throwable t) {
                    // The previous format string carried a "consumeType:{}" placeholder with no
                    // matching argument; it has been dropped so the message renders cleanly.
                    log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}",
                        str, clientChannelInfo, t);
                }
            });
        } catch (Throwable t) {
            log.error("heartbeat submit unregister broadcast failed. group:{}, clientChannelInfo:{}",
                str, clientChannelInfo, t);
        }
    }

    /**
     * Applies heartbeat sync messages produced by other proxy instances.
     *
     * <p>Each message body is parsed as a JSON {@link HeartbeatSyncerData}. Messages whose
     * {@code localProxyId} equals this instance's id are ignored. For the rest, the decoded
     * channel is cached (or the cached one is reused and its extend attribute refreshed) and the
     * consumer is registered or unregistered on {@link #consumerManager}. A failure on one message
     * is logged and does not stop processing of the remaining messages.
     *
     * @param list                       messages to process; {@code null} or empty is a no-op
     * @param consumeConcurrentlyContext unused
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list == null || list.isEmpty()) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt messageExt : list) {
            try {
                HeartbeatSyncerData data = JSON.parseObject(new String(messageExt.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
                if (data.getLocalProxyId().equals(this.localProxyId)) {
                    // our own broadcast coming back; nothing to apply
                    continue;
                }
                RemoteChannel decoded = RemoteChannel.decode(data.getChannelData());
                String cacheKey = buildChannelKey(data.getGroup(), decoded.id().asLongText());
                // Reuse the first channel instance seen for this key, but always refresh its
                // extend attribute from the latest payload.
                RemoteChannel channel = this.remoteChannelMap.computeIfAbsent(cacheKey, key -> decoded);
                channel.setExtendAttribute(decoded.getChannelExtendAttribute());
                ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, data.getClientId(), data.getLanguage(), data.getVersion());
                log.debug("start process remote channel. data:{}, clientChannelInfo:{}", data, clientChannelInfo);
                if (data.getHeartbeatType().equals(HeartbeatType.REGISTER)) {
                    this.consumerManager.registerConsumer(
                        data.getGroup(),
                        clientChannelInfo,
                        data.getConsumeType(),
                        data.getMessageModel(),
                        data.getConsumeFromWhere(),
                        data.getSubscriptionDataSet(),
                        false);
                } else {
                    this.consumerManager.unregisterConsumer(data.getGroup(), clientChannelInfo, false);
                }
            } catch (Throwable t) {
                // Render the body null-safely: decoding a null body here would throw out of the
                // handler and abort the rest of the batch.
                log.error("heartbeat consume message failed. msg:{}, data:{}", messageExt, bodyToString(messageExt), t);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Builds the {@link #remoteChannelMap} key shared by the insert and evict paths. */
    private static String buildChannelKey(String group, String channelLongId) {
        return group + "@" + channelLongId;
    }

    /** Decodes a message body as UTF-8 for logging; returns {@code null} when there is no body. */
    private static String bodyToString(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        return body == null ? null : new String(body, StandardCharsets.UTF_8);
    }

    /** Builds this instance's id as {@code localServeAddr%remotingListenPort%grpcServerPort}. */
    private String buildLocalProxyId() {
        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
        return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
    }
}
