package org.apache.inlong.dataproxy.sink.mqzone.impl.pulsarzone;

import java.security.SecureRandom;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
import org.apache.inlong.dataproxy.source.tcp.InlongTcpChannelHandler;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mqzone/impl/pulsarzone/PulsarClusterProducer.class */
/**
 * Zone cluster producer that publishes dispatch profiles to a Pulsar cluster.
 *
 * <p>One {@link PulsarClient} is built in {@link #start()} from the cluster parameters, and one
 * {@link Producer} per topic is created on first use in {@link #send(DispatchProfile)} and cached
 * in a concurrent map.
 */
public class PulsarClusterProducer extends AbstractZoneClusterProducer {
    public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);

    /** Optional tenant prefix of the topic name, read from the cluster parameters; may be null. */
    private final String tenant;
    /** Optional namespace prefix of the topic name, read from the cluster parameters; may be null. */
    private final String namespace;
    /** Pulsar client; stays null if {@link #start()} failed before building it. */
    private PulsarClient client;
    /** Template builder cloned for every new topic producer; null if {@link #start()} failed. */
    private ProducerBuilder<byte[]> baseBuilder;
    /** Cache of producers keyed by the full topic name returned by getProducerTopic. */
    private final Map<String, Producer<byte[]>> producerMap;

    /**
     * Creates the producer and reads the tenant and namespace from the cluster parameters.
     *
     * @param str                   first argument forwarded unchanged to the super constructor
     * @param cacheClusterConfig    cluster configuration whose parameters supply tenant and namespace
     * @param pulsarZoneSinkContext sink context forwarded to the super constructor
     */
    public PulsarClusterProducer(String str, CacheClusterConfig cacheClusterConfig, PulsarZoneSinkContext pulsarZoneSinkContext) {
        super(str, cacheClusterConfig, pulsarZoneSinkContext);
        this.producerMap = new ConcurrentHashMap<>();
        this.tenant = cacheClusterConfig.getParams().get(ConfigConstants.KEY_TENANT);
        this.namespace = cacheClusterConfig.getParams().get(ConfigConstants.KEY_NAMESPACE);
    }

    /**
     * Marks the producer as started, builds the Pulsar client and prepares the base producer builder.
     *
     * <p>Any failure is logged and not rethrown; in that case {@code client} and/or
     * {@code baseBuilder} may remain null.
     */
    public void start() {
        this.state = LifecycleState.START;
        try {
            this.client = PulsarClient.builder()
                    .serviceUrl(this.config.getParams().get("serviceUrl"))
                    .authentication(AuthenticationFactory.token(this.config.getParams().get("authentication")))
                    .ioThreads(this.producerContext.getInteger("ioThreads", 1).intValue())
                    .memoryLimit(this.producerContext.getLong("memoryLimit", 1073741824L).longValue(), SizeUnit.BYTES)
                    .connectionsPerBroker(this.producerContext.getInteger("connectionsPerBroker", 10).intValue())
                    .build();
            this.baseBuilder = this.client.newProducer();
            // Timeout and pending-queue limits.
            this.baseBuilder
                    .sendTimeout(this.producerContext.getInteger("sendTimeout", 0).intValue(), TimeUnit.MILLISECONDS)
                    .maxPendingMessages(this.producerContext.getInteger("maxPendingMessages", 500).intValue())
                    .maxPendingMessagesAcrossPartitions(this.producerContext.getInteger("maxPendingMessagesAcrossPartitions", 60000).intValue());
            // Batching limits.
            this.baseBuilder
                    .batchingMaxMessages(this.producerContext.getInteger("batchingMaxMessages", 500).intValue())
                    .batchingMaxPublishDelay(this.producerContext.getInteger("batchingMaxPublishDelay", 100).intValue(), TimeUnit.MILLISECONDS)
                    .batchingMaxBytes(this.producerContext.getInteger("batchingMaxBytes", 131072).intValue());
            // Access mode, routing and back-pressure.
            this.baseBuilder
                    .accessMode(ProducerAccessMode.Shared)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .blockIfQueueFull(this.producerContext.getBoolean("blockIfQueueFull", true).booleanValue());
            this.baseBuilder
                    .roundRobinRouterBatchingPartitionSwitchFrequency(this.producerContext.getInteger("roundRobinRouterBatchingPartitionSwitchFrequency", 60).intValue())
                    .enableBatching(this.producerContext.getBoolean("enableBatching", true).booleanValue())
                    .compressionType(getPulsarCompressionType());
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    /**
     * Maps the "compressionType" producer setting to a Pulsar {@link CompressionType}.
     *
     * @return the matching compression type; SNAPPY when the setting is absent, NONE when the
     *         configured value is not one of LZ4, NONE, ZLIB, ZSTD or SNAPPY
     */
    private CompressionType getPulsarCompressionType() {
        String type = this.producerContext.getString("compressionType", CompressionType.SNAPPY.name());
        switch (type) {
            case "LZ4":
                return CompressionType.LZ4;
            case "ZLIB":
                return CompressionType.ZLIB;
            case "ZSTD":
                return CompressionType.ZSTD;
            case "SNAPPY":
                return CompressionType.SNAPPY;
            case "NONE":
            default:
                // Unknown values fall back to no compression.
                return CompressionType.NONE;
        }
    }

    /**
     * Marks the producer as stopped, closes every cached topic producer and then the client.
     *
     * <p>Close failures are logged and do not prevent the remaining resources from being closed.
     */
    public void stop() {
        this.state = LifecycleState.STOP;
        for (Producer<byte[]> producer : this.producerMap.values()) {
            try {
                producer.close();
            } catch (PulsarClientException e) {
                LOG.error(e.getMessage(), e);
            }
        }
        // Drop the closed producers so they are not handed out again.
        this.producerMap.clear();
        // start() swallows its failures, so the client may never have been created.
        if (this.client == null) {
            return;
        }
        try {
            this.client.close();
        } catch (PulsarClientException e2) {
            LOG.error(e2.getMessage(), e2);
        }
    }

    /**
     * Encodes the profile and sends it asynchronously to the topic resolved for its uid.
     *
     * <p>The final outcome (ack or failure handling) is reported from the asynchronous completion
     * callback.
     *
     * @param dispatchProfile batch of events to publish
     * @return true when the message was handed to the Pulsar producer; false when no topic is
     *         configured, no producer could be obtained, or an exception was thrown
     */
    @Override // org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer
    public boolean send(DispatchProfile dispatchProfile) {
        try {
            String producerTopic = getProducerTopic(dispatchProfile);
            if (producerTopic == null) {
                this.sinkContext.addSendResultMetric(dispatchProfile, dispatchProfile.getUid(), false, 0L);
                dispatchProfile.fail();
                return false;
            }
            Producer<byte[]> producer = getOrCreateProducer(producerTopic);
            if (producer == null) {
                this.sinkContext.processSendFail(dispatchProfile, producerTopic, 0L);
                return false;
            }
            Map<String, String> headers = encodeCacheMessageHeaders(dispatchProfile);
            byte[] body = EventUtils.encodeCacheMessageBody(this.sinkContext.getCompressType(), dispatchProfile.getEvents());
            long sendTime = System.currentTimeMillis();
            producer.newMessage().properties(headers).value(body).sendAsync().whenCompleteAsync((messageId, th2) -> {
                if (th2 == null) {
                    this.sinkContext.addSendResultMetric(dispatchProfile, producerTopic, true, sendTime);
                    dispatchProfile.ack();
                } else {
                    LOG.error("Send fail:{}", th2.getMessage());
                    LOG.error(th2.getMessage(), th2);
                    this.sinkContext.processSendFail(dispatchProfile, producerTopic, sendTime);
                }
            });
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            this.sinkContext.processSendFail(dispatchProfile, dispatchProfile.getUid(), 0L);
            return false;
        }
    }

    /**
     * Returns the cached producer for the topic, creating and caching one when none exists.
     *
     * <p>When two threads create a producer for the same topic, the first one stored in the map
     * wins; the losing instance is closed and the cached instance is returned even if that close
     * fails.
     *
     * @param producerTopic full topic name
     * @return a producer for the topic, or null when creation failed
     */
    private Producer<byte[]> getOrCreateProducer(String producerTopic) {
        Producer<byte[]> cached = this.producerMap.get(producerTopic);
        if (cached != null) {
            return cached;
        }
        Producer<byte[]> created;
        try {
            LOG.info("try to new a object for topic " + producerTopic);
            String namePrefix = this.workerName + "-" + this.cacheClusterName + "-" + producerTopic;
            // The random suffix keeps producer names distinct across instances and re-creations.
            byte[] seed = (namePrefix + System.currentTimeMillis()).getBytes();
            String producerName = namePrefix + "-" + new SecureRandom(seed).nextLong();
            // If start() failed, baseBuilder is null; the resulting exception is caught below.
            created = this.baseBuilder.clone().topic(producerTopic).producerName(producerName).create();
            LOG.info("create new producer success:{}", created.getProducerName());
        } catch (Throwable th) {
            LOG.error("create new producer failed", th);
            return null;
        }
        Producer<byte[]> existing = this.producerMap.putIfAbsent(producerTopic, created);
        if (existing == null) {
            return created;
        }
        // Another thread registered a producer first: discard ours and use the cached one.
        try {
            created.close();
            LOG.info("close producer success:{}", created.getProducerName());
        } catch (Exception e) {
            LOG.error("close duplicate producer failed", e);
        }
        return existing;
    }

    /**
     * Builds the full topic name for the profile as {@code [tenant/][namespace/]topic}.
     *
     * @param dispatchProfile profile whose uid is looked up in the id-topic holder
     * @return the full topic name, or null when no topic is registered for the uid
     */
    private String getProducerTopic(DispatchProfile dispatchProfile) {
        String topic = this.sinkContext.getIdTopicHolder().getTopic(dispatchProfile.getUid());
        if (topic == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        if (this.tenant != null) {
            sb.append(this.tenant).append("/");
        }
        if (this.namespace != null) {
            sb.append(this.namespace).append("/");
        }
        sb.append(topic);
        return sb.toString();
    }
}
