package com.xiaomi.mone.log.agent.extension;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.mone.log.agent.common.HashUtil;
import com.xiaomi.mone.log.agent.common.trace.TraceUtil;
import com.xiaomi.mone.log.agent.export.MsgExporter;
import com.xiaomi.mone.log.api.model.msg.LineMessage;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/agent/extension/RmqExporter.class */
/**
 * {@link MsgExporter} implementation that publishes collected lines to a RocketMQ topic.
 *
 * <p>Messages whose {@code KEY_MESSAGE_TYPE} property equals {@code OPENTELEMETRY_TYPE} are
 * converted with {@link TraceUtil} and sent to an explicit {@link MessageQueue} chosen by hashing
 * the service name. Every other message is serialized to JSON with Gson and sent in a single batch
 * without an explicit target queue.
 *
 * <p>The checked RocketMQ send exceptions are logged rather than propagated, so a failed batch is
 * dropped after being logged.
 */
public class RmqExporter implements MsgExporter {
    private static final Logger log = LoggerFactory.getLogger(RmqExporter.class);

    /** Value returned by {@link #batchExportSize()} when no usable batch size is configured. */
    private static final int DEFAULT_BATCH_EXPORT_SIZE = 200;

    /**
     * Number of distinct hash-key prefixes ({@code p0}, {@code p1}, ...) used per service name, so
     * one service name maps to at most this many queues.
     */
    private static final int PARTITIONS_PER_SERVICE = 2;

    private final DefaultMQProducer mqProducer;
    private final Gson gson = new Gson();
    private String rmqTopic;
    private Integer batchSize;
    /** Publish queues returned by the most recent {@link #fetchMessageQueue(String)} call. */
    private List<MessageQueue> messageQueueList;

    /**
     * @param defaultMQProducer producer used for every send and queue lookup; this class never
     *     starts or shuts it down
     */
    public RmqExporter(DefaultMQProducer defaultMQProducer) {
        this.mqProducer = defaultMQProducer;
    }

    /**
     * Exports a single message by delegating to {@link #export(List)}.
     *
     * @param lineMessage message to export
     */
    @Override
    public void export(LineMessage lineMessage) {
        export(Lists.newArrayList(lineMessage));
    }

    /**
     * Exports a batch of messages to the configured topic.
     *
     * <p>The publish-queue list is refreshed on every call. If no queue is available, OpenTelemetry
     * messages in the batch are skipped (and reported once in the log) instead of failing the
     * whole batch; plain log messages are still sent.
     *
     * @param list messages to export; {@code null} or empty is a no-op, {@code null} elements are
     *     ignored
     */
    @Override
    public void export(List<LineMessage> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        // Work on a local snapshot so size() and get() in the routing code see the same list even
        // if the field is replaced concurrently (e.g. by setRmqTopic).
        List<MessageQueue> queues = fetchMessageQueue(this.rmqTopic);
        this.messageQueueList = queues;
        boolean noQueueAvailable = CollectionUtils.isEmpty(queues);

        List<Message> logMessages = new ArrayList<>();
        Map<MessageQueue, List<Message>> traceMessagesByQueue = new HashMap<>();
        int skippedTraceMessages = 0;
        for (LineMessage lineMessage : list) {
            if (lineMessage == null) {
                continue;
            }
            if (OPENTELEMETRY_TYPE.equals(lineMessage.getProperties(LineMessage.KEY_MESSAGE_TYPE))) {
                if (noQueueAvailable) {
                    // Routing needs at least one queue; hashing into an empty list would throw.
                    skippedTraceMessages++;
                    continue;
                }
                processOpenTelemetryMessage(lineMessage, traceMessagesByQueue, queues);
            } else {
                processLogMessage(lineMessage, logMessages);
            }
        }
        if (skippedTraceMessages > 0) {
            log.error("OPENTELEMETRY log rocketMQ export skipped {} message(s), no publish queue for topic {}",
                    skippedTraceMessages, this.rmqTopic);
        }
        sendMessagesToQueues(traceMessagesByQueue);
        sendMessagesToProducer(logMessages);
    }

