package com.xiaomi.mone.log.agent.extension;

import com.google.common.collect.Lists;
import com.xiaomi.hera.tspandata.TSpanData;
import com.xiaomi.mone.log.agent.common.HashUtil;
import com.xiaomi.mone.log.agent.common.trace.TraceUtil;
import com.xiaomi.mone.log.agent.export.MsgExporter;
import com.xiaomi.mone.log.api.enums.LogTypeEnum;
import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.common.Constant;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/agent/extension/KafkaExporter.class */
public class KafkaExporter implements MsgExporter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaExporter.class);
    private Producer producer;
    private String topic;
    private String tag;
    private boolean isCommonTag;
    private Integer batchSize;
    private Integer maxPartitionPer = 4;

    public KafkaExporter(Producer producer, String str) {
        this.producer = producer;
        this.tag = str;
    }

    @Override // com.xiaomi.mone.log.agent.channel.Closeable
    public void close() {
    }

    @Override // com.xiaomi.mone.log.agent.export.MsgExporter
    public void export(LineMessage lineMessage) {
        export(Lists.newArrayList(lineMessage));
    }

    @Override // com.xiaomi.mone.log.agent.export.MsgExporter
    public void export(List<LineMessage> list) {
        if (list.isEmpty()) {
            return;
        }
        List<PartitionInfo> partitionsFor = this.producer.partitionsFor(this.topic);
        Iterator<LineMessage> it = list.iterator();
        while (it.hasNext()) {
            ProducerRecord<String, String> buildProducerRecord = buildProducerRecord(it.next(), partitionsFor);
            if (buildProducerRecord != null) {
                this.producer.send(buildProducerRecord, (recordMetadata, exc) -> {
                    if (null != exc) {
                        log.error("send message to kafka error", (Throwable) exc);
                    }
                });
            }
        }
    }

    private ProducerRecord<String, String> buildProducerRecord(LineMessage lineMessage, List<PartitionInfo> list) {
        byte[] bytes;
        String properties = lineMessage.getProperties(LineMessage.KEY_MESSAGE_TYPE);
        ProducerRecord<String, String> producerRecord = null;
        if (String.valueOf(LogTypeEnum.ORIGIN_LOG.getType()).equals(properties)) {
            producerRecord = new ProducerRecord<>(this.topic, this.tag, gson.toJson(lineMessage.getMsgBody()));
        } else if (OPENTELEMETRY_TYPE.equals(properties)) {
            TSpanData tSpanData = TraceUtil.toTSpanData(lineMessage.getMsgBody());
            if (tSpanData != null && (bytes = TraceUtil.toBytes(tSpanData)) != null) {
                String str = new String(bytes, StandardCharsets.ISO_8859_1);
                producerRecord = new ProducerRecord<>(this.topic, this.tag, str);
                String serviceName = tSpanData.getExtra().getServiceName();
                if (serviceName != null) {
                    producerRecord = new ProducerRecord<>(this.topic, Integer.valueOf(list.get(HashUtil.consistentHash(String.format("p%s%s", Integer.valueOf(ThreadLocalRandom.current().nextInt(this.maxPartitionPer.intValue())), serviceName), list.size())).partition()), this.tag, str);
                }
            }
        } else {
            producerRecord = new ProducerRecord<>(this.topic, this.tag, gson.toJson(lineMessage));
        }
        return producerRecord;
    }

    public void setTopic(String str) {
        this.topic = str;
        if (str.startsWith(Constant.COMMON_MQ_PREFIX)) {
            this.isCommonTag = true;
        }
    }

    public Integer getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(Integer num) {
        this.batchSize = num;
    }

    @Override // com.xiaomi.mone.log.agent.export.MsgExporter
    public int batchExportSize() {
        if (null == this.batchSize || this.batchSize.intValue() < 0) {
            return 200;
        }
        return this.batchSize.intValue();
    }
}
