package com.xiaomi.mone.log.agent.extension;

import com.google.common.base.Preconditions;
import com.xiaomi.mone.log.agent.export.MsgExporter;
import com.xiaomi.mone.log.agent.output.Output;
import com.xiaomi.mone.log.agent.service.OutPutService;
import com.xiaomi.mone.log.api.model.meta.LogPattern;
import com.xiaomi.mone.log.api.model.meta.MQConfig;
import com.xiaomi.mone.log.utils.KafkaUtils;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.plugin.config.anno.Value;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service(name = "KafkaService")
/* loaded from: input_file:com/xiaomi/mone/log/agent/extension/KafkaService.class */
public class KafkaService implements OutPutService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaService.class);
    private ConcurrentHashMap<String, Producer> producerMap;

    @Value("$kafka.use.ssl")
    private String kafkaUseSsl;

    @Value("$kafka.sll.location")
    private String kafkaSllLocation;

    public void init() {
        this.producerMap = new ConcurrentHashMap<>(128);
    }

    @Override // com.xiaomi.mone.log.agent.service.OutPutService
    public boolean compare(Output output, Output output2) {
        if (Objects.equals(output.getOutputType(), output2.getOutputType())) {
            return ((output instanceof KafkaOutput) && (output2 instanceof KafkaOutput)) ? ((KafkaOutput) output).equals((KafkaOutput) output2) : Objects.equals(output, output2);
        }
        return false;
    }

    @Override // com.xiaomi.mone.log.agent.service.OutPutService
    public void preCheckOutput(Output output) {
        KafkaOutput kafkaOutput = (KafkaOutput) output;
        Preconditions.checkArgument(null != kafkaOutput.getClusterInfo(), "rmqOutput.getClusterInfo can not be null");
        Preconditions.checkArgument(null != kafkaOutput.getTopic(), "rmqOutput.getTopic can not be null");
    }

    @Override // com.xiaomi.mone.log.agent.service.OutPutService
    public MsgExporter exporterTrans(Output output) throws Exception {
        KafkaOutput kafkaOutput = (KafkaOutput) output;
        String key = getKey(kafkaOutput.getClusterInfo(), kafkaOutput.getTopic(), kafkaOutput.getTag());
        Producer producer = this.producerMap.get(key);
        if (null == producer) {
            producer = initMqProducer(kafkaOutput);
            this.producerMap.put(key, producer);
        }
        KafkaExporter kafkaExporter = new KafkaExporter(producer, output.getTag());
        kafkaExporter.setTopic(kafkaOutput.getTopic());
        kafkaExporter.setBatchSize(kafkaOutput.getBatchExportSize());
        return kafkaExporter;
    }

    private String getKey(String str, String str2, String str3) {
        return String.format("%s-%s", str, str2, str3);
    }

    private Producer initMqProducer(KafkaOutput kafkaOutput) {
        Properties properties = new Properties();
        String clusterInfo = kafkaOutput.getClusterInfo();
        String ak = kafkaOutput.getAk();
        String sk = kafkaOutput.getSk();
        if (StringUtils.isNotEmpty(ak) && StringUtils.isNotEmpty(sk) && Objects.equals("true", this.kafkaUseSsl)) {
            properties.putAll(KafkaUtils.getSslKafkaProperties(clusterInfo, ak, sk, this.kafkaSllLocation));
        } else if (StringUtils.isNotEmpty(ak) && StringUtils.isNotEmpty(sk)) {
            properties.putAll(KafkaUtils.getVpc9094KafkaProperties(clusterInfo, ak, sk));
        } else {
            properties.putAll(KafkaUtils.getDefaultKafkaProperties(clusterInfo));
        }
        properties.put("bootstrap.servers", clusterInfo);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");
        return new KafkaProducer(properties);
    }

    @Override // com.xiaomi.mone.log.agent.service.OutPutService
    public void removeMQ(Output output) {
        KafkaOutput kafkaOutput = (KafkaOutput) output;
        String key = getKey(kafkaOutput.getClusterInfo(), kafkaOutput.getTopic(), kafkaOutput.getTag());
        if (null != this.producerMap.get(key)) {
            this.producerMap.get(key).close();
        }
        this.producerMap.remove(key);
    }

    @Override // com.xiaomi.mone.log.agent.service.OutPutService
    public Output configOutPut(LogPattern logPattern) {
        MQConfig mQConfig = logPattern.getMQConfig();
        KafkaOutput kafkaOutput = new KafkaOutput();
        kafkaOutput.setOutputType(KafkaOutput.OUTPUT_KAFKAMQ);
        kafkaOutput.setClusterInfo(mQConfig.getClusterInfo());
        kafkaOutput.setProducerGroup(mQConfig.getProducerGroup());
        kafkaOutput.setAk(mQConfig.getAk());
        kafkaOutput.setSk(mQConfig.getSk());
        kafkaOutput.setTopic(mQConfig.getTopic());
        kafkaOutput.setPartitionCnt(mQConfig.getPartitionCnt());
        kafkaOutput.setTag(mQConfig.getTag());
        kafkaOutput.setProducerGroup("subGroup_" + (null == logPattern.getPatternCode() ? "" : logPattern.getPatternCode()));
        return kafkaOutput;
    }
}
