package com.mnt.sio.core.sout;

import com.mnt.base.util.BaseConfiguration;
import com.mnt.base.util.CommonUtil;
import com.mnt.sio.core.dtd.StreamData;
import com.mnt.sio.core.dtd.StreamDataUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/mnt/sio/core/sout/KafkaOut.class */
/**
 * {@link SOut} implementation that sends each {@link StreamData} record to a Kafka topic,
 * using the record's key as the message key and its JSON rendering as the message value.
 *
 * <p>The result of the most recent {@code send} issued by a thread is remembered per thread,
 * so {@link #flush(boolean)} only waits on the calling thread's own last send.
 */
public class KafkaOut implements SOut {
    /** Message used for both the log entry and the exception raised when an ack cannot be obtained. */
    private static final String ACK_ERROR_MESSAGE =
            "erorr while get response from produce ack, break the pipe.";

    protected KafkaProducer<String, String> producer;
    protected String topic;
    protected String name;
    protected String desc;
    protected final Logger logger = LogManager.getLogger(getClass());

    /** Future of the last record sent by the current thread; overwritten on every {@link #append}. */
    private final ThreadLocal<Future<RecordMetadata>> futTL = new ThreadLocal<>();

    /**
     * Creates an unconfigured sink; {@link #init(String, Map)} must be called before use.
     */
    public KafkaOut() {
    }

    /**
     * Creates the sink and immediately configures it via {@link #init(String, Map)}.
     *
     * @param str configuration key (and fallback value) used to resolve the topic
     * @param map Kafka producer configuration
     */
    public KafkaOut(String str, Map<String, Object> map) {
        // NOTE(review): init is overridable and is invoked from the constructor; a subclass
        // override would run before the subclass's own fields are initialised.
        init(str, map);
    }

    /**
     * Resolves the topic, derives the sink's name/description and creates the Kafka producer.
     *
     * <p>The supplied configuration is copied, so the caller's map is never modified. String
     * serializers are filled in for the key and value only when the caller did not provide
     * an entry for them.
     *
     * @param str configuration key looked up through {@code BaseConfiguration.getProperty};
     *            the same string is passed as the second argument (presumably the default —
     *            confirm against {@code BaseConfiguration})
     * @param map Kafka producer configuration; must not be {@code null}
     */
    public void init(String str, Map<String, Object> map) {
        this.topic = BaseConfiguration.getProperty(str, str);
        this.name = CommonUtil.concatWith(":", new Object[]{this.topic, "kafka"});
        this.desc = CommonUtil.concatWith(":", new Object[]{"sink", this.topic, "kafka"});

        // Work on a private copy so defaults added below do not leak into the caller's map.
        Map<String, Object> producerConfig = new HashMap<>(map);
        if (!producerConfig.containsKey("key.serializer")) {
            producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
        if (!producerConfig.containsKey("value.serializer")) {
            producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
        this.producer = new KafkaProducer<>(producerConfig);
    }

    /**
     * Sends one record to the configured topic and remembers the resulting future for the
     * calling thread, replacing any future remembered from an earlier call.
     *
     * @param streamData record to publish
     */
    // The record is kept as a raw type: the static types of StreamData.key() and
    // StreamDataUtil.jsonValue() are not visible here, so parameterising it could break compilation.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // com.mnt.sio.core.sout.SOut
    public void append(StreamData streamData) {
        ProducerRecord record =
                new ProducerRecord(this.topic, streamData.key(), StreamDataUtil.jsonValue(streamData));
        this.futTL.set(this.producer.send(record));
    }

    /**
     * Optionally blocks until the calling thread's most recent send has completed.
     *
     * <p>NOTE(review): only the last future of this thread is awaited; earlier sends are not
     * checked individually — confirm that this is sufficient for the intended delivery guarantee.
     *
     * @param z when {@code false} the call returns immediately; when {@code true} the call
     *          waits for the remembered send, if any
     * @throws RuntimeException if waiting fails or is interrupted; the original exception is
     *                          attached as the cause
     */
    @Override // com.mnt.sio.core.sout.SOut
    public void flush(boolean z) {
        if (!z) {
            return;
        }
        Future<RecordMetadata> pending = this.futTL.get();
        if (pending == null) {
            return;
        }
        try {
            pending.get();
            // The send is complete; drop the reference so long-lived threads do not retain it.
            // On failure the future is intentionally kept, so a repeated flush keeps failing.
            this.futTL.remove();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            this.logger.error(ACK_ERROR_MESSAGE, e);
            throw new RuntimeException(ACK_ERROR_MESSAGE, e);
        } catch (Exception e) {
            this.logger.error(ACK_ERROR_MESSAGE, e);
            throw new RuntimeException(ACK_ERROR_MESSAGE, e);
        }
    }

    /**
     * @return the sink name built in {@link #init(String, Map)} from the topic and {@code "kafka"}
     */
    @Override // com.mnt.sio.core.sout.SOut
    public String name() {
        return this.name;
    }

    /**
     * @return the sink description built in {@link #init(String, Map)} from {@code "sink"},
     *         the topic and {@code "kafka"}
     */
    @Override // com.mnt.sio.core.sout.SOut
    public String desc() {
        return this.desc;
    }
}
