package org.apache.inlong.dataproxy.sink.mq.pulsar;

import java.security.SecureRandom;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.OrderBatchPackProfileV0;
import org.apache.inlong.dataproxy.sink.mq.SimpleBatchPackProfileV0;
import org.apache.inlong.dataproxy.source.tcp.InlongTcpChannelHandler;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.class */
public class PulsarHandler implements MessageQueueHandler {
    public static final Logger LOG = LoggerFactory.getLogger(PulsarHandler.class);
    public static final String KEY_TENANT = "tenant";
    public static final String KEY_NAMESPACE = "namespace";
    public static final String KEY_SERVICE_URL = "serviceUrl";
    public static final String KEY_AUTHENTICATION = "authentication";
    public static final String KEY_ENABLEBATCHING = "enableBatching";
    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
    public static final String KEY_SENDTIMEOUT = "sendTimeout";
    public static final String KEY_COMPRESSIONTYPE = "compressionType";
    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouterBatchingPartitionSwitchFrequency";
    public static final String KEY_IOTHREADS = "ioThreads";
    public static final String KEY_MEMORYLIMIT = "memoryLimit";
    public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";
    private CacheClusterConfig config;
    private String clusterName;
    private MessageQueueZoneSinkContext sinkContext;
    private String tenant;
    private String namespace;
    private PulsarClient client;
    private ProducerBuilder<byte[]> baseBuilder;
    private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
    private ConcurrentHashMap<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void init(CacheClusterConfig cacheClusterConfig, MessageQueueZoneSinkContext messageQueueZoneSinkContext) {
        this.config = cacheClusterConfig;
        this.clusterName = cacheClusterConfig.getClusterName();
        this.sinkContext = messageQueueZoneSinkContext;
        this.tenant = cacheClusterConfig.getParams().get("tenant");
        this.namespace = cacheClusterConfig.getParams().get("namespace");
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void start() {
        try {
            String str = this.config.getParams().get("serviceUrl");
            String str2 = this.config.getParams().get("authentication");
            Context producerContext = this.sinkContext.getProducerContext();
            ClientBuilder builder = PulsarClient.builder();
            if (StringUtils.isNotEmpty(str2)) {
                builder.authentication(AuthenticationFactory.token(str2));
            }
            this.client = builder.serviceUrl(str).ioThreads(producerContext.getInteger("ioThreads", 1).intValue()).memoryLimit(producerContext.getLong("memoryLimit", 1073741824L).longValue(), SizeUnit.BYTES).connectionsPerBroker(producerContext.getInteger("connectionsPerBroker", 10).intValue()).statsInterval(NumberUtils.toLong(this.config.getParams().get(ConfigConstants.KEY_STATS_INTERVAL_SECONDS), -1L), TimeUnit.SECONDS).build();
            this.baseBuilder = this.client.newProducer();
            this.baseBuilder.sendTimeout(producerContext.getInteger("sendTimeout", 0).intValue(), TimeUnit.MILLISECONDS).maxPendingMessages(producerContext.getInteger("maxPendingMessages", 500).intValue()).maxPendingMessagesAcrossPartitions(producerContext.getInteger("maxPendingMessagesAcrossPartitions", 60000).intValue());
            this.baseBuilder.batchingMaxMessages(producerContext.getInteger("batchingMaxMessages", 500).intValue()).batchingMaxPublishDelay(producerContext.getInteger("batchingMaxPublishDelay", 100).intValue(), TimeUnit.MILLISECONDS).batchingMaxBytes(producerContext.getInteger("batchingMaxBytes", 131072).intValue());
            this.baseBuilder.accessMode(ProducerAccessMode.Shared).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).blockIfQueueFull(producerContext.getBoolean("blockIfQueueFull", true).booleanValue());
            this.baseBuilder.roundRobinRouterBatchingPartitionSwitchFrequency(producerContext.getInteger("roundRobinRouterBatchingPartitionSwitchFrequency", 60).intValue()).enableBatching(producerContext.getBoolean("enableBatching", true).booleanValue()).compressionType(getPulsarCompressionType());
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
        LOG.info("pulsar handler started");
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void stop() {
        Iterator<Map.Entry<String, Producer<byte[]>>> it = this.producerMap.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().close();
            } catch (PulsarClientException e) {
                LOG.error(e.getMessage(), e);
            }
        }
        try {
            this.client.close();
        } catch (PulsarClientException e2) {
            LOG.error(e2.getMessage(), e2);
        }
        LOG.info("pulsar handler stopped");
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public boolean send(BatchPackProfile batchPackProfile) {
        try {
            IdTopicConfig idConfig = this.sinkContext.getIdTopicHolder().getIdConfig(batchPackProfile.getUid());
            if (idConfig == null) {
                this.sinkContext.addSendResultMetric(batchPackProfile, this.clusterName, batchPackProfile.getUid(), false, 0L);
                this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                return false;
            }
            String topicName = idConfig.getTopicName();
            if (topicName == null) {
                this.sinkContext.addSendResultMetric(batchPackProfile, this.clusterName, batchPackProfile.getUid(), false, 0L);
                this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                return false;
            }
            String producerTopic = getProducerTopic(topicName, idConfig);
            if (producerTopic == null) {
                this.sinkContext.addSendResultMetric(batchPackProfile, this.clusterName, batchPackProfile.getUid(), false, 0L);
                this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                batchPackProfile.fail();
                return false;
            }
            Producer<byte[]> producer = this.producerMap.get(producerTopic);
            if (producer == null) {
                try {
                    LOG.info("try to new a object for topic " + producerTopic);
                    producer = this.baseBuilder.clone().topic(producerTopic).producerName(producerTopic + "-" + new SecureRandom((producerTopic + System.currentTimeMillis()).getBytes()).nextLong()).create();
                    LOG.info("create new producer success:{}", producer.getProducerName());
                    Producer<byte[]> putIfAbsent = this.producerMap.putIfAbsent(producerTopic, producer);
                    if (putIfAbsent != null) {
                        producer.close();
                        LOG.info("close producer success:{}", producer.getProducerName());
                        producer = putIfAbsent;
                    }
                } catch (Throwable th) {
                    LOG.error("create new producer failed", th);
                }
            }
            if (producer == null) {
                this.sinkContext.processSendFail(batchPackProfile, this.clusterName, producerTopic, 0L);
                return false;
            }
            if (batchPackProfile instanceof SimpleBatchPackProfileV0) {
                sendSimpleProfileV0((SimpleBatchPackProfileV0) batchPackProfile, idConfig, producer, producerTopic);
                return true;
            }
            if (batchPackProfile instanceof OrderBatchPackProfileV0) {
                sendOrderProfileV0((OrderBatchPackProfileV0) batchPackProfile, idConfig, producer, producerTopic);
                return true;
            }
            sendProfileV1(batchPackProfile, idConfig, producer, producerTopic);
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            this.sinkContext.processSendFail(batchPackProfile, this.clusterName, batchPackProfile.getUid(), 0L);
            return false;
        }
    }

    private String getProducerTopic(String str, IdTopicConfig idTopicConfig) {
        StringBuilder sb = new StringBuilder();
        if (this.tenant != null) {
            sb.append(this.tenant).append("/");
        }
        String str2 = this.namespace;
        if (str2 == null) {
            str2 = idTopicConfig.getParams().get("namespace");
        }
        if (str2 != null) {
            sb.append(str2).append("/");
        }
        sb.append(str);
        return sb.toString();
    }

    private CompressionType getPulsarCompressionType() {
        String string = this.sinkContext.getProducerContext().getString("compressionType", CompressionType.SNAPPY.name());
        boolean z = -1;
        switch (string.hashCode()) {
            case -1844697261:
                if (string.equals("SNAPPY")) {
                    z = 3;
                    break;
                }
                break;
            case 75878:
                if (string.equals("LZ4")) {
                    z = false;
                    break;
                }
                break;
            case 2402104:
                if (string.equals("NONE")) {
                    z = 4;
                    break;
                }
                break;
            case 2756555:
                if (string.equals("ZLIB")) {
                    z = true;
                    break;
                }
                break;
            case 2763625:
                if (string.equals("ZSTD")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return CompressionType.LZ4;
            case true:
                return CompressionType.ZLIB;
            case InlongTcpChannelHandler.VERSION_PARAM_LENGTH /* 2 */:
                return CompressionType.ZSTD;
            case true:
                return CompressionType.SNAPPY;
            case true:
            default:
                return CompressionType.NONE;
        }
    }

    private void sendProfileV1(BatchPackProfile batchPackProfile, IdTopicConfig idTopicConfig, Producer<byte[]> producer, String str) throws Exception {
        EventHandler eventHandler = this.handlerLocal.get();
        if (eventHandler == null) {
            eventHandler = this.sinkContext.createEventHandler();
            this.handlerLocal.set(eventHandler);
        }
        Map<String, String> parseHeader = eventHandler.parseHeader(idTopicConfig, batchPackProfile, this.sinkContext.getNodeId(), this.sinkContext.getCompressType());
        byte[] parseBody = eventHandler.parseBody(idTopicConfig, batchPackProfile, this.sinkContext.getCompressType());
        this.sinkContext.addSendMetric(batchPackProfile, this.clusterName, str, parseBody.length);
        long currentTimeMillis = System.currentTimeMillis();
        producer.newMessage().properties(parseHeader).value(parseBody).sendAsync().whenCompleteAsync((messageId, th) -> {
            if (th != null) {
                LOG.error("Send fail:{}", th.getMessage());
                LOG.error(th.getMessage(), th);
                this.sinkContext.processSendFail(batchPackProfile, this.clusterName, str, currentTimeMillis);
            } else {
                this.sinkContext.addSendResultMetric(batchPackProfile, this.clusterName, str, true, currentTimeMillis);
                this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                batchPackProfile.ack();
            }
        });
    }

    private void sendSimpleProfileV0(SimpleBatchPackProfileV0 simpleBatchPackProfileV0, IdTopicConfig idTopicConfig, Producer<byte[]> producer, String str) throws Exception {
        Map<String, String> properties = simpleBatchPackProfileV0.getProperties();
        if (MapUtils.isEmpty(properties)) {
            properties = simpleBatchPackProfileV0.getSimpleProfile().getHeaders();
        }
        byte[] body = simpleBatchPackProfileV0.getSimpleProfile().getBody();
        this.sinkContext.addSendMetric(simpleBatchPackProfileV0, this.clusterName, str, body.length);
        long currentTimeMillis = System.currentTimeMillis();
        producer.newMessage().properties(properties).value(body).sendAsync().whenCompleteAsync((messageId, th) -> {
            if (th != null) {
                LOG.error("Send fail:{}", th.getMessage());
                LOG.error(th.getMessage(), th);
                this.sinkContext.processSendFail(simpleBatchPackProfileV0, this.clusterName, str, currentTimeMillis);
            } else {
                this.sinkContext.addSendResultMetric(simpleBatchPackProfileV0, this.clusterName, str, true, currentTimeMillis);
                this.sinkContext.getDispatchQueue().release(simpleBatchPackProfileV0.getSize());
                simpleBatchPackProfileV0.ack();
            }
        });
    }

    private void sendOrderProfileV0(OrderBatchPackProfileV0 orderBatchPackProfileV0, IdTopicConfig idTopicConfig, Producer<byte[]> producer, String str) throws Exception {
        Map<String, String> headers = orderBatchPackProfileV0.getOrderProfile().getHeaders();
        byte[] body = orderBatchPackProfileV0.getOrderProfile().getBody();
        this.sinkContext.addSendMetric(orderBatchPackProfileV0, this.clusterName, str, body.length);
        long currentTimeMillis = System.currentTimeMillis();
        producer.newMessage().properties(headers).value(body).sendAsync().whenCompleteAsync((messageId, th) -> {
            if (th != null) {
                LOG.error("Send fail:{}", th.getMessage());
                LOG.error(th.getMessage(), th);
                this.sinkContext.processSendFail(orderBatchPackProfileV0, this.clusterName, str, currentTimeMillis);
            } else {
                this.sinkContext.addSendResultMetric(orderBatchPackProfileV0, this.clusterName, str, true, currentTimeMillis);
                this.sinkContext.getDispatchQueue().release(orderBatchPackProfileV0.getSize());
                orderBatchPackProfileV0.ack();
                orderBatchPackProfileV0.ackOrder();
            }
        });
    }
}
