package com.xiaomi.hera.trace.etl.es.consumer;

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.xiaomi.hera.trace.etl.api.service.MQExtension;
import com.xiaomi.hera.trace.etl.bo.MqConfig;
import com.xiaomi.hera.trace.etl.es.util.pool.ConsumerPool;
import com.xiaomi.hera.trace.etl.util.ThriftUtil;
import com.xiaomi.hera.tspandata.TSpanData;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.thrift.TDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

@ConditionalOnProperty(name = {"mq"}, havingValue = "rocketMQ")
@Service
/* loaded from: input_file:BOOT-INF/classes/com/xiaomi/hera/trace/etl/es/consumer/RocketMQTraceSpanConsumer.class */
public class RocketMQTraceSpanConsumer {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocketMQTraceSpanConsumer.class);

    @Value("${mq.consumer.group}")
    private String group;

    @NacosValue("${mq.nameseraddr}")
    private String nameSerAddr;

    @Value("${mq.es.topic}")
    private String topicName;

    @Autowired
    private ConsumerService consumerService;

    @Autowired
    private MQExtension mq;

    /* loaded from: input_file:BOOT-INF/classes/com/xiaomi/hera/trace/etl/es/consumer/RocketMQTraceSpanConsumer$ConsumerRunner.class */
    private class ConsumerRunner implements Runnable {
        private byte[] message;

        public ConsumerRunner(byte[] bArr) {
            this.message = bArr;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                TSpanData tSpanData = new TSpanData();
                new TDeserializer(ThriftUtil.PROTOCOL_FACTORY).deserialize(tSpanData, this.message);
                RocketMQTraceSpanConsumer.this.consumerService.consumer(tSpanData);
            } catch (Throwable th) {
                RocketMQTraceSpanConsumer.log.error("consumer error : ", th);
            }
        }
    }

    @PostConstruct
    public void takeMessage() throws MQClientException {
        MqConfig mqConfig = new MqConfig();
        mqConfig.setNameSerAddr(this.nameSerAddr);
        mqConfig.setConsumerGroup(this.group);
        mqConfig.setConsumerTopicName(this.topicName);
        mqConfig.setBatchConsumerMethod(list -> {
            try {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ConsumerPool.CONSUMER_POOL.submit(new ConsumerRunner(((MessageExt) it.next()).getBody()));
                    await();
                }
            } catch (Throwable th) {
                log.error("consumer message error", th);
            }
            return true;
        });
        this.mq.initMq(mqConfig);
    }

    private void await() {
        while (ConsumerPool.CONSUMER_QUEUE.remainingCapacity() <= 3000) {
            try {
                TimeUnit.MILLISECONDS.sleep(1L);
            } catch (Throwable th) {
                log.error("await error : ", th);
            }
        }
    }
}
