package org.apache.rocketmq.proxy.service.sysmessage;

import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

/* loaded from: input_file:org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.class */
public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, MessageListenerConcurrently {
    protected static final Logger log = LoggerFactory.getLogger("RocketmqProxy");
    protected final TopicRouteService topicRouteService;
    protected final AdminService adminService;
    protected final MQClientAPIFactory mqClientAPIFactory;
    protected DefaultMQPushConsumer defaultMQPushConsumer;

    public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, AdminService adminService, MQClientAPIFactory mQClientAPIFactory) {
        this.topicRouteService = topicRouteService;
        this.adminService = adminService;
        this.mqClientAPIFactory = mQClientAPIFactory;
    }

    protected String getSystemMessageProducerId() {
        return "PID_" + getBroadcastTopicName();
    }

    protected String getSystemMessageConsumerId() {
        return "CID_" + getBroadcastTopicName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getBroadcastTopicName() {
        return ConfigurationManager.getProxyConfig().getHeartbeatSyncerTopicName();
    }

    protected String getSubTag() {
        return "*";
    }

    protected String getBroadcastTopicClusterName() {
        return ConfigurationManager.getProxyConfig().getHeartbeatSyncerTopicClusterName();
    }

    protected int getBroadcastTopicQueueNum() {
        return 1;
    }

    protected RPCHook getRpcHook() {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendSystemMessage(Object obj) {
        String broadcastTopicName = getBroadcastTopicName();
        try {
            Message message = new Message(broadcastTopicName, JSON.toJSONString(obj).getBytes(StandardCharsets.UTF_8));
            AddressableMessageQueue selectOne = this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(getClass()), broadcastTopicName).getWriteSelector().selectOne(true);
            this.mqClientAPIFactory.getClient().sendMessageAsync(selectOne.getBrokerAddr(), selectOne.getBrokerName(), message, buildSendMessageRequestHeader(message, getSystemMessageProducerId(), selectOne.getQueueId()), Duration.ofSeconds(3L).toMillis()).whenCompleteAsync((sendResult, th) -> {
                if (th != null) {
                    log.error("send system message failed. data: {}, topic: {}", new Object[]{obj, getBroadcastTopicName(), th});
                } else if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
                    log.error("send system message failed. data: {}, topic: {}, sendResult:{}", new Object[]{obj, getBroadcastTopicName(), sendResult});
                }
            });
        } catch (Throwable th2) {
            log.error("send system message failed. data: {}, topic: {}", new Object[]{obj, broadcastTopicName, th2});
        }
    }

    protected SendMessageRequestHeader buildSendMessageRequestHeader(Message message, String str, int i) {
        SendMessageRequestHeader sendMessageRequestHeader = new SendMessageRequestHeader();
        sendMessageRequestHeader.setProducerGroup(str);
        sendMessageRequestHeader.setTopic(message.getTopic());
        sendMessageRequestHeader.setDefaultTopic("TBW102");
        sendMessageRequestHeader.setDefaultTopicQueueNums(0);
        sendMessageRequestHeader.setQueueId(Integer.valueOf(i));
        sendMessageRequestHeader.setSysFlag(0);
        sendMessageRequestHeader.setBornTimestamp(Long.valueOf(System.currentTimeMillis()));
        sendMessageRequestHeader.setFlag(Integer.valueOf(message.getFlag()));
        sendMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
        sendMessageRequestHeader.setReconsumeTimes(0);
        sendMessageRequestHeader.setBatch(false);
        return sendMessageRequestHeader;
    }

    public void start() throws Exception {
        createSysTopic();
        this.defaultMQPushConsumer = new DefaultMQPushConsumer((String) null, getSystemMessageConsumerId(), getRpcHook());
        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        try {
            this.defaultMQPushConsumer.subscribe(getBroadcastTopicName(), getSubTag());
            this.defaultMQPushConsumer.registerMessageListener(this);
            this.defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "subscribe to broadcast topic " + getBroadcastTopicName() + " failed. " + e.getMessage());
        }
    }

    protected void createSysTopic() {
        if (this.adminService.topicExist(getBroadcastTopicName())) {
            return;
        }
        String broadcastTopicClusterName = getBroadcastTopicClusterName();
        if (StringUtils.isEmpty(broadcastTopicClusterName)) {
            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "system topic cluster cannot be empty");
        }
        if (!this.adminService.createTopicOnTopicBrokerIfNotExist(getBroadcastTopicName(), broadcastTopicClusterName, getBroadcastTopicQueueNum(), getBroadcastTopicQueueNum(), true, 3)) {
            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "create system broadcast topic " + getBroadcastTopicName() + " failed on cluster " + broadcastTopicClusterName);
        }
    }

    public void shutdown() throws Exception {
        this.defaultMQPushConsumer.shutdown();
    }
}
