package org.apache.inlong.agent.plugin.sinks;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
import org.apache.inlong.agent.plugin.metrics.PluginMetric;
import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
import org.apache.inlong.agent.plugin.metrics.SinkJmxMetric;
import org.apache.inlong.agent.plugin.metrics.SinkMetrics;
import org.apache.inlong.agent.plugin.metrics.SinkPrometheusMetrics;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/PulsarSink.class */
/**
 * Sink that buffers message bodies in a bounded in-memory queue and publishes them to a
 * Pulsar topic from background sender workers.
 *
 * <p>Lifecycle: {@link #init(JobProfile)} reads the configuration, allocates the queue and
 * calls {@link #start()}, which creates one producer and one sender worker per configured
 * thread. {@link #destroy()} calls {@link #stop()}, which stops accepting writes, waits for the
 * queue to drain and then closes every producer and client that was created.
 */
public class PulsarSink extends AbstractDaemon implements Sink {
    private static final String PULSAR_SINK_TAG_NAME = "AgentPulsarMetric";
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSink.class);
    private static final AtomicLong metricsIndex = new AtomicLong(0);

    private String inlongGroupId;
    private String inlongStreamId;
    private boolean async;
    private long pollTimeout;
    private int threadNum;
    private JobProfile profile;
    private LinkedBlockingQueue<byte[]> cache;
    private final PluginMetric pluginMetricNew;
    private final SinkMetrics sinkMetrics;
    /** Set to {@code false} to make the sender workers leave their loop. */
    private volatile boolean running = true;
    /** Set to {@code false} to make {@link #write(Message)} reject new messages. */
    private volatile boolean writing = true;
    private final List<Producer<byte[]>> producerList = new ArrayList<>();
    /** Every client built by {@link #constructProducer()}, kept so that all of them get closed. */
    private final List<PulsarClient> clientList = new ArrayList<>();

    /**
     * Creates the sink and its metric holders; the Prometheus or JMX flavour is selected by
     * {@code ConfigUtil.isPrometheusEnabled()}.
     */
    public PulsarSink() {
        if (ConfigUtil.isPrometheusEnabled()) {
            this.pluginMetricNew = new PluginPrometheusMetric(AgentUtils.getUniqId(PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
            this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
        } else {
            this.pluginMetricNew = new PluginJmxMetric(AgentUtils.getUniqId(PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
            this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
        }
    }

    /**
     * Queues the body of {@code message} for sending. Blocks while the queue is full.
     *
     * <p>{@code null} messages and messages arriving after {@link #stop()} has begun are
     * ignored. Any failure is counted in the sink metrics and logged; it is never propagated.
     *
     * @param message the message whose body should be published, may be {@code null}
     */
    public void write(Message message) {
        if (message == null || !this.writing) {
            return;
        }
        try {
            AuditUtils.add(4, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis());
            this.pluginMetricNew.incSendNum();
            this.cache.put(message.getBody());
            this.sinkMetrics.incSinkSuccessCount();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            this.sinkMetrics.incSinkFailCount();
            LOGGER.warn("interrupted while adding message to cache", e);
        } catch (Exception e) {
            this.sinkMetrics.incSinkFailCount();
            LOGGER.warn("exception caught while adding message to cache", e);
        }
    }

    /**
     * No-op: this sink does not use the source name.
     *
     * @param str ignored
     */
    public void setSourceName(String str) {
    }

    /**
     * This sink applies no message filter.
     *
     * @param jobProfile ignored
     * @return always {@code null}
     */
    public MessageFilter initMessageFilter(JobProfile jobProfile) {
        return null;
    }

    /**
     * Reads the sink settings from {@code jobProfile}, allocates the bounded cache and starts
     * the sender workers.
     *
     * @param jobProfile job configuration; also retained for producer construction
     */
    public void init(JobProfile jobProfile) {
        this.inlongGroupId = jobProfile.get("proxy.inlongGroupId");
        this.inlongStreamId = jobProfile.get("proxy.inlongStreamId", "");
        this.threadNum = jobProfile.getInt("pulsar.producer.thread.num", 1);
        this.async = jobProfile.getBoolean("pulsar.producer.async", true);
        this.pollTimeout = jobProfile.getLong("pulsar.sink.poll.timeout", 1000L);
        int cacheCapacity = jobProfile.getInt("pulsar.sink.cache.capacity", 100000);
        this.profile = jobProfile;
        this.cache = new LinkedBlockingQueue<>(cacheCapacity);
        start();
    }

    /**
     * Stops the sink and logs the send counters. Exceptions raised while stopping are logged
     * and not propagated.
     */
    public void destroy() {
        try {
            stop();
            LOGGER.info("send success num is {}, failed num is {}", Long.valueOf(this.pluginMetricNew.getSendSuccessNum()), Long.valueOf(this.pluginMetricNew.getSendFailedNum()));
        } catch (Exception e) {
            LOGGER.error("exception caught", e);
        }
    }

    /**
     * Sends one payload through {@code producer}, asynchronously or synchronously depending on
     * the {@code pulsar.producer.async} setting.
     *
     * <p>Async mode: the completion callback updates the success/failed counters and offers a
     * failed payload back to the cache for retry. Sync mode: the counters are updated around the
     * blocking send; a failed payload is not re-queued and the exception is rethrown.
     *
     * @param bArr     payload to publish
     * @param producer producer to publish with
     * @throws PulsarClientException if a synchronous send fails
     */
    private void sendingData(byte[] bArr, Producer<byte[]> producer) throws PulsarClientException {
        if (this.async) {
            producer.sendAsync(bArr).whenCompleteAsync((messageId, th) -> {
                if (th == null) {
                    this.pluginMetricNew.incSendSuccessNum();
                    return;
                }
                this.pluginMetricNew.incSendFailedNum();
                if (this.cache.offer(bArr)) {
                    return;
                }
                // Pass the failure cause as the last argument so the logger records it.
                LOGGER.warn("message {} not add back to retry", messageId, th);
            });
        } else {
            try {
                producer.send(bArr);
            } catch (PulsarClientException e) {
                // Keep the counters meaningful in sync mode too, then let the worker log it.
                this.pluginMetricNew.incSendFailedNum();
                throw e;
            }
            this.pluginMetricNew.incSendSuccessNum();
        }
    }

    /**
     * Builds the worker loop for one producer: poll the cache with the configured timeout and
     * send whatever was polled, until {@code running} becomes {@code false}. Exceptions are
     * logged and the loop continues.
     *
     * @param producer producer used by this worker
     * @return the worker task
     */
    private Runnable sendThread(Producer<byte[]> producer) {
        return () -> {
            while (this.running) {
                try {
                    byte[] payload = this.cache.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
                    if (payload != null) {
                        sendingData(payload, producer);
                    }
                } catch (Exception e) {
                    LOGGER.warn("exception caught while sending data", e);
                }
            }
        };
    }

    /**
     * Creates a Pulsar client and a producer on it from the retained job profile. The client is
     * recorded so that {@link #stop()} can close it; if the producer cannot be created the
     * client is closed immediately instead of being leaked.
     *
     * @return the new producer, or {@code null} when no profile has been set
     * @throws RuntimeException wrapping any failure while building the client or producer
     */
    private Producer<byte[]> constructProducer() {
        if (this.profile == null) {
            return null;
        }
        try {
            String pulsarServers = this.profile.get("pulsar.servers");
            String topic = this.profile.get("pulsar.topic");
            int pendingNum = this.profile.getInt("pulsar.producer.maxPending.count", 10000);
            int batchSize = this.profile.getInt("pulsar.producer.batch.maxsize", 1048576);
            int batchCount = this.profile.getInt("pulsar.producer.batch.maxcount", 1000);
            boolean enableBatch = this.profile.getBoolean("pulsar.producer.enable.batch", true);
            boolean blockQueue = this.profile.getBoolean("pulsar.producer.block.queue", true);
            CompressionType compressType = PluginUtils.convertType(this.profile.get("pulsar.producer.compress.type", "snappy"));
            LOGGER.info("init producer, pulsarServers: {}, topic: {}, pendingNum: {}, batchSize: {}, batchCount: {}, enableBatch: {}, compressType: {}, blockQueue: {}", new Object[]{pulsarServers, topic, Integer.valueOf(pendingNum), Integer.valueOf(batchSize), Integer.valueOf(batchCount), Boolean.valueOf(enableBatch), compressType, Boolean.valueOf(blockQueue)});
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarServers).build();
            try {
                Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).compressionType(compressType).batchingMaxBytes(batchSize).batchingMaxMessages(batchCount).blockIfQueueFull(blockQueue).maxPendingMessages(pendingNum).enableBatching(enableBatch).create();
                this.clientList.add(pulsarClient);
                return producer;
            } catch (Exception e) {
                // The client is useless without its producer; release it before reporting.
                try {
                    pulsarClient.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        } catch (Exception e) {
            LOGGER.error("error while init producer", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates {@code threadNum} producers and submits one sender worker for each of them.
     */
    public void start() {
        for (int i = 0; i < this.threadNum; i++) {
            Producer<byte[]> producer = constructProducer();
            if (producer != null) {
                this.producerList.add(producer);
                submitWorker(sendThread(producer));
            } else {
                LOGGER.warn("producer is null, please check profile");
            }
        }
    }

    /**
     * Stops accepting writes, waits for the cache to drain, then flushes and closes every
     * producer and closes every client. The workers are always told to stop, even when closing
     * fails.
     *
     * @throws Exception the first failure met while closing a producer or client; later
     *                   failures are attached to it as suppressed exceptions
     */
    public void stop() throws Exception {
        this.writing = false;
        Exception failure = null;
        try {
            // Only wait for a drain when something can actually consume the cache; with no
            // producer (hence no worker) the wait would never end.
            if (this.cache != null && !this.producerList.isEmpty()) {
                while (!this.cache.isEmpty()) {
                    AgentUtils.silenceSleepInMs(this.pollTimeout);
                }
            }
            for (Producer<byte[]> producer : this.producerList) {
                try {
                    try {
                        producer.flush();
                    } finally {
                        producer.close();
                    }
                } catch (Exception e) {
                    LOGGER.warn("error while closing producer", e);
                    failure = chain(failure, e);
                }
            }
            for (PulsarClient pulsarClient : this.clientList) {
                try {
                    pulsarClient.close();
                } catch (Exception e) {
                    LOGGER.warn("error while closing pulsar client", e);
                    failure = chain(failure, e);
                }
            }
        } finally {
            this.producerList.clear();
            this.clientList.clear();
            this.running = false;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} attached as suppressed, or {@code next} if there is no first. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }
}
