package org.apache.inlong.sort.standalone.sink.pulsar;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.sink.elasticsearch.EsSinkContext;
import org.apache.inlong.sort.standalone.sink.hive.HdfsIdConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.class */
public class PulsarProducerCluster implements LifecycleAware {
    public static final Logger LOG = InlongLoggerFactory.getLogger(PulsarProducerCluster.class);
    public static final String KEY_SERVICE_URL = "serviceUrl";
    public static final String KEY_AUTHENTICATION = "authentication";
    public static final String KEY_ENABLEBATCHING = "enableBatching";
    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
    public static final String KEY_SENDTIMEOUT = "sendTimeout";
    public static final String KEY_COMPRESSIONTYPE = "compressionType";
    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouterBatchingPartitionSwitchFrequency";
    private final String workerName;
    private final CacheClusterConfig config;
    private final PulsarFederationSinkContext sinkContext;
    private final Context context;
    private final String cacheClusterName;
    private IEvent2PulsarRecordHandler handler;
    private PulsarClient client;
    private ProducerBuilder<byte[]> baseBuilder;
    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap();
    private LifecycleState state = LifecycleState.IDLE;

    public PulsarProducerCluster(String str, CacheClusterConfig cacheClusterConfig, PulsarFederationSinkContext pulsarFederationSinkContext) {
        this.workerName = str;
        this.config = cacheClusterConfig;
        this.sinkContext = pulsarFederationSinkContext;
        this.context = pulsarFederationSinkContext.getProducerContext();
        this.cacheClusterName = cacheClusterConfig.getClusterName();
        this.handler = this.sinkContext.createEventHandler();
    }

    public void start() {
        this.state = LifecycleState.START;
        try {
            this.client = PulsarClient.builder().serviceUrl((String) this.config.getParams().get(KEY_SERVICE_URL)).authentication(AuthenticationFactory.token((String) this.config.getParams().get(KEY_AUTHENTICATION))).build();
            this.baseBuilder = this.client.newProducer();
            this.baseBuilder.hashingScheme(HashingScheme.Murmur3_32Hash).enableBatching(this.context.getBoolean(KEY_ENABLEBATCHING, true).booleanValue()).batchingMaxBytes(this.context.getInteger(KEY_BATCHINGMAXBYTES, 5242880).intValue()).batchingMaxMessages(this.context.getInteger(KEY_BATCHINGMAXMESSAGES, 3000).intValue()).batchingMaxPublishDelay(this.context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 1).intValue(), TimeUnit.MILLISECONDS);
            this.baseBuilder.maxPendingMessages(this.context.getInteger(KEY_MAXPENDINGMESSAGES, 1000).intValue()).maxPendingMessagesAcrossPartitions(this.context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 50000).intValue()).sendTimeout(this.context.getInteger(KEY_SENDTIMEOUT, 0).intValue(), TimeUnit.MILLISECONDS).compressionType(getPulsarCompressionType()).blockIfQueueFull(this.context.getBoolean(KEY_BLOCKIFQUEUEFULL, true).booleanValue()).roundRobinRouterBatchingPartitionSwitchFrequency(this.context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 10).intValue()).batcherBuilder(BatcherBuilder.DEFAULT);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    private CompressionType getPulsarCompressionType() {
        String string = this.context.getString(KEY_COMPRESSIONTYPE);
        boolean z = -1;
        switch (string.hashCode()) {
            case -1844697261:
                if (string.equals("SNAPPY")) {
                    z = 4;
                    break;
                }
                break;
            case 75878:
                if (string.equals("LZ4")) {
                    z = false;
                    break;
                }
                break;
            case 2402104:
                if (string.equals("NONE")) {
                    z = true;
                    break;
                }
                break;
            case 2756555:
                if (string.equals("ZLIB")) {
                    z = 2;
                    break;
                }
                break;
            case 2763625:
                if (string.equals("ZSTD")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case EsSinkContext.DEFAULT_IS_USE_INDEX_ID /* 0 */:
                return CompressionType.LZ4;
            case HdfsIdConfig.SEPARATOR_LENGTH /* 1 */:
                return CompressionType.NONE;
            case true:
                return CompressionType.ZLIB;
            case true:
                return CompressionType.ZSTD;
            case true:
                return CompressionType.SNAPPY;
            default:
                return CompressionType.NONE;
        }
    }

    public void stop() {
        this.state = LifecycleState.STOP;
        Iterator<Map.Entry<String, Producer<byte[]>>> it = this.producerMap.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().close();
            } catch (PulsarClientException e) {
                LOG.error(e.getMessage(), e);
            }
        }
        try {
            this.client.close();
        } catch (PulsarClientException e2) {
            LOG.error(e2.getMessage(), e2);
        }
    }

    public LifecycleState getLifecycleState() {
        return this.state;
    }

    public boolean send(ProfileEvent profileEvent, Transaction transaction) throws IOException {
        Map headers = profileEvent.getHeaders();
        String str = (String) headers.get("topic");
        Producer<byte[]> producer = this.producerMap.get(str);
        if (producer == null) {
            try {
                LOG.info("try to new a object for topic " + str);
                producer = this.baseBuilder.clone().topic(str).producerName(this.workerName + "-" + this.cacheClusterName + "-" + str).create();
                LOG.info("create new producer success:{}", producer.getProducerName());
                Producer<byte[]> putIfAbsent = this.producerMap.putIfAbsent(str, producer);
                if (putIfAbsent != null) {
                    producer.close();
                    LOG.info("close producer success:{}", producer.getProducerName());
                    producer = putIfAbsent;
                }
            } catch (Throwable th) {
                LOG.error("create new producer failed", th);
            }
        }
        if (producer == null) {
            transaction.rollback();
            transaction.close();
            this.sinkContext.addSendResultMetric(profileEvent, str, false, System.currentTimeMillis());
            return false;
        }
        String str2 = (String) headers.get("messageKey");
        if (str2 == null) {
            str2 = (String) headers.get("sourceIp");
        }
        byte[] parse = this.handler.parse(this.sinkContext, profileEvent);
        if (parse != null) {
            long currentTimeMillis = System.currentTimeMillis();
            producer.newMessage().key(str2).properties(headers).value(parse).sendAsync().whenCompleteAsync((messageId, th2) -> {
                if (th2 == null) {
                    transaction.commit();
                    transaction.close();
                    this.sinkContext.addSendResultMetric(profileEvent, str, true, currentTimeMillis);
                    profileEvent.ack();
                    return;
                }
                LOG.error("Send fail:{}", th2.getMessage());
                LOG.error(th2.getMessage(), th2);
                transaction.rollback();
                transaction.close();
                this.sinkContext.addSendResultMetric(profileEvent, str, false, currentTimeMillis);
            });
            return true;
        }
        transaction.commit();
        profileEvent.ack();
        transaction.close();
        return true;
    }

    public String getCacheClusterName() {
        return this.cacheClusterName;
    }
}
