package org.apache.inlong.agent.plugin.sinks;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/KafkaSink.class */
public class KafkaSink extends AbstractSink {
    private PositionManager taskPositionManager;
    private List<MQClusterInfo> mqClusterInfos;
    private String topic;
    private List<KafkaSender> kafkaSenders;
    private LinkedBlockingQueue<BatchProxyMessage> kafkaSendQueue;
    private int producerNum;
    private boolean asyncSend;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("KafkaSink"));
    private static final AtomicInteger KAFKA_SENDER_INDEX = new AtomicInteger(0);
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    private volatile boolean shutdown = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/KafkaSink$AsyncSinkCallback.class */
    public class AsyncSinkCallback implements Callback {
        private long startTime;
        private BatchProxyMessage batchMsg;

        public AsyncSinkCallback(long j, BatchProxyMessage batchProxyMessage) {
            this.startTime = j;
            this.batchMsg = batchProxyMessage;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                KafkaSink.this.sinkMetric.pluginSendFailCount.addAndGet(this.batchMsg.getMsgCnt());
                KafkaSink.LOGGER.error("send job[{}] data fail to kafka, will add back to sendqueue, current sendqueue size {}", new Object[]{KafkaSink.this.jobInstanceId, Integer.valueOf(KafkaSink.this.kafkaSendQueue.size()), exc});
                try {
                    KafkaSink.this.kafkaSendQueue.put(this.batchMsg);
                } catch (InterruptedException e) {
                    KafkaSink.LOGGER.error("put job[{}] data back to queue fail, send queue size {}", new Object[]{KafkaSink.this.jobInstanceId, Integer.valueOf(KafkaSink.this.kafkaSendQueue.size()), e});
                }
            } else {
                KafkaSink.this.updateSuccessSendMetrics(this.batchMsg);
            }
            if (KafkaSink.LOGGER.isDebugEnabled()) {
                long currentTimeMillis = System.currentTimeMillis() - this.startTime;
                if (recordMetadata != null) {
                    KafkaSink.LOGGER.debug("acked job[{}] message partition:{} ofset:{}", new Object[]{KafkaSink.this.jobInstanceId, Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
                }
                KafkaSink.LOGGER.debug("job[{}] send data to kafka elapsed time: {}", KafkaSink.this.jobInstanceId, Long.valueOf(currentTimeMillis));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/KafkaSink$KafkaSender.class */
    public class KafkaSender {
        private List<KafkaProducer<String, byte[]>> producers;
        private final Properties kafkaProps = new Properties();
        private final AtomicInteger producerIndex = new AtomicInteger(0);

        public KafkaSender(MQClusterInfo mQClusterInfo, int i) {
            setKafkaProps(mQClusterInfo);
            initKafkaProducer(this.kafkaProps, i);
        }

        private void setKafkaProps(MQClusterInfo mQClusterInfo) {
            this.kafkaProps.clear();
            Map params = mQClusterInfo.getParams();
            String str = (String) params.get("bootstrap.servers");
            if (str == null) {
                throw new IllegalArgumentException("kafka param bootstrap.servers is null");
            }
            this.kafkaProps.put("bootstrap.servers", str);
            String str2 = (String) params.get("acks");
            if (StringUtils.isNotEmpty(str2)) {
                this.kafkaProps.put("acks", str2);
            } else {
                this.kafkaProps.put("acks", "1");
            }
            String str3 = (String) params.get("compression.type");
            if (StringUtils.isNotEmpty(str3)) {
                this.kafkaProps.put("compression.type", str3);
            } else {
                this.kafkaProps.put("compression.type", "none");
            }
            String str4 = (String) params.get("key.serializer");
            if (StringUtils.isNotEmpty(str4)) {
                this.kafkaProps.put("key.serializer", str4);
            } else {
                this.kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            }
            String str5 = (String) params.get("value.serializer");
            if (StringUtils.isNotEmpty(str4)) {
                this.kafkaProps.put("value.serializer", str5);
            } else {
                this.kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            }
            String str6 = (String) params.get("linger.ms");
            if (StringUtils.isNotEmpty(str6)) {
                this.kafkaProps.put("linger.ms", Integer.valueOf(Integer.parseInt(str6)));
            }
            String str7 = (String) params.get("batch.size");
            if (StringUtils.isNotEmpty(str7)) {
                this.kafkaProps.put("batch.size", Integer.valueOf(Integer.parseInt(str7)));
            }
            String str8 = (String) params.get("buffer.memory");
            if (StringUtils.isNotEmpty(str7)) {
                this.kafkaProps.put("buffer.memory", Integer.valueOf(Integer.parseInt(str8)));
            }
            String str9 = (String) params.get("security.protocol");
            if (StringUtils.isNotEmpty(str9)) {
                this.kafkaProps.put("security.protocol", str9);
            }
            String str10 = (String) params.get("sasl.mechanism");
            if (StringUtils.isNotEmpty(str10)) {
                this.kafkaProps.put("sasl.mechanism", str10);
            }
            String str11 = (String) params.get("sasl.jaas.config");
            if (StringUtils.isNotEmpty(str11)) {
                this.kafkaProps.put("sasl.jaas.config", str11);
            }
        }

        private void initKafkaProducer(Properties properties, int i) {
            this.producers = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                this.producers.add(new KafkaProducer<>(properties));
            }
        }

        public KafkaProducer<String, byte[]> getProducer() {
            if (CollectionUtils.isEmpty(this.producers)) {
                KafkaSink.LOGGER.error("job[{}] empty producers", KafkaSink.this.jobInstanceId);
                return null;
            }
            return this.producers.get((this.producerIndex.getAndIncrement() & Integer.MAX_VALUE) % this.producers.size());
        }

        public void close() {
            if (CollectionUtils.isEmpty(this.producers)) {
                return;
            }
            Iterator<KafkaProducer<String, byte[]>> it = this.producers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        }
    }

    @Override // org.apache.inlong.agent.plugin.sinks.AbstractSink
    public void init(JobProfile jobProfile) {
        super.init(jobProfile);
        this.taskPositionManager = PositionManager.getInstance();
        this.kafkaSendQueue = new LinkedBlockingQueue<>(this.agentConf.getInt("agent.sink.kafka.send.queue.size", 20000));
        this.producerNum = this.agentConf.getInt("agent.sink.kafka.producer.num", 3);
        this.asyncSend = this.agentConf.getBoolean("agent.sink.kafka.enbale.async.send", true);
        this.mqClusterInfos = jobProfile.getMqClusters();
        Preconditions.checkArgument(ObjectUtils.isNotEmpty(jobProfile.getMqTopic()) && jobProfile.getMqTopic().isValid(), "no valid kafka topic config");
        this.topic = jobProfile.getMqTopic().getTopic();
        this.kafkaSenders = new ArrayList();
        initKafkaSender();
        EXECUTOR_SERVICE.execute(sendDataThread());
        EXECUTOR_SERVICE.execute(flushCache());
    }

    public void write(Message message) {
        if (message == null || (message instanceof EndMessage)) {
            return;
        }
        try {
            ProxyMessage proxyMessage = new ProxyMessage(message);
            this.cache.compute(proxyMessage.getBatchKey(), (str, packProxyMessage) -> {
                if (packProxyMessage == null) {
                    packProxyMessage = new PackProxyMessage(this.jobInstanceId, this.jobConf, this.inlongGroupId, this.inlongStreamId);
                    packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
                    packProxyMessage.addTopicAndDataTime(this.topic, System.currentTimeMillis());
                }
                packProxyMessage.addProxyMessage(proxyMessage);
                return packProxyMessage;
            });
            this.sinkMetric.sinkSuccessCount.incrementAndGet();
        } catch (Exception e) {
            this.sinkMetric.sinkFailCount.incrementAndGet();
            LOGGER.error("write job[{}] data to cache error", this.jobInstanceId, e);
        } catch (Throwable th) {
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
        }
    }

    public void destroy() {
        LOGGER.info("destroy job[{}] kafka sink", this.jobInstanceId);
        while (!sinkFinish()) {
            LOGGER.info("job[{}] wait until cache all data to kafka", this.jobInstanceId);
            AgentUtils.silenceSleepInMs(this.batchFlushInterval);
        }
        this.shutdown = true;
        if (CollectionUtils.isNotEmpty(this.kafkaSenders)) {
            Iterator<KafkaSender> it = this.kafkaSenders.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.kafkaSenders.clear();
        }
    }

    private boolean sinkFinish() {
        return this.cache.values().stream().allMatch((v0) -> {
            return v0.isEmpty();
        }) && this.kafkaSendQueue.isEmpty();
    }

    private Runnable flushCache() {
        return () -> {
            LOGGER.info("start kafka sink flush cache thread, job[{}], groupId[{}]", this.jobInstanceId, this.inlongGroupId);
            while (!this.shutdown) {
                try {
                    this.cache.forEach((str, packProxyMessage) -> {
                        BatchProxyMessage fetchBatch = packProxyMessage.fetchBatch();
                        if (fetchBatch == null) {
                            return;
                        }
                        try {
                            this.kafkaSendQueue.put(fetchBatch);
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("send group id {}, message key {},with message size {}, the job id is {}, read source is {} sendTime is {}", new Object[]{this.inlongGroupId, str, Integer.valueOf(fetchBatch.getDataList().size()), this.jobInstanceId, this.sourceName, Long.valueOf(fetchBatch.getDataTime())});
                            }
                        } catch (Exception e) {
                            LOGGER.error("flush job[{}] data to send queue exception", this.jobInstanceId, e);
                        }
                    });
                    AgentUtils.silenceSleepInMs(this.batchFlushInterval);
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                } catch (Throwable th) {
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                }
            }
        };
    }

    private Runnable sendDataThread() {
        return () -> {
            LOGGER.info("start kafka sink send data thread, job[{}], groupId[{}]", this.jobInstanceId, this.inlongGroupId);
            while (!this.shutdown) {
                try {
                    BatchProxyMessage poll = this.kafkaSendQueue.poll(1L, TimeUnit.MILLISECONDS);
                    if (!ObjectUtils.isEmpty(poll)) {
                        sendData(poll);
                    }
                } catch (Throwable th) {
                    LOGGER.error("send job[{}] data to kafka error", this.jobInstanceId, th);
                }
            }
        };
    }

    private void sendData(BatchProxyMessage batchProxyMessage) throws InterruptedException {
        if (ObjectUtils.isEmpty(batchProxyMessage)) {
            return;
        }
        KafkaProducer<String, byte[]> selectProducer = selectProducer();
        if (ObjectUtils.isEmpty(selectProducer)) {
            this.kafkaSendQueue.put(batchProxyMessage);
            LOGGER.error("send job[{}] data err, empty kafka producer", this.jobInstanceId);
            return;
        }
        ProducerRecord producerRecord = new ProducerRecord(this.topic, batchProxyMessage.getInLongMsg().buildArray());
        this.sinkMetric.pluginSendCount.addAndGet(batchProxyMessage.getMsgCnt());
        if (this.asyncSend) {
            selectProducer.send(producerRecord, new AsyncSinkCallback(System.currentTimeMillis(), batchProxyMessage));
            return;
        }
        try {
            selectProducer.send(producerRecord).get(3000L, TimeUnit.MILLISECONDS);
            updateSuccessSendMetrics(batchProxyMessage);
        } catch (Exception e) {
            this.sinkMetric.pluginSendFailCount.addAndGet(batchProxyMessage.getMsgCnt());
            LOGGER.error("send job[{}] data fail to kafka, add back to send queue, send queue size {}", new Object[]{this.jobInstanceId, Integer.valueOf(this.kafkaSendQueue.size()), e});
            this.kafkaSendQueue.put(batchProxyMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateSuccessSendMetrics(BatchProxyMessage batchProxyMessage) {
        AuditUtils.add(4, batchProxyMessage.getGroupId(), batchProxyMessage.getStreamId(), batchProxyMessage.getDataTime(), batchProxyMessage.getMsgCnt(), batchProxyMessage.getTotalSize());
        this.sinkMetric.pluginSendSuccessCount.addAndGet(batchProxyMessage.getMsgCnt());
    }

    private KafkaProducer<String, byte[]> selectProducer() {
        if (!CollectionUtils.isEmpty(this.kafkaSenders)) {
            return this.kafkaSenders.get((KAFKA_SENDER_INDEX.getAndIncrement() & Integer.MAX_VALUE) % this.kafkaSenders.size()).getProducer();
        }
        LOGGER.error("send job[{}] data err, empty kafka sender", this.jobInstanceId);
        return null;
    }

    private void initKafkaSender() {
        if (CollectionUtils.isEmpty(this.mqClusterInfos)) {
            LOGGER.error("init job[{}] kafka producer fail, empty mqCluster info", this.jobInstanceId);
            return;
        }
        for (MQClusterInfo mQClusterInfo : this.mqClusterInfos) {
            if (mQClusterInfo.isValid()) {
                this.kafkaSenders.add(new KafkaSender(mQClusterInfo, this.producerNum));
            }
        }
    }
}
