package org.apache.inlong.agent.plugin.sinks;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/PulsarSink.class */
public class PulsarSink extends AbstractSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerFactory.class);
    private static final AtomicInteger CLIENT_INDEX = new AtomicInteger(0);
    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("PulsarSink"));
    private PositionManager positionManager;
    private List<MQClusterInfo> mqClusterInfos;
    private String topic;
    private List<PulsarTopicSender> pulsarSenders;
    private int clientIoThreads;
    private int sendQueueSize;
    private Semaphore sendQueueSemaphore;
    private LinkedBlockingQueue<BatchProxyMessage> pulsarSendQueue;
    private int connectionsPreBroker;
    private boolean enableBatch;
    private boolean blockIfQueueFull;
    private int maxPendingMessages;
    private int maxPendingMessagesAcrossPartitions;
    private CompressionType compressionType;
    private int maxBatchingBytes;
    private int maxBatchingMessages;
    private long maxBatchingPublishDelayMillis;
    private int sendTimeoutSecond;
    private int producerNum;
    private boolean asyncSend;
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    private volatile boolean shutdown = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/PulsarSink$PulsarTopicSender.class */
    public class PulsarTopicSender {
        private final AtomicInteger producerIndex = new AtomicInteger(0);
        private final PulsarClient pulsarClient;
        private List<Producer> producers;

        public PulsarTopicSender(PulsarClient pulsarClient, int i) {
            this.pulsarClient = pulsarClient;
            initProducer(i);
        }

        public Producer getProducer() {
            if (CollectionUtils.isEmpty(this.producers)) {
                PulsarSink.LOGGER.error("job[{}] empty producers", PulsarSink.this.jobInstanceId);
                return null;
            }
            return this.producers.get((this.producerIndex.getAndIncrement() & Integer.MAX_VALUE) % this.producers.size());
        }

        public void close() {
            if (CollectionUtils.isEmpty(this.producers)) {
                return;
            }
            Iterator<Producer> it = this.producers.iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                } catch (Throwable th) {
                    PulsarSink.LOGGER.error("job[{}] close pulsar producer error", PulsarSink.this.jobInstanceId, th);
                }
            }
            try {
                this.pulsarClient.shutdown();
            } catch (PulsarClientException e) {
                PulsarSink.LOGGER.error("job[{}] close pulsar client error", PulsarSink.this.jobInstanceId, e);
            }
        }

        private void initProducer(int i) {
            this.producers = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                this.producers.add(createProducer());
            }
        }

        private Producer<byte[]> createProducer() {
            try {
                return this.pulsarClient.newProducer().topic(PulsarSink.this.topic).sendTimeout(PulsarSink.this.sendTimeoutSecond, TimeUnit.SECONDS).topic(PulsarSink.this.topic).enableBatching(PulsarSink.this.enableBatch).blockIfQueueFull(PulsarSink.this.blockIfQueueFull).maxPendingMessages(PulsarSink.this.maxPendingMessages).maxPendingMessagesAcrossPartitions(PulsarSink.this.maxPendingMessagesAcrossPartitions).compressionType(PulsarSink.this.compressionType).batchingMaxMessages(PulsarSink.this.maxBatchingMessages).batchingMaxBytes(PulsarSink.this.maxBatchingBytes).batchingMaxPublishDelay(PulsarSink.this.maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS).create();
            } catch (Throwable th) {
                PulsarSink.LOGGER.error("job[{}] create producer[topic:{}] error", new Object[]{PulsarSink.this.jobInstanceId, PulsarSink.this.topic, th});
                return null;
            }
        }
    }

    @Override // org.apache.inlong.agent.plugin.sinks.AbstractSink
    public void init(JobProfile jobProfile) {
        super.init(jobProfile);
        this.positionManager = PositionManager.getInstance();
        this.sendQueueSize = this.agentConf.getInt("agent.sink.pulsar.send.queue.size", 20000);
        this.sendQueueSemaphore = new Semaphore(this.sendQueueSize);
        this.pulsarSendQueue = new LinkedBlockingQueue<>(this.sendQueueSize);
        this.clientIoThreads = this.agentConf.getInt("agent.sink.pulsar.client.io.thread.num", AgentConstants.DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM);
        this.connectionsPreBroker = this.agentConf.getInt("agent.sink.pulsar.connection.pre.broker", 1);
        this.sendTimeoutSecond = this.agentConf.getInt("agent.sink.pulsar.send.timeout.second", 30);
        this.enableBatch = this.agentConf.getBoolean("agent.sink.pullsar.enable.batch", true);
        this.blockIfQueueFull = this.agentConf.getBoolean("agent.sink.pulsar.block.if.queue.full", true);
        this.maxPendingMessages = this.agentConf.getInt("agent.sink.pulsar.max.pending.messages", LogFileCollectTask.CORE_THREAD_PRINT_TIME);
        this.maxBatchingBytes = this.agentConf.getInt("agent.sink.pulsar.max.batch.bytes", 131072);
        this.maxBatchingMessages = this.agentConf.getInt("agent.sink.pulsar.max.batch.messages", LogFileCollectTask.CORE_THREAD_SLEEP_TIME);
        this.maxBatchingPublishDelayMillis = this.agentConf.getInt("agent.sink.pulsar.max.batch.interval.millis", 1);
        this.maxPendingMessagesAcrossPartitions = this.agentConf.getInt("agent.sink.pulsar.max.messages.across.partition", 500000);
        this.producerNum = this.agentConf.getInt("agent.sink.pulsar.producer.num", 3);
        this.asyncSend = this.agentConf.getBoolean("agent.sink.pulsar.enbale.async.send", true);
        String str = this.agentConf.get("agent.sink.pulsar.compression.type", "NONE");
        if (StringUtils.isNotEmpty(str)) {
            this.compressionType = CompressionType.valueOf(str);
        } else {
            this.compressionType = CompressionType.NONE;
        }
        this.mqClusterInfos = jobProfile.getMqClusters();
        Preconditions.checkArgument(ObjectUtils.isNotEmpty(jobProfile.getMqTopic()) && jobProfile.getMqTopic().isValid(), "no valid pulsar topic config");
        this.topic = jobProfile.getMqTopic().getTopic();
        this.pulsarSenders = new ArrayList();
        initPulsarSender();
        EXECUTOR_SERVICE.execute(sendDataThread());
        EXECUTOR_SERVICE.execute(flushCache());
    }

    public void write(Message message) {
        if (message != null) {
            try {
                if (message instanceof EndMessage) {
                    this.sinkMetric.sinkFailCount.incrementAndGet();
                } else {
                    ProxyMessage proxyMessage = new ProxyMessage(message);
                    this.cache.compute(proxyMessage.getBatchKey(), (str, packProxyMessage) -> {
                        if (packProxyMessage == null) {
                            packProxyMessage = new PackProxyMessage(this.jobInstanceId, this.jobConf, this.inlongGroupId, this.inlongStreamId);
                            packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
                            packProxyMessage.addTopicAndDataTime(this.topic, System.currentTimeMillis());
                        }
                        packProxyMessage.addProxyMessage(proxyMessage);
                        return packProxyMessage;
                    });
                    this.sinkMetric.sinkSuccessCount.incrementAndGet();
                }
            } catch (Exception e) {
                LOGGER.error("write message to Proxy sink error", e);
            } catch (Throwable th) {
                ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
            }
        }
    }

    public void destroy() {
        LOGGER.info("destroy pulsar sink, job[{}], source[{}]", this.jobInstanceId, this.sourceName);
        while (!sinkFinish()) {
            LOGGER.info("job {} wait until cache all data to pulsar", this.jobInstanceId);
            AgentUtils.silenceSleepInMs(this.batchFlushInterval);
        }
        this.shutdown = true;
        EXECUTOR_SERVICE.shutdown();
        if (CollectionUtils.isNotEmpty(this.pulsarSenders)) {
            Iterator<PulsarTopicSender> it = this.pulsarSenders.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.pulsarSenders.clear();
        }
    }

    private boolean sinkFinish() {
        return this.cache.values().stream().allMatch((v0) -> {
            return v0.isEmpty();
        }) && this.pulsarSendQueue.isEmpty();
    }

    private Runnable flushCache() {
        return () -> {
            LOGGER.info("start flush cache thread for {} ProxySink", this.inlongGroupId);
            while (!this.shutdown) {
                try {
                    this.cache.forEach((str, packProxyMessage) -> {
                        BatchProxyMessage fetchBatch = packProxyMessage.fetchBatch();
                        if (fetchBatch != null) {
                            try {
                                this.sendQueueSemaphore.acquire();
                                this.pulsarSendQueue.put(fetchBatch);
                                LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, read source is {} sendTime is {}", new Object[]{this.inlongGroupId, str, Integer.valueOf(fetchBatch.getDataList().size()), this.jobInstanceId, this.sourceName, Long.valueOf(fetchBatch.getDataTime())});
                            } catch (Exception e) {
                                this.sendQueueSemaphore.release();
                                LOGGER.error("flush data to send queue", e);
                            }
                        }
                    });
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                } catch (Throwable th) {
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                } finally {
                    AgentUtils.silenceSleepInMs(this.batchFlushInterval);
                }
            }
        };
    }

    private Runnable sendDataThread() {
        return () -> {
            LOGGER.info("start pulsar sink send data thread, job[{}], groupId[{}]", this.jobInstanceId, this.inlongGroupId);
            while (!this.shutdown) {
                try {
                    BatchProxyMessage poll = this.pulsarSendQueue.poll(1L, TimeUnit.MILLISECONDS);
                    if (!ObjectUtils.isEmpty(poll)) {
                        sendData(poll);
                    }
                } catch (Throwable th) {
                    LOGGER.error("Send data error", th);
                }
            }
        };
    }

    private void sendData(BatchProxyMessage batchProxyMessage) throws InterruptedException {
        if (ObjectUtils.isEmpty(batchProxyMessage)) {
            return;
        }
        Producer selectProducer = selectProducer();
        if (ObjectUtils.isEmpty(selectProducer)) {
            this.pulsarSendQueue.put(batchProxyMessage);
            LOGGER.error("send job[{}] data err, empty pulsar producer", this.jobInstanceId);
            return;
        }
        InLongMsg inLongMsg = batchProxyMessage.getInLongMsg();
        this.sinkMetric.pluginSendCount.addAndGet(batchProxyMessage.getMsgCnt());
        if (this.asyncSend) {
            selectProducer.newMessage().eventTime(batchProxyMessage.getDataTime()).value(inLongMsg.buildArray()).sendAsync().whenCompleteAsync((messageId, th) -> {
                if (th == null) {
                    this.sendQueueSemaphore.release();
                    updateSuccessSendMetrics(batchProxyMessage);
                    return;
                }
                this.sinkMetric.pluginSendFailCount.addAndGet(batchProxyMessage.getMsgCnt());
                LOGGER.error("send data fail to pulsar, add back to sendqueue, current queue size {}", Integer.valueOf(this.pulsarSendQueue.size()), th);
                try {
                    this.pulsarSendQueue.put(batchProxyMessage);
                } catch (InterruptedException e) {
                    LOGGER.error("put back to queue fail send queue size {}", Integer.valueOf(this.pulsarSendQueue.size()), th);
                }
            });
            return;
        }
        try {
            selectProducer.newMessage().eventTime(batchProxyMessage.getDataTime()).value(inLongMsg.buildArray()).send();
            this.sendQueueSemaphore.release();
            updateSuccessSendMetrics(batchProxyMessage);
        } catch (PulsarClientException e) {
            this.sinkMetric.pluginSendFailCount.addAndGet(batchProxyMessage.getMsgCnt());
            LOGGER.error("send data fail to pulsar, add back to send queue, send queue size {}", Integer.valueOf(this.pulsarSendQueue.size()), e);
            this.pulsarSendQueue.put(batchProxyMessage);
        }
    }

    private void updateSuccessSendMetrics(BatchProxyMessage batchProxyMessage) {
        AuditUtils.add(4, batchProxyMessage.getGroupId(), batchProxyMessage.getStreamId(), batchProxyMessage.getDataTime(), batchProxyMessage.getMsgCnt(), batchProxyMessage.getTotalSize());
        this.sinkMetric.pluginSendSuccessCount.addAndGet(batchProxyMessage.getMsgCnt());
    }

    private Producer selectProducer() {
        if (!CollectionUtils.isEmpty(this.pulsarSenders)) {
            return this.pulsarSenders.get((CLIENT_INDEX.getAndIncrement() & Integer.MAX_VALUE) % this.pulsarSenders.size()).getProducer();
        }
        LOGGER.error("send job[{}] data err, empty pulsar sender", this.jobInstanceId);
        return null;
    }

    private void initPulsarSender() {
        if (CollectionUtils.isEmpty(this.mqClusterInfos)) {
            LOGGER.error("init job[{}] pulsar client fail, empty mqCluster info", this.jobInstanceId);
            return;
        }
        for (MQClusterInfo mQClusterInfo : this.mqClusterInfos) {
            if (mQClusterInfo.isValid()) {
                try {
                    this.pulsarSenders.add(new PulsarTopicSender(PulsarClient.builder().serviceUrl(mQClusterInfo.getUrl()).ioThreads(this.clientIoThreads).connectionsPerBroker(this.connectionsPreBroker).build(), this.producerNum));
                    LOGGER.info("job[{}] init pulsar client url={}", this.jobInstanceId, mQClusterInfo.getUrl());
                } catch (PulsarClientException e) {
                    LOGGER.error("init job[{}] pulsar client fail", this.jobInstanceId, e);
                }
            }
        }
    }
}
