package run.mone.trace.etl.extension.kafka;

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.xiaomi.hera.trace.etl.api.service.MQExtension;
import com.xiaomi.hera.trace.etl.bo.MqConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

/**
 * Kafka-backed implementation of {@link MQExtension}.
 *
 * <p>The bean is only registered when the {@code mq.type} property equals {@code kafka}.
 * {@link #initMq(MqConfig)} always creates a consumer and polls it in an endless loop on a
 * dedicated single-thread executor; a producer is created only when the supplied
 * configuration carries a non-empty producer topic name.
 */
@ConditionalOnProperty(name = {"mq.type"}, havingValue = "kafka")
@Service
public class KafkaExtension implements MQExtension<ProducerRecord<String, String>, ConsumerRecords<String, String>> {
    private static final Logger log = LoggerFactory.getLogger(KafkaExtension.class);

    /** Timeout, in milliseconds, handed to every {@code KafkaConsumer#poll} call. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    // Injected from configuration; not read anywhere inside this class.
    @NacosValue("${kafka.vpc.type}")
    private String vpcType;

    @Autowired
    private KafkaConfigure kafkaConfigure;

    /** Stays {@code null} until {@link #initMq(MqConfig)} runs with a non-empty producer topic. */
    private KafkaProducer<String, String> producer;

    /** Producer topic name captured during producer initialisation. */
    private String topic;

    private KafkaConsumer<String, String> consumer;

    /**
     * Initialises the Kafka clients described by {@code mqConfig}.
     *
     * <p>The producer is optional (created only when a producer topic name is present);
     * the consumer is always created.
     *
     * @param mqConfig topic names, consumer callback and the settings used by {@code KafkaConfigure}
     * @throws RuntimeException wrapping any failure raised while creating either client
     */
    public void initMq(MqConfig<ConsumerRecords<String, String>> mqConfig) {
        log.info("init kafka");
        if (StringUtils.isNotEmpty(mqConfig.getProducerTopicName())) {
            initProducer(mqConfig);
        }
        initConsumer(mqConfig);
    }

    /**
     * Builds the producer from the base properties supplied by {@code KafkaConfigure},
     * overriding acks, batching, buffer and compression settings.
     */
    private void initProducer(MqConfig<ConsumerRecords<String, String>> mqConfig) {
        try {
            log.info("init producer start ...");
            Properties producerProps = this.kafkaConfigure.createProducerProperties(mqConfig);
            // acks=0: the producer does not wait for any broker acknowledgement.
            producerProps.put("acks", "0");
            producerProps.put("batch.size", 563840);
            producerProps.put("linger.ms", 1000);
            producerProps.put("buffer.memory", 33554432);
            producerProps.put("compression.type", "lz4");
            this.producer = new KafkaProducer<>(producerProps);
            this.topic = mqConfig.getProducerTopicName();
            log.info("init producer end ...");
        } catch (Throwable th) {
            log.error("init producer error", th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Creates the consumer, subscribes it to the configured consumer topic and starts the
     * endless polling loop on a new single-thread executor.
     *
     * <p>If anything fails after the consumer object has been constructed, the consumer is
     * closed before the failure is rethrown so that its resources are not leaked.
     */
    private void initConsumer(MqConfig<ConsumerRecords<String, String>> mqConfig) {
        KafkaConsumer<String, String> created = null;
        try {
            log.info("init consumer start ...");
            Properties consumerProps = this.kafkaConfigure.createConsumerProperties(mqConfig);
            // NOTE(review): compression.type is a producer-side setting; it looks like the
            // consumer does not use it. Kept as-is to preserve behavior - confirm and remove.
            consumerProps.put("compression.type", "lz4");
            created = new KafkaConsumer<>(consumerProps);
            created.subscribe(Collections.singletonList(mqConfig.getConsumerTopicName()));
            this.consumer = created;
            // NOTE(review): the executor is never shut down and the loop has no exit
            // condition, so the polling thread lives as long as the JVM.
            Executors.newSingleThreadExecutor().submit(() -> {
                while (true) {
                    consumer(mqConfig);
                }
            });
            log.info("init consumer end ...");
        } catch (Throwable th) {
            log.error("init error", th);
            closeQuietly(created);
            throw new RuntimeException(th);
        }
    }

    /** Closes {@code target} if it is non-null, logging (not propagating) any failure. */
    private static void closeQuietly(KafkaConsumer<String, String> target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (Throwable closeError) {
            log.warn("close consumer error", closeError);
        }
    }

    /**
     * Performs one poll and hands the polled records to the configured consumer callback.
     * Every failure is logged and swallowed so the surrounding loop keeps running.
     */
    private void consumer(MqConfig<ConsumerRecords<String, String>> mqConfig) {
        try {
            mqConfig.getConsumerMethod().apply(this.consumer.poll(POLL_TIMEOUT_MS));
        } catch (Throwable th) {
            log.error("consumer message error , ", th);
        }
    }

    /**
     * Sends a single record by delegating to {@link #send(List)}.
     *
     * @param producerRecord the record to send
     */
    public void send(ProducerRecord<String, String> producerRecord) {
        send(Collections.singletonList(producerRecord));
    }

    /**
     * Hands each record in {@code list} to the producer, in list order.
     *
     * <p>This method never throws: a {@code null} list or an uninitialised producer is
     * reported through the log and the call returns; any failure raised while sending is
     * logged and ends the iteration.
     *
     * @param list the records to send; {@code null} or empty sends nothing
     */
    public void send(List<ProducerRecord<String, String>> list) {
        if (list == null) {
            log.warn("send message error, message list is null");
            return;
        }
        if (list.isEmpty()) {
            return;
        }
        KafkaProducer<String, String> activeProducer = this.producer;
        if (activeProducer == null) {
            // The producer is only created when a producer topic name was configured.
            log.error("send message error, producer is not initialized");
            return;
        }
        try {
            for (ProducerRecord<String, String> message : list) {
                activeProducer.send(message);
            }
        } catch (Throwable th) {
            log.error("send message error, ", th);
        }
    }

    /**
     * No-op in this implementation: the record is not sent.
     *
     * <p>NOTE(review): presumably callers already key the record by trace id and use
     * {@link #send(ProducerRecord)} instead - confirm against the callers.
     *
     * @param str            the trace id (unused)
     * @param producerRecord the record (unused)
     */
    public void sendByTraceId(String str, ProducerRecord<String, String> producerRecord) {
    }
}
