package org.apache.inlong.dataproxy.sink.pulsar;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.inlong.commons.monitor.LogCounter;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.EventStat;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.shade.io.netty.util.NettyRuntime;
import org.apache.pulsar.shade.io.netty.util.internal.SystemPropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.class */
public class PulsarClientService {
    private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
    /** Gates the "Get producer failed!" error log in sendMessage through shouldPrint(). */
    private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60000);

    // ---- Configuration keys looked up in the Flume Context by the constructor. ----
    // These are true constants, so they are declared final to prevent accidental reassignment.
    private static final String PULSAR_SERVER_URL_LIST = "pulsar_server_url_list";
    private static final String PULSAR_TOKEN = "pulsar_token";
    private static final String PULSAR_AUTH_TYPE = "pulsar_auth_type";
    private static final String PULSAR_DEFAULT_AUTH_TYPE = "token";
    private static final String SEND_TIMEOUT = "send_timeout_mill";
    private static final String CLIENT_TIMEOUT = "client_timeout_second";
    private static final String ENABLE_BATCH = "enable_batch";
    private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
    // The key is spelled "pre" in the configuration; its value is passed to connectionsPerBroker().
    private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
    private static final String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
    private static final String MAX_PENDING_MESSAGES = "max_pending_messages";
    private static final String MAX_BATCHING_MESSAGES = "max_batching_messages";
    private static final String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";

    // ---- Defaults applied when the corresponding key is absent from the Context. ----
    /** The "io.netty.eventLoopThreads" system property, or twice the available processors, but at least 1. */
    private static final int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    private static final int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
    private static final int DEFAULT_SEND_TIMEOUT_MILL = 30000;
    private static final int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
    private static final long DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = 30000L;
    private static final boolean DEFAULT_ENABLE_BATCH = true;
    private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
    private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
    private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;

    // ---- Per-instance settings, populated by the constructor. ----
    /** Service URLs obtained by splitting the configured URL list on '|'; one client is created per entry. */
    private String[] pulsarServerUrls;
    /** Authentication token; only applied when authType equals "token" and the token is not empty. */
    private String token;
    private String authType;
    /** Producer send timeout, in milliseconds. */
    private Integer sendTimeout;
    /** Client connection timeout, in seconds. */
    private Integer clientTimeout;
    private boolean enableBatch;
    private boolean blockIfQueueFull;
    private int maxPendingMessages;
    private int maxBatchingMessages;
    /** Minimum time, in milliseconds, before a producer that reported a send error is offered again. */
    private long retryIntervalWhenSendMsgError;

    /** Topic name mapped to the producers created for it (one entry per Pulsar client). */
    public Map<String, List<TopicProducerInfo>> producerInfoMap;
    /** Topic name mapped to a running counter used to rotate over that topic's producers. */
    public Map<String, AtomicLong> topicSendIndexMap;
    /** Clients created by createConnection; null until connections are created and again after close(). */
    public List<PulsarClient> pulsarClients;
    public int pulsarClientIoThreads;
    public int pulsarConnectionsPreBroker;
    /** Local IP attached to every outgoing message under the "data_proxy_ip" property. */
    private String localIp;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService$TopicProducerInfo.class */
    public class TopicProducerInfo {
        private long lastSendMsgErrorTime;
        private Producer producer;
        private PulsarClient pulsarClient;
        private String topic;
        private volatile Boolean isCanUseSend = true;
        private volatile Boolean isFinishInit = false;

        public TopicProducerInfo(PulsarClient pulsarClient, String str) {
            this.pulsarClient = pulsarClient;
            this.topic = str;
        }

        public void initProducer() {
            try {
                this.producer = this.pulsarClient.newProducer().sendTimeout(PulsarClientService.this.sendTimeout.intValue(), TimeUnit.MILLISECONDS).topic(this.topic).enableBatching(PulsarClientService.this.enableBatch).blockIfQueueFull(PulsarClientService.this.blockIfQueueFull).maxPendingMessages(PulsarClientService.this.maxPendingMessages).batchingMaxMessages(PulsarClientService.this.maxBatchingMessages).create();
                this.isFinishInit = true;
            } catch (PulsarClientException e) {
                PulsarClientService.logger.error("create pulsar client has error e = {}", e);
                this.isFinishInit = false;
            }
        }

        public void setCanUseSend(Boolean bool) {
            this.isCanUseSend = bool;
            if (bool.booleanValue()) {
                return;
            }
            this.lastSendMsgErrorTime = System.currentTimeMillis();
        }

        public boolean isCanUseToSendMessage() {
            if (this.isCanUseSend.booleanValue() && this.isFinishInit.booleanValue()) {
                return true;
            }
            if (!this.isFinishInit.booleanValue() || System.currentTimeMillis() - this.lastSendMsgErrorTime <= PulsarClientService.this.retryIntervalWhenSendMsgError) {
                return false;
            }
            this.lastSendMsgErrorTime = System.currentTimeMillis();
            return true;
        }

        public Producer getProducer() {
            return this.producer;
        }
    }

    /**
     * Reads the Pulsar sink settings from the Flume context and prepares the (still empty)
     * producer bookkeeping maps. No connection is opened here.
     *
     * @param context sink configuration; must contain the server URL list
     * @throws IllegalStateException    if the server URL list is missing or yields no entries
     * @throws IllegalArgumentException if the configured send timeout is not positive
     */
    public PulsarClientService(Context context) {
        String serverUrlList = context.getString(PULSAR_SERVER_URL_LIST);
        Preconditions.checkState(serverUrlList != null, "No pulsar server url specified");
        this.pulsarServerUrls = serverUrlList.split("\\|");
        this.token = context.getString(PULSAR_TOKEN);
        this.authType = context.getString(PULSAR_AUTH_TYPE, PULSAR_DEFAULT_AUTH_TYPE);
        // split() never returns null, but it returns an empty array when the value consists
        // only of separators (for example "|"), so the length is what needs checking.
        Preconditions.checkState(this.pulsarServerUrls.length > 0, "No pulsar server url config");

        this.sendTimeout = context.getInteger(SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT_MILL);
        this.retryIntervalWhenSendMsgError =
                context.getLong(RETRY_INTERVAL_WHEN_SEND_ERROR_MILL, DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL);
        this.clientTimeout = context.getInteger(CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT_SECOND);
        logger.debug("PulsarClientService {} {}", SEND_TIMEOUT, this.sendTimeout);
        Preconditions.checkArgument(this.sendTimeout > 0, "sendTimeout must be > 0");

        this.pulsarClientIoThreads = context.getInteger(PULSAR_IO_THREADS, DEFAULT_PULSAR_IO_THREADS);
        this.pulsarConnectionsPreBroker =
                context.getInteger(PULSAR_CONNECTIONS_PRE_BROKER, DEFAULT_CONNECTIONS_PRE_BROKER);
        this.enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
        this.blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, DEFAULT_BLOCK_IF_QUEUE_FULL);
        this.maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, DEFAULT_MAX_PENDING_MESSAGES);
        this.maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);

        this.producerInfoMap = new ConcurrentHashMap<>();
        this.topicSendIndexMap = new ConcurrentHashMap<>();
        this.localIp = NetworkUtils.getLocalIp();
    }

    /**
     * Creates the Pulsar clients, reporting the per-URL outcome through the callback.
     * If connection setup raises a {@link FlumeException}, the failure is logged and the
     * service is closed; the exception is not rethrown.
     *
     * @param createPulsarClientCallBack receiver of per-URL success/failure notifications
     */
    public void initCreateConnection(CreatePulsarClientCallBack createPulsarClientCallBack) {
        try {
            this.createConnection(createPulsarClientCallBack);
        } catch (FlumeException connectFailure) {
            logger.error("Unable to create pulsar client. Exception follows.", connectFailure);
            // Release whatever was partially set up.
            this.close();
        }
    }

    /**
     * Asynchronously publishes an event body to the given topic.
     *
     * <p>The outcome of the send is delivered through {@code sendMessageCallBack}; the return
     * value only says whether the message was handed to a producer.
     *
     * @param str                 topic to publish to
     * @param event               event whose body is sent and whose headers supply message properties
     * @param sendMessageCallBack receiver of the asynchronous success/failure notification
     * @param eventStat           passed through unchanged to the callback
     * @return true if the message was submitted via sendAsync(); false if no producer could be
     *         obtained for the topic
     */
    public boolean sendMessage(String str, Event event, SendMessageCallBack sendMessageCallBack, EventStat eventStat) {
        TopicProducerInfo selected = null;
        try {
            selected = getProducer(str);
        } catch (Exception e) {
            if (logPrinterA.shouldPrint()) {
                logger.error("Get producer failed!", e);
            }
        }
        if (selected == null) {
            return false;
        }
        // getProducer() may fall back to an entry whose producer was never created (its
        // initProducer() failed). Report that as "not sent" rather than failing with a
        // NullPointerException below.
        Producer producer = selected.getProducer();
        if (producer == null) {
            if (logPrinterA.shouldPrint()) {
                logger.error("Get producer failed! producer is not initialized, topic = {}", str);
            }
            return false;
        }

        Map<String, String> headers = event.getHeaders();
        Map<String, String> properties = new HashMap<>();
        properties.put("data_proxy_ip", this.localIp);
        // Property key: INTERFACE_ID header if present, else INAME header, else the empty string.
        String streamId = "";
        if (headers.containsKey(AttributeConstants.INTERFACE_ID)) {
            streamId = headers.get(AttributeConstants.INTERFACE_ID);
        } else if (headers.containsKey(AttributeConstants.INAME)) {
            streamId = headers.get(AttributeConstants.INAME);
        }
        // NOTE(review): the value is null when the "msg.pkg.time" header is absent; confirm the
        // Pulsar message builder in use accepts null property values.
        properties.put(streamId, headers.get("msg.pkg.time"));

        final TopicProducerInfo producerInfo = selected;
        producer.newMessage().properties(properties).value(event.getBody()).sendAsync().thenAccept(obj -> {
            AuditUtils.add(6, event);
            producerInfo.setCanUseSend(true);
            sendMessageCallBack.handleMessageSendSuccess(str, (MessageIdImpl) obj, eventStat);
        }).exceptionally(obj2 -> {
            // Mark the producer as failed so it is skipped until the retry interval elapses.
            producerInfo.setCanUseSend(false);
            sendMessageCallBack.handleMessageSendException(str, eventStat, obj2);
            return null;
        });
        return true;
    }

    /**
     * Creates one Pulsar client per configured service URL. Does nothing when clients already
     * exist. A failure for an individual URL is reported to the callback and logged, and the
     * remaining URLs are still attempted.
     *
     * @param createPulsarClientCallBack notified of success or failure for each URL
     * @throws FlumeException if no client could be created for any URL
     */
    private void createConnection(CreatePulsarClientCallBack createPulsarClientCallBack) throws FlumeException {
        if (this.pulsarClients != null) {
            return;
        }
        this.pulsarClients = new ArrayList<>();
        for (int i = 0; i < this.pulsarServerUrls.length; i++) {
            String serviceUrl = this.pulsarServerUrls[i];
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("index = {}, url = {}", Integer.valueOf(i), serviceUrl);
                }
                this.pulsarClients.add(initPulsarClient(serviceUrl));
                createPulsarClientCallBack.handleCreateClientSuccess(serviceUrl);
            } catch (PulsarClientException e) {
                createPulsarClientCallBack.handleCreateClientException(serviceUrl);
                // The exception is the trailing argument, so SLF4J logs it with its stack trace;
                // it must not have a placeholder of its own or a literal "{}" is printed.
                logger.error("create connection error in pulsar sink, maybe pulsar master set error, please re-check. url = {}, ex1", serviceUrl, e);
            } catch (Throwable th) {
                createPulsarClientCallBack.handleCreateClientException(serviceUrl);
                logger.error("create connection error in pulsar sink, maybe pulsar master set error/shutdown in progress, please re-check. url = {}, ex2", serviceUrl, th);
            }
        }
        if (this.pulsarClients.isEmpty()) {
            throw new FlumeException("connect to pulsar error1, maybe zkstr/zkroot set error, please re-check");
        }
    }

    /**
     * Builds a Pulsar client for one service URL using this service's I/O thread count,
     * connections-per-broker and connection timeout (seconds). Token authentication is applied
     * only when the auth type is the default ("token") and a non-empty token is configured.
     *
     * @param str Pulsar service URL
     * @return the newly built client
     * @throws Exception if the client cannot be built
     */
    private PulsarClient initPulsarClient(String str) throws Exception {
        ClientBuilder clientBuilder = PulsarClient.builder();
        boolean tokenAuthConfigured =
                PULSAR_DEFAULT_AUTH_TYPE.equals(this.authType) && StringUtils.isNotEmpty(this.token);
        if (tokenAuthConfigured) {
            clientBuilder.authentication(AuthenticationFactory.token(this.token));
        }
        clientBuilder
                .serviceUrl(str)
                .ioThreads(this.pulsarClientIoThreads)
                .connectionsPerBroker(this.pulsarConnectionsPreBroker)
                .connectionTimeout(this.clientTimeout.intValue(), TimeUnit.SECONDS);
        return clientBuilder.build();
    }

    /**
     * Returns the producers registered for a topic, creating them on first use: one
     * {@link TopicProducerInfo} per existing Pulsar client, each initialized immediately.
     * When no clients exist yet, an empty list is registered and returned.
     *
     * @param str topic name
     * @return the (possibly empty) producer list stored for the topic
     */
    public List<TopicProducerInfo> initTopicProducer(String str) {
        return this.producerInfoMap.computeIfAbsent(str, topicKey -> {
            List<TopicProducerInfo> created = new ArrayList<>();
            if (this.pulsarClients == null) {
                return created;
            }
            for (PulsarClient client : this.pulsarClients) {
                TopicProducerInfo info = new TopicProducerInfo(client, str);
                info.initProducer();
                created.add(info);
            }
            return created;
        });
    }

    /**
     * Picks a producer for the topic by rotating through the topic's producer list with a
     * per-topic counter, trying each position at most once.
     *
     * @param str topic name
     * @return the first producer found that is both sendable and connected; if none qualifies,
     *         the last one examined; null when the topic has no producers at all
     */
    private TopicProducerInfo getProducer(String str) {
        List<TopicProducerInfo> candidates = initTopicProducer(str);
        AtomicLong sendIndex = this.topicSendIndexMap.computeIfAbsent(str, topicKey -> new AtomicLong(0L));
        int candidateCount = candidates.size();
        if (candidateCount == 0) {
            return null;
        }
        TopicProducerInfo picked = null;
        for (int attempt = 0; attempt < candidateCount; attempt++) {
            int slot = (int) (sendIndex.getAndIncrement() % candidateCount);
            picked = candidates.get(slot);
            boolean usable = picked.isCanUseToSendMessage() && picked.getProducer().isConnected();
            if (usable) {
                break;
            }
        }
        // Falls through with the last examined entry when no producer was usable.
        return picked;
    }

    /**
     * @return the live topic-to-producers map held by this service (not a copy)
     */
    public Map<String, List<TopicProducerInfo>> getProducerInfoMap() {
        return producerInfoMap;
    }

    /**
     * Drops all registered producers and shuts down every Pulsar client. A failure while
     * shutting down one client is logged and does not prevent the remaining clients from
     * being shut down. Afterwards the client list is reset to null.
     */
    private void destroyConnection() {
        this.producerInfoMap.clear();
        // Work on a snapshot of the reference so the null check and the iteration see the same list.
        List<PulsarClient> clients = this.pulsarClients;
        if (clients != null) {
            for (PulsarClient client : clients) {
                try {
                    client.shutdown();
                } catch (PulsarClientException e) {
                    // The more specific exception must be caught first; listed after
                    // catch (Exception) it would be unreachable and rejected by the compiler.
                    logger.error("destroy pulsarClient error in PulsarSink, PulsarClientException {}", e.getMessage(), e);
                } catch (Exception e) {
                    logger.error("destroy pulsarClient error in PulsarSink, ex {}", e.getMessage(), e);
                }
            }
        }
        this.pulsarClients = null;
        logger.debug("closed meta producer");
    }

    /**
     * Releases the service's producers and Pulsar clients by delegating to destroyConnection().
     */
    public void close() {
        this.destroyConnection();
    }
}
