package org.apache.inlong.dataproxy.sink.pulsar;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.EventStat;
import org.apache.inlong.dataproxy.source.MsgType;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.class */
public class PulsarClientService {
    /** Logger shared by this service and its inner {@code TopicProducerInfo} class. */
    private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
    /** Pulsar service URL -> auth token; loaded from {@code ConfigManager} when connections are created. */
    private Map<String, String> pulsarUrl2token;
    /** Authentication type from the cluster config; token auth is only applied when it equals the default auth type. */
    private String authType;
    /** Producer send timeout, in milliseconds. */
    private Integer sendTimeout;
    /** Client connection timeout, in seconds. */
    private Integer clientTimeout;
    /** Passed to the producer builder's {@code enableBatching}. */
    private boolean enableBatch;
    /** Passed to the producer builder's {@code blockIfQueueFull}. */
    private boolean blockIfQueueFull;
    /** Passed to the producer builder's {@code maxPendingMessages}. */
    private int maxPendingMessages;
    /** Passed to the producer builder's {@code maxPendingMessagesAcrossPartitions}. */
    private int maxPendingMessagesAcrossPartitions;
    /** Compression used by every producer created here. */
    private CompressionType compressionType;
    /** Passed to the producer builder's {@code batchingMaxBytes}. */
    private int maxBatchingBytes;
    /** Passed to the producer builder's {@code batchingMaxMessages}. */
    private int maxBatchingMessages;
    /** Passed to the producer builder's {@code batchingMaxPublishDelay}, in milliseconds. */
    private long maxBatchingPublishDelayMillis;
    /** Milliseconds a producer group stays unavailable after a send error before it is offered again. */
    private long retryIntervalWhenSendMsgError;
    /** Topic -> producer groups (one {@code TopicProducerInfo} per Pulsar client that could open the topic). */
    public Map<String, List<TopicProducerInfo>> producerInfoMap;
    /** Topic -> counter used to rotate over the topic's producer groups. */
    public Map<String, AtomicLong> topicSendIndexMap;
    /** Pulsar service URL -> client; {@code null} until connections are created and again after {@code close()}. */
    public Map<String, PulsarClient> pulsarClients;
    /** Passed to the client builder's {@code ioThreads}. */
    public int pulsarClientIoThreads;
    /** Passed to the client builder's {@code connectionsPerBroker}. */
    public int pulsarConnectionsPreBroker;
    /** Local IP attached to every message as the {@code data_proxy_ip} property. */
    private String localIp;
    /** Optional error reporter; may be {@code null}, so every use is null-checked. */
    private StreamConfigLogMetric streamConfigLogMetric;
    /** Number of producers each {@code TopicProducerInfo} opens. */
    private int sinkThreadPoolSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService$TopicProducerInfo.class */
    public class TopicProducerInfo {
        private long lastSendMsgErrorTime;
        private Producer[] producers;
        private PulsarClient pulsarClient;
        private int sinkThreadPoolSize;
        private String topic;
        private volatile Boolean isCanUseSend = true;
        private volatile Boolean isFinishInit = false;

        public TopicProducerInfo(PulsarClient pulsarClient, int i, String str) {
            this.pulsarClient = pulsarClient;
            this.sinkThreadPoolSize = i;
            this.topic = str;
            this.producers = new Producer[i];
        }

        public void initProducer() {
            initProducer(null, null);
        }

        public void initProducer(String str, String str2) {
            for (int i = 0; i < this.sinkThreadPoolSize; i++) {
                try {
                    this.producers[i] = createProducer();
                } catch (PulsarClientException e) {
                    PulsarClientService.logger.error("create pulsar client has error , topic = {}, inlongGroupId = {}, inlongStreamId= {}", new Object[]{this.topic, str, str2, e});
                    this.isFinishInit = false;
                    for (int i2 = 0; i2 < this.sinkThreadPoolSize; i2++) {
                        if (this.producers[i2] != null) {
                            this.producers[i2].closeAsync();
                        }
                    }
                    if (PulsarClientService.this.streamConfigLogMetric != null && StringUtils.isNotEmpty(str) && StringUtils.isNotEmpty(str2)) {
                        PulsarClientService.this.streamConfigLogMetric.updateConfigLog(str, str2, "pulsar-client", ConfigLogTypeEnum.ERROR, e.toString());
                        return;
                    }
                    return;
                }
            }
            this.isFinishInit = true;
        }