    /**
     * Converts an OpenTelemetry line into a RocketMQ message and files it under its target queue.
     * Lines for which {@link TraceUtil#toBytes} yields {@code null} are ignored.
     *
     * @param lineMessage source line
     * @param map accumulator of messages grouped by target queue
     * @param queues non-empty snapshot of the topic's publish queues
     */
    private void processOpenTelemetryMessage(LineMessage lineMessage,
            Map<MessageQueue, List<Message>> map, List<MessageQueue> queues) {
        byte[] body = TraceUtil.toBytes(lineMessage.getMsgBody());
        if (body == null) {
            return;
        }
        // NOTE(review): assumes toTSpanData(...) and getExtra() are non-null whenever toBytes(...)
        // succeeded - confirm against TraceUtil.
        String serviceName = TraceUtil.toTSpanData(lineMessage.getMsgBody()).getExtra().getServiceName();
        Message message = new Message();
        message.setBody(body);
        message.setTopic(this.rmqTopic);
        map.computeIfAbsent(calculateMessageQueue(serviceName, queues), queue -> new ArrayList<>())
                .add(message);
    }

    /**
     * Converts a plain log line into a RocketMQ message (JSON body, UTF-8) tagged with the line's
     * {@code KEY_MQ_TOPIC_TAG} property and appends it to {@code list}.
     */
    private void processLogMessage(LineMessage lineMessage, List<Message> list) {
        Message message = new Message();
        message.setTags(lineMessage.getProperties(LineMessage.KEY_MQ_TOPIC_TAG));
        message.setBody(this.gson.toJson(lineMessage).getBytes(StandardCharsets.UTF_8));
        message.setTopic(this.rmqTopic);
        list.add(message);
    }

    /**
     * Picks the target queue for a service name by hashing {@code "p" + random prefix index +
     * serviceName} onto the queue list.
     *
     * @param str service name
     * @param queues non-empty list of candidate queues
     * @return the selected queue
     */
    private MessageQueue calculateMessageQueue(String str, List<MessageQueue> queues) {
        int partition = ThreadLocalRandom.current().nextInt(PARTITIONS_PER_SERVICE);
        String hashKey = String.format("p%s%s", partition, str);
        return queues.get(HashUtil.consistentHash(hashKey, queues.size()));
    }

    /**
     * Sends each per-queue batch to its queue. A failure on one queue is logged and does not
     * prevent the remaining queues from being attempted; an interrupt stops the loop.
     */
    private void sendMessagesToQueues(Map<MessageQueue, List<Message>> map) {
        for (Map.Entry<MessageQueue, List<Message>> entry : map.entrySet()) {
            List<Message> batch = entry.getValue();
            if (CollectionUtils.isEmpty(batch)) {
                continue;
            }
            try {
                this.mqProducer.send(batch, entry.getKey());
            } catch (InterruptedException e) {
                // Restore the flag so the owning thread can observe the interruption.
                Thread.currentThread().interrupt();
                log.error("OPENTELEMETRY log rocketMQ export error", e);
                return;
            } catch (MQBrokerException | MQClientException | RemotingException e) {
                log.error("OPENTELEMETRY log rocketMQ export error", e);
            }
        }
    }

    /** Sends the plain log messages as one batch; failures are logged. */
    private void sendMessagesToProducer(List<Message> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        try {
            this.mqProducer.send(list);
        } catch (InterruptedException e) {
            // Restore the flag so the owning thread can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("normal rocketMQ export error", e);
        } catch (MQBrokerException | MQClientException | RemotingException e) {
            log.error("normal rocketMQ export error", e);
        }
    }

    /** @return the topic messages are published to */
    public String getRmqTopic() {
        return this.rmqTopic;
    }

    /**
     * Sets the target topic and fetches that topic's publish queues.
     *
     * @param str topic name
     */
    public void setRmqTopic(String str) {
        this.messageQueueList = fetchMessageQueue(str);
        this.rmqTopic = str;
    }

    /** @param num batch size to report from {@link #batchExportSize()}; may be {@code null} */
    public void setBatchSize(Integer num) {
        this.batchSize = num;
    }

    /**
     * @return the configured batch size, or {@value #DEFAULT_BATCH_EXPORT_SIZE} when it is unset
     *     or negative
     */
    @Override
    public int batchExportSize() {
        if (this.batchSize == null || this.batchSize < 0) {
            return DEFAULT_BATCH_EXPORT_SIZE;
        }
        return this.batchSize;
    }

    /** No-op: the injected producer is not shut down here. */
    @Override
    public void close() {
    }

    /**
     * Looks up the publish queues of a topic through the producer.
     *
     * @param str topic name
     * @return the queues reported by the producer, or an empty list if the lookup throws
     *     {@link MQClientException} (the error is logged)
     */
    public List<MessageQueue> fetchMessageQueue(String str) {
        try {
            return this.mqProducer.fetchPublishMessageQueues(str);
        } catch (MQClientException e) {
            log.error("fetch queue task error : ", e);
            return new ArrayList<>();
        }
    }
}
