package run.mone.trace.etl.extension.rocketmq;

import com.xiaomi.hera.trace.etl.api.service.MQExtension;
import com.xiaomi.hera.trace.etl.bo.MqConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

@ConditionalOnProperty(name = {"mq"}, havingValue = "rocketMQ")
@Service
/* loaded from: input_file:BOOT-INF/lib/trace-etl-rocketmq-extension-1.0.0-jdk21.jar:run/mone/trace/etl/extension/rocketmq/RocketMQExtension.class */
public class RocketMQExtension implements MQExtension<MessageExt, MessageExt> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocketMQExtension.class);
    private Function<List<MessageExt>, Boolean> batchConsumerMethod;
    private DefaultMQProducer producer;
    private String topic;
    private ClientMessageQueue clientMessageQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/trace-etl-rocketmq-extension-1.0.0-jdk21.jar:run/mone/trace/etl/extension/rocketmq/RocketMQExtension$TraceEtlMessageListener.class */
    public class TraceEtlMessageListener implements MessageListenerConcurrently {
        private TraceEtlMessageListener() {
        }

        @Override // org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (list == null || list.isEmpty()) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            RocketMQExtension.this.batchConsumerMethod.apply(list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    @Override // com.xiaomi.hera.trace.etl.api.service.MQExtension
    public void initMq(MqConfig<MessageExt> mqConfig) {
        log.info("init rocketmq");
        if (StringUtils.isNotEmpty(mqConfig.getProducerTopicName())) {
            initProducer(mqConfig);
        }
        initConsumer(mqConfig);
    }

    private void initProducer(MqConfig<MessageExt> mqConfig) {
        try {
            log.info("init producer start ...");
            this.topic = mqConfig.getProducerTopicName();
            this.producer = new DefaultMQProducer(mqConfig.getProducerGroup());
            this.producer.setNamesrvAddr(mqConfig.getNameSerAddr());
            this.producer.start();
            this.clientMessageQueue = new ClientMessageQueue(this);
            this.clientMessageQueue.initFetchQueueTask();
            log.info("init producer end ...");
        } catch (Throwable th) {
            log.error("init producer error", th);
            throw new RuntimeException(th);
        }
    }

    private void initConsumer(MqConfig<MessageExt> mqConfig) {
        try {
            log.info("init consumer start ...");
            this.batchConsumerMethod = mqConfig.getBatchConsumerMethod();
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(mqConfig.getConsumerGroup());
            defaultMQPushConsumer.setNamesrvAddr(mqConfig.getNameSerAddr());
            defaultMQPushConsumer.subscribe(mqConfig.getConsumerTopicName(), "*");
            defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) new TraceEtlMessageListener());
            defaultMQPushConsumer.start();
            log.info("init consumer end ...");
        } catch (Throwable th) {
            log.error("init error", th);
            throw new RuntimeException(th);
        }
    }

    @Override // com.xiaomi.hera.trace.etl.api.service.MQExtension
    public void send(MessageExt messageExt) {
        send(Collections.singletonList(messageExt));
    }

    @Override // com.xiaomi.hera.trace.etl.api.service.MQExtension
    public void send(List<MessageExt> list) {
        ArrayList arrayList = new ArrayList();
        for (MessageExt messageExt : list) {
            Message message = new Message();
            message.setBody(messageExt.getBody());
            message.setTopic(this.topic);
            arrayList.add(message);
        }
        try {
            this.producer.send(arrayList);
        } catch (Throwable th) {
            log.error("rocketmq producer send error", th);
        }
    }

    @Override // com.xiaomi.hera.trace.etl.api.service.MQExtension
    public void sendByTraceId(String str, MessageExt messageExt) {
        this.clientMessageQueue.enqueue(str, messageExt);
    }

    public List<MessageQueue> fetchMessageQueue() {
        try {
            return this.producer.fetchPublishMessageQueues(this.topic);
        } catch (MQClientException e) {
            log.error("fetch queue task error : ", (Throwable) e);
            return new ArrayList();
        }
    }

    public void send(List<MessageExt> list, MessageQueue messageQueue) {
        ArrayList arrayList = new ArrayList();
        for (MessageExt messageExt : list) {
            Message message = new Message();
            message.setBody(messageExt.getBody());
            message.setTopic(this.topic);
            arrayList.add(message);
        }
        try {
            this.producer.send(arrayList, messageQueue);
        } catch (Throwable th) {
            log.error("rocketmq producer send error", th);
        }
    }
}