        private Producer createProducer() throws PulsarClientException {
            return this.pulsarClient.newProducer().sendTimeout(PulsarClientService.this.sendTimeout.intValue(), TimeUnit.MILLISECONDS).topic(this.topic).enableBatching(PulsarClientService.this.enableBatch).blockIfQueueFull(PulsarClientService.this.blockIfQueueFull).maxPendingMessages(PulsarClientService.this.maxPendingMessages).maxPendingMessagesAcrossPartitions(PulsarClientService.this.maxPendingMessagesAcrossPartitions).compressionType(PulsarClientService.this.compressionType).batchingMaxMessages(PulsarClientService.this.maxBatchingMessages).batchingMaxBytes(PulsarClientService.this.maxBatchingBytes).batchingMaxPublishDelay(PulsarClientService.this.maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS).create();
        }

        public void setCanUseSend(Boolean bool) {
            this.isCanUseSend = bool;
            if (bool.booleanValue()) {
                return;
            }
            this.lastSendMsgErrorTime = System.currentTimeMillis();
        }

        public boolean isCanUseToSendMessage() {
            if (this.isCanUseSend.booleanValue() && this.isFinishInit.booleanValue()) {
                return true;
            }
            if (!this.isFinishInit.booleanValue() || System.currentTimeMillis() - this.lastSendMsgErrorTime <= PulsarClientService.this.retryIntervalWhenSendMsgError) {
                return false;
            }
            this.lastSendMsgErrorTime = System.currentTimeMillis();
            return true;
        }

        public void close() {
            for (int i = 0; i < this.sinkThreadPoolSize; i++) {
                try {
                    if (this.producers[i] != null) {
                        this.producers[i].close();
                    }
                } catch (PulsarClientException e) {
                    PulsarClientService.logger.error("close pulsar producer has error e = {}", e);
                    return;
                }
            }
        }

        public Producer getProducer(int i) {
            return (i >= this.sinkThreadPoolSize || this.producers[i] == null) ? this.producers[0] : this.producers[i];
        }

        public PulsarClient getPulsarClient() {
            return this.pulsarClient;
        }
    }

    /**
     * Copies all client and producer settings out of the cluster configuration.
     * No Pulsar client is created here; that happens in {@link #initCreateConnection}.
     *
     * @param thirdPartyClusterConfig source of the client/producer settings
     * @param i number of producers opened per topic and Pulsar client
     * @throws IllegalArgumentException if the configured send timeout is not positive, or the
     *         configured compression type is not a {@link CompressionType} constant name
     */
    public PulsarClientService(ThirdPartyClusterConfig thirdPartyClusterConfig, int i) {
        this.sinkThreadPoolSize = i;

        // Authentication and timeouts.
        this.authType = thirdPartyClusterConfig.getAuthType();
        this.sendTimeout = Integer.valueOf(thirdPartyClusterConfig.getSendTimeoutMs());
        this.retryIntervalWhenSendMsgError = thirdPartyClusterConfig.getRetryIntervalWhenSendErrorMs();
        this.clientTimeout = Integer.valueOf(thirdPartyClusterConfig.getClientTimeoutSecond());
        Preconditions.checkArgument(this.sendTimeout > 0, "sendTimeout must be > 0");

        // Client sizing.
        this.pulsarClientIoThreads = thirdPartyClusterConfig.getPulsarClientIoThreads();
        this.pulsarConnectionsPreBroker = thirdPartyClusterConfig.getPulsarConnectionsPreBroker();

        // Producer queueing.
        this.enableBatch = thirdPartyClusterConfig.getEnableBatch();
        this.blockIfQueueFull = thirdPartyClusterConfig.getBlockIfQueueFull();
        this.maxPendingMessages = thirdPartyClusterConfig.getMaxPendingMessages();
        this.maxPendingMessagesAcrossPartitions = thirdPartyClusterConfig.getMaxPendingMessagesAcrossPartitions();

        // An empty or missing compression setting means no compression.
        String configuredCompression = thirdPartyClusterConfig.getCompressionType();
        this.compressionType = StringUtils.isNotEmpty(configuredCompression)
                ? CompressionType.valueOf(configuredCompression)
                : CompressionType.NONE;

        // Producer batching.
        this.maxBatchingMessages = thirdPartyClusterConfig.getMaxBatchingMessages();
        this.maxBatchingBytes = thirdPartyClusterConfig.getMaxBatchingBytes();
        this.maxBatchingPublishDelayMillis = thirdPartyClusterConfig.getMaxBatchingPublishDelayMillis();

        this.producerInfoMap = new ConcurrentHashMap<>();
        this.topicSendIndexMap = new ConcurrentHashMap<>();
        this.localIp = NetworkUtils.getLocalIp();
    }

    /**
     * Sets the reporter that receives client/producer error records.
     *
     * @param streamConfigLogMetric reporter to use; {@code null} disables reporting
     */
    public void setConfigLogMetric(StreamConfigLogMetric streamConfigLogMetric) {
        this.streamConfigLogMetric = streamConfigLogMetric;
    }

    /**
     * Creates the Pulsar clients. If client creation fails with a {@link FlumeException}
     * the error is logged and this service is closed; other exception types are not caught here.
     *
     * @param createPulsarClientCallBack notified per service URL about client creation success or failure
     */
    public void initCreateConnection(CreatePulsarClientCallBack createPulsarClientCallBack) {
        try {
            createConnection(createPulsarClientCallBack);
        } catch (FlumeException e) {
            logger.error("Unable to create pulsar client. Exception follows.", e);
            // Release whatever was partially set up so a later attempt starts clean.
            close();
        }
    }

    /**
     * Sends one event to a topic.
     *
     * <p>Ordered events ({@code eventStat.isOrderMessage()}) are sent synchronously with the
     * partition key taken from the event headers; all other events are sent asynchronously and
     * the callback is invoked from the send future. When no producer is available the callback
     * receives a {@link PulsarClientException.NotFoundException}.
     *
     * @param i index of the producer to use inside the selected producer group
     * @param str topic to send to
     * @param event event whose body is sent and whose headers supply group/stream ids
     * @param sendMessageCallBack receives the success or failure of the send
     * @param eventStat per-event send state passed through to the callback
     * @return {@code false} only when a synchronous send fails with a
     *         {@link PulsarClientException} other than {@code NotFoundException};
     *         {@code true} in every other case
     */
    public boolean sendMessage(int i, String str, Event event, SendMessageCallBack sendMessageCallBack, EventStat eventStat) {
        String inlongStreamId = getInlongStreamId(event);
        String inlongGroupId = getInlongGroupId(event);

        TopicProducerInfo selected;
        try {
            selected = getProducerInfo(i, str, inlongGroupId, inlongStreamId);
        } catch (Exception e) {
            selected = null;
            logger.error("Get producer failed! topic = {}", str, e);
            if (this.streamConfigLogMetric != null) {
                this.streamConfigLogMetric.updateConfigLog(inlongGroupId, inlongStreamId, "pulsar-producer", ConfigLogTypeEnum.ERROR, e.toString());
            }
        }
        if (selected == null) {
            checkAndResponse(event, inlongGroupId, inlongStreamId);
            sendMessageCallBack.handleMessageSendException(str, eventStat, new PulsarClientException.NotFoundException("producer info is null"));
            return true;
        }

        Map<String, String> properties = new HashMap<>();
        properties.put("data_proxy_ip", this.localIp);
        properties.put(inlongStreamId, event.getHeaders().get("msg.pkg.time"));

        // Effectively-final copy so the async callbacks below can capture it.
        final TopicProducerInfo producerInfo = selected;
        Producer producer = producerInfo.getProducer(i);
        if (producer == null) {
            logger.warn("Get producer is null! topic = {}", str);
            checkAndResponse(event, inlongGroupId, inlongStreamId);
            sendMessageCallBack.handleMessageSendException(str, eventStat, new PulsarClientException.NotFoundException("producer is null"));
            return true;
        }

        if (!eventStat.isOrderMessage()) {
            producer.newMessage().properties(properties).value(event.getBody()).sendAsync().thenAccept(obj -> {
                AuditUtils.add(6, event);
                producerInfo.setCanUseSend(true);
                sendMessageCallBack.handleMessageSendSuccess(str, (MessageIdImpl) obj, eventStat);
            }).exceptionally(obj2 -> {
                if (this.streamConfigLogMetric != null) {
                    this.streamConfigLogMetric.updateConfigLog(inlongGroupId, inlongStreamId, "pulsar-producer", ConfigLogTypeEnum.ERROR, obj2.toString());
                }
                producerInfo.setCanUseSend(false);
                sendMessageCallBack.handleMessageSendException(str, eventStat, obj2);
                return null;
            });
            return true;
        }

        boolean result;
        try {
            sendMessageCallBack.handleMessageSendSuccess(str, producer.newMessage().key((String) event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY)).properties(properties).value(event.getBody()).send(), eventStat);
            AuditUtils.add(6, event);
            producerInfo.setCanUseSend(true);
            result = true;
        } catch (PulsarClientException e2) {
            if (this.streamConfigLogMetric != null) {
                this.streamConfigLogMetric.updateConfigLog(inlongGroupId, inlongStreamId, "pulsar-producer", ConfigLogTypeEnum.ERROR, e2.toString());
            }
            producerInfo.setCanUseSend(false);
            sendMessageCallBack.handleMessageSendException(str, eventStat, e2);
            result = e2 instanceof PulsarClientException.NotFoundException;
        }
        logger.debug("es.getRetryCnt() {}", eventStat.getRetryCnt());
        boolean firstAttempts = eventStat.getRetryCnt() == 0 || eventStat.getRetryCnt() == 1;
        // Guard the cast like checkAndResponse does: a non-OrderEvent must not turn a finished
        // send into a ClassCastException.
        if (firstAttempts && event instanceof OrderEvent) {
            sendResponse((OrderEvent) event, inlongGroupId, inlongStreamId);
        }
        return result;
    }

    /**
     * Answers the sender of a synchronously-sent order event; any other event is ignored.
     *
     * @param event event that was (not) sent
     * @param str inlong group id, used for logging
     * @param str2 inlong stream id, used for logging
     */
    private void checkAndResponse(Event event, String str, String str2) {
        if (!MessageUtils.isSyncSendForOrder(event) || !(event instanceof OrderEvent)) {
            return;
        }
        sendResponse((OrderEvent) event, str, str2);
    }

    /**
     * Writes the response package for an order event back on the event's channel.
     * Nothing is written when the event's ack header is {@code "false"}, when the event has no
     * channel context, or when the channel is no longer active.
     *
     * @param orderEvent event to answer
     * @param str inlong group id, used for logging
     * @param str2 inlong stream id, used for logging
     */
    private void sendResponse(OrderEvent orderEvent, String str, String str2) {
        final String sequenceId = orderEvent.getHeaders().get(AttributeConstants.UNIQ_ID);
        final boolean ackDisabled = "false".equals(orderEvent.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK));

        if (ackDisabled) {
            if (logger.isDebugEnabled()) {
                logger.debug("Not need to rsp message: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId, str, str2);
            }
            return;
        }

        boolean channelUsable = orderEvent.getCtx() != null && orderEvent.getCtx().channel().isActive();
        if (!channelUsable) {
            return;
        }

        // The write is scheduled on the channel's own event loop.
        orderEvent.getCtx().channel().eventLoop().execute(() -> {
            if (logger.isDebugEnabled()) {
                logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId, str, str2);
            }
            orderEvent.getCtx().writeAndFlush(MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId));
        });
    }

    /**
     * Creates one Pulsar client per configured service URL. Does nothing if clients already exist.
     *
     * <p>The callback is told, per URL, whether its client could be created. A failure for one
     * URL does not stop the others.
     *
     * @param createPulsarClientCallBack notified per service URL about success or failure
     * @throws FlumeException if no client at all could be created
     * @throws IllegalStateException if no service URL is configured
     */
    private void createConnection(CreatePulsarClientCallBack createPulsarClientCallBack) throws FlumeException {
        if (this.pulsarClients != null) {
            return;
        }
        this.pulsarUrl2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
        // Validate before publishing the client map: a non-null (even empty) map makes every
        // later call return immediately, so it must not be left behind by a failed precondition.
        Preconditions.checkState(this.pulsarUrl2token != null && !this.pulsarUrl2token.isEmpty(), "No pulsar server url specified");
        this.pulsarClients = new ConcurrentHashMap<>();
        logger.debug("number of pulsar cluster is {}", this.pulsarUrl2token.size());
        for (Map.Entry<String, String> entry : this.pulsarUrl2token.entrySet()) {
            String url = entry.getKey();
            String token = entry.getValue();
            try {
                if (logger.isDebugEnabled()) {
                    // Never write the credential itself to the log.
                    logger.debug("url = {}, token configured = {}", url, StringUtils.isNotEmpty(token));
                }
                this.pulsarClients.put(url, initPulsarClient(url, token));
                createPulsarClientCallBack.handleCreateClientSuccess(url);
            } catch (PulsarClientException e) {
                createPulsarClientCallBack.handleCreateClientException(url);
                if (this.streamConfigLogMetric != null) {
                    this.streamConfigLogMetric.updateConfigLog("DataProxyGlobal", "DataProxyGlobal", "pulsar-client", ConfigLogTypeEnum.ERROR, e.toString());
                }
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                logger.error("create connection error in Pulsar sink, maybe pulsar master set error, please re-check.url{}, ex1 {}", url, e.getMessage(), e);
            } catch (Throwable th) {
                createPulsarClientCallBack.handleCreateClientException(url);
                logger.error("create connection error in pulsar sink, maybe pulsar master set error/shutdown in progress, please re-check. url{}, ex2 {}", url, th.getMessage(), th);
            }
        }
        if (this.pulsarClients.isEmpty()) {
            throw new FlumeException("connect to pulsar error1, maybe zkstr/zkroot set error, please re-check");
        }
    }

    /**
     * Builds a Pulsar client for one service URL with this service's thread, connection and
     * timeout settings. Token authentication is configured only when the configured auth type
     * is the default one and a token is present.
     *
     * @param str Pulsar service URL
     * @param str2 auth token, may be null or empty
     * @return the newly built client
     * @throws Exception if the client cannot be built
     */
    private PulsarClient initPulsarClient(String str, String str2) throws Exception {
        ClientBuilder clientBuilder = PulsarClient.builder();

        boolean tokenAuthSelected = ThirdPartyClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(this.authType);
        if (tokenAuthSelected && StringUtils.isNotEmpty(str2)) {
            clientBuilder.authentication(AuthenticationFactory.token(str2));
        }

        clientBuilder.serviceUrl(str);
        clientBuilder.ioThreads(this.pulsarClientIoThreads);
        clientBuilder.connectionsPerBroker(this.pulsarConnectionsPreBroker);
        clientBuilder.connectionTimeout(this.clientTimeout.intValue(), TimeUnit.SECONDS);
        return clientBuilder.build();
    }

    /**
     * Returns the producer groups registered for a topic, creating them on first use.
     *
     * <p>On first use one {@link TopicProducerInfo} is created and initialised per existing
     * Pulsar client; only those that report themselves usable are kept. If there are no clients,
     * or none yields a usable group, nothing is registered and {@code null} is returned.
     *
     * @param str topic name
     * @param str2 inlong group id used for error reporting, may be null
     * @param str3 inlong stream id used for error reporting, may be null
     * @return the topic's producer groups, or {@code null} if none could be created
     */
    public List<TopicProducerInfo> initTopicProducer(String str, String str2, String str3) {
        return this.producerInfoMap.computeIfAbsent(str, topicKey -> {
            if (this.pulsarClients == null) {
                return null;
            }
            List<TopicProducerInfo> usableGroups = new ArrayList<>();
            for (PulsarClient client : this.pulsarClients.values()) {
                TopicProducerInfo group = new TopicProducerInfo(client, this.sinkThreadPoolSize, str);
                group.initProducer(str2, str3);
                if (group.isCanUseToSendMessage()) {
                    usableGroups.add(group);
                }
            }
            // A null result tells computeIfAbsent not to record a mapping.
            return usableGroups.isEmpty() ? null : usableGroups;
        });
    }

    /**
     * Same as {@link #initTopicProducer(String, String, String)} without group/stream ids.
     *
     * @param str topic name
     * @return the topic's producer groups, or {@code null} if none could be created
     */
    public List<TopicProducerInfo> initTopicProducer(String str) {
        return initTopicProducer(str, null, null);
    }

    /**
     * Picks a producer group for a topic by rotating over the topic's groups.
     *
     * <p>Each group is tried at most once per call. The first group that is usable and whose
     * producer at index {@code i} exists and is connected is returned. If none qualifies, the
     * last group tried is returned anyway.
     *
     * @param i index of the producer that will be used inside the group
     * @param str topic name
     * @param str2 inlong group id used for error reporting
     * @param str3 inlong stream id used for error reporting
     * @return a producer group, or {@code null} if the topic has none
     */
    private TopicProducerInfo getProducerInfo(int i, String str, String str2, String str3) {
        List<TopicProducerInfo> groups = initTopicProducer(str, str2, str3);
        AtomicLong rotation = this.topicSendIndexMap.computeIfAbsent(str, topicKey -> new AtomicLong(0L));

        if (groups == null) {
            return null;
        }
        final int groupCount = groups.size();
        if (groupCount == 0) {
            return null;
        }

        TopicProducerInfo candidate = null;
        for (int attempt = 0; attempt < groupCount; attempt++) {
            int slot = (int) (rotation.getAndIncrement() % groupCount);
            candidate = groups.get(slot);
            if (!candidate.isCanUseToSendMessage()) {
                continue;
            }
            Producer producer = candidate.getProducer(i);
            if (producer != null && producer.isConnected()) {
                break;
            }
        }
        return candidate;
    }

    /**
     * @return the live topic -> producer groups map held by this service (not a copy)
     */
    public Map<String, List<TopicProducerInfo>> getProducerInfoMap() {
        return this.producerInfoMap;
    }

    /**
     * Forgets all registered producer groups and shuts down every Pulsar client.
     * A failure while shutting down one client is logged and the remaining clients are still
     * shut down. Afterwards the client map is reset to {@code null}.
     */
    private void destroyConnection() {
        this.producerInfoMap.clear();
        Map<String, PulsarClient> clients = this.pulsarClients;
        if (clients != null) {
            for (PulsarClient client : clients.values()) {
                try {
                    client.shutdown();
                } catch (PulsarClientException e) {
                    // Most specific type first; the reverse order is unreachable and does not compile.
                    logger.error("destroy pulsarClient error in PulsarSink, PulsarClientException {}", e.getMessage(), e);
                } catch (Exception e) {
                    logger.error("destroy pulsarClient error in PulsarSink, ex {}", e.getMessage(), e);
                }
            }
        }
        this.pulsarClients = null;
        logger.debug("closed meta producer");
    }

    /**
     * Removes and closes, for every topic, the producer groups that belong to the given client.
     *
     * @param pulsarClient client whose producer groups are dropped
     */
    private void removeProducers(PulsarClient pulsarClient) {
        for (List<TopicProducerInfo> groups : this.producerInfoMap.values()) {
            // Collect first: removing from the list while iterating it would throw
            // ConcurrentModificationException.
            List<TopicProducerInfo> stale = new ArrayList<>();
            for (TopicProducerInfo group : groups) {
                if (group.getPulsarClient().equals(pulsarClient)) {
                    stale.add(group);
                }
            }
            if (stale.isEmpty()) {
                continue;
            }
            // Unregister before closing so no new send can select a group that is being closed.
            groups.removeAll(stale);
            for (TopicProducerInfo group : stale) {
                group.close();
            }
        }
    }

    /**
     * Applies a change of the configured Pulsar clusters.
     *
     * <p>First, for every URL in {@code map}, the existing client is unregistered, its producer
     * groups are removed and the client is shut down. Then, for every entry of {@code map2}, a
     * new client is created and registered, and one producer group per topic in {@code set} is
     * created on it; only usable groups are registered. Failures are logged and reported to the
     * callback per URL; they do not stop processing of the remaining URLs.
     *
     * @param createPulsarClientCallBack notified per new service URL about success or failure
     * @param map entries whose keys are the service URLs to drop (values are not used)
     * @param map2 service URL -> token of the clients to create
     * @param set topics for which producers are created on each new client
     */
    public void updatePulsarClients(CreatePulsarClientCallBack createPulsarClientCallBack, Map<String, String> map, Map<String, String> map2, Set<String> set) {
        for (String staleUrl : map.keySet()) {
            // Unregister up front so the client is gone from the map even if shutdown fails.
            PulsarClient staleClient = this.pulsarClients.remove(staleUrl);
            if (staleClient == null) {
                continue;
            }
            try {
                removeProducers(staleClient);
                staleClient.shutdown();
            } catch (PulsarClientException e) {
                // Most specific type first; the reverse order is unreachable and does not compile.
                logger.error("shutdown pulsarClient error in PulsarSink, PulsarClientException {}", e.getMessage(), e);
            } catch (Exception e) {
                logger.error("shutdown pulsarClient error in PulsarSink, ex {}", e.getMessage(), e);
            }
        }
        for (Map.Entry<String, String> entry : map2.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            try {
                if (logger.isDebugEnabled()) {
                    // Never write the credential itself to the log.
                    logger.debug("url = {}, token configured = {}", key, StringUtils.isNotEmpty(value));
                }
                PulsarClient newClient = initPulsarClient(key, value);
                this.pulsarClients.put(key, newClient);
                createPulsarClientCallBack.handleCreateClientSuccess(key);
                for (String topic : set) {
                    TopicProducerInfo group = new TopicProducerInfo(newClient, this.sinkThreadPoolSize, topic);
                    group.initProducer();
                    if (group.isCanUseToSendMessage()) {
                        this.producerInfoMap.computeIfAbsent(topic, topicKey -> new ArrayList<>()).add(group);
                    }
                }
            } catch (PulsarClientException e3) {
                createPulsarClientCallBack.handleCreateClientException(key);
                logger.error("create connnection error in pulsarsink, maybe pulsar master set error, please re-check.url{}, ex1 {}", key, e3.getMessage(), e3);
            } catch (Throwable th) {
                createPulsarClientCallBack.handleCreateClientException(key);
                logger.error("create connnection error in pulsarsink, maybe pulsar master set error/shutdown in progress, please re-check. url{}, ex2 {}", key, th.getMessage(), th);
            }
        }
    }

    /**
     * Reads the stream id from the event headers: the stream-id header wins, the
     * {@code INAME} header is the fallback.
     *
     * @param event event to inspect
     * @return the header value, or an empty string when neither header is present
     */
    private String getInlongStreamId(Event event) {
        if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
            return event.getHeaders().get(AttributeConstants.STREAM_ID);
        }
        if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
            return event.getHeaders().get(AttributeConstants.INAME);
        }
        return "";
    }

    /**
     * Reads the group id from the event headers.
     *
     * @param event event to inspect
     * @return the value of the group-id header, or {@code null} when the header is absent
     */
    private String getInlongGroupId(Event event) {
        return (String) event.getHeaders().get(AttributeConstants.GROUP_ID);
    }

    /**
     * Shuts this service down: drops all registered producer groups and shuts down every
     * Pulsar client.
     */
    public void close() {
        destroyConnection();
    }
}
