package org.apache.inlong.dataproxy.sink;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.handler.codec.TooLongFrameException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
import org.apache.inlong.dataproxy.sink.pulsar.SinkTask;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/PulsarSink.class */
public class PulsarSink extends AbstractSink implements Configurable, SendMessageCallBack, CreatePulsarClientCallBack {
    private static final String SEPARATOR = "#";
    private RateLimiter diskRateLimiter;
    private SinkCounter sinkCounter;
    private SinkTask[] sinkThreadPool;
    private int sinkThreadPoolSize;
    private PulsarClientService pulsarClientService;
    private MonitorIndex monitorIndex;
    private MonitorIndexExt monitorIndexExt;
    private DataProxyMetricItemSet metricItemSet;
    private ConfigManager configManager;
    private Map<String, String> topicProperties;
    private Map<String, String> pulsarCluster;
    private MQClusterConfig pulsarConfig;
    private static final Logger logger = LoggerFactory.getLogger(PulsarSink.class);
    private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60000);
    private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60000);
    private static final Long PRINT_INTERVAL = 30L;
    private static final PulsarPerformanceTask PULSAR_PERFORMANCE_TASK = new PulsarPerformanceTask();
    private static final LoadingCache<String, Long> AGENT_ID_CACHE = CacheBuilder.newBuilder().concurrencyLevel(32).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS).build(new CacheLoader<String, Long>() { // from class: org.apache.inlong.dataproxy.sink.PulsarSink.1
        @Nonnull
        public Long load(@Nonnull String str) {
            return Long.valueOf(System.currentTimeMillis());
        }
    });
    private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_CNT = new AtomicLong(0);
    private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_SIZE = new AtomicLong(0);
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
    private final AtomicLong currentInFlightCount = new AtomicLong(0);
    private final AtomicLong currentSuccessSendCnt = new AtomicLong(0);
    private final AtomicLong lastSuccessSendCnt = new AtomicLong(0);
    private final AtomicInteger processIndex = new AtomicInteger(0);
    private long t1 = System.currentTimeMillis();
    private int maxMonitorCnt = 300000;
    private volatile boolean canTake = false;
    private int eventQueueSize = 10000;
    private int badEventQueueSize = 10000;
    private int maxRetrySendCnt = 16;

    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/PulsarSink$PulsarPerformanceTask.class */
    static class PulsarPerformanceTask implements Runnable {
        PulsarPerformanceTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (PulsarSink.TOTAL_PULSAR_SUCC_SEND_SIZE.get() != 0) {
                    PulsarSink.logger.info("Total pulsar performance tps :" + (PulsarSink.TOTAL_PULSAR_SUCC_SEND_CNT.get() / PulsarSink.PRINT_INTERVAL.longValue()) + "/s, avg msg size:" + (PulsarSink.TOTAL_PULSAR_SUCC_SEND_SIZE.get() / PulsarSink.TOTAL_PULSAR_SUCC_SEND_CNT.get()) + ",print every " + PulsarSink.PRINT_INTERVAL + " seconds");
                    PulsarSink.TOTAL_PULSAR_SUCC_SEND_CNT.set(0L);
                    PulsarSink.TOTAL_PULSAR_SUCC_SEND_SIZE.set(0L);
                }
            } catch (Exception e) {
                PulsarSink.logger.info("pulsarPerformanceTask error", e);
            }
        }
    }

    public PulsarSink() {
        logger.debug("new instance of PulsarSink!");
    }

    public void configure(Context context) {
        logger.info("PulsarSink started and context = {}", context.toString());
        this.maxMonitorCnt = context.getInteger(ConfigConstants.MAX_MONITOR_CNT, 300000).intValue();
        this.configManager = ConfigManager.getInstance();
        this.topicProperties = this.configManager.getTopicProperties();
        this.pulsarCluster = this.configManager.getMqClusterUrl2Token();
        this.pulsarConfig = this.configManager.getMqClusterConfig();
        this.sinkThreadPoolSize = this.pulsarConfig.getThreadNum();
        if (this.sinkThreadPoolSize <= 0) {
            this.sinkThreadPoolSize = 1;
        }
        this.pulsarClientService = new PulsarClientService(this.pulsarConfig, this.sinkThreadPoolSize);
        this.configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() { // from class: org.apache.inlong.dataproxy.sink.PulsarSink.2
            @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
            public void update() {
                if (PulsarSink.this.pulsarClientService != null) {
                    PulsarSink.this.diffSetPublish(PulsarSink.this.pulsarClientService, new HashSet(PulsarSink.this.topicProperties.values()), new HashSet(PulsarSink.this.configManager.getTopicProperties().values()));
                }
            }
        });
        this.configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() { // from class: org.apache.inlong.dataproxy.sink.PulsarSink.3
            @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
            public void update() {
                if (PulsarSink.this.pulsarClientService != null) {
                    PulsarSink.this.diffUpdatePulsarClient(PulsarSink.this.pulsarClientService, PulsarSink.this.pulsarCluster, PulsarSink.this.configManager.getMqClusterUrl2Token());
                }
            }
        });
        this.maxRetrySendCnt = this.pulsarConfig.getMaxRetryCnt();
        this.badEventQueueSize = this.pulsarConfig.getBadEventQueueSize();
        Preconditions.checkArgument(this.pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
        this.sinkThreadPool = new SinkTask[this.sinkThreadPoolSize];
        this.eventQueueSize = this.pulsarConfig.getEventQueueSize();
        if (this.pulsarConfig.getDiskIoRatePerSec() != 0) {
            this.diskRateLimiter = RateLimiter.create(this.pulsarConfig.getDiskIoRatePerSec());
        }
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
    }

    private void initTopicSet(PulsarClientService pulsarClientService, Set<String> set) {
        long currentTimeMillis = System.currentTimeMillis();
        if (set != null) {
            Iterator<String> it = set.iterator();
            while (it.hasNext()) {
                pulsarClientService.initTopicProducer(it.next());
            }
        }
        logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
        logger.info(getName() + " producer is ready for topics: " + pulsarClientService.getProducerInfoMap().keySet());
    }

    public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> set, Set<String> set2) {
        boolean z = false;
        for (String str : set2) {
            if (!set.contains(str)) {
                z = true;
                try {
                    pulsarClientService.initTopicProducer(str);
                } catch (Exception e) {
                    logger.error("get producer failed: ", e);
                }
            }
        }
        for (String str2 : set) {
            if (!set2.contains(str2)) {
                z = true;
                try {
                    pulsarClientService.destroyProducerByTopic(str2);
                } catch (Exception e2) {
                    logger.error("remove producer failed: ", e2);
                }
            }
        }
        if (z) {
            this.topicProperties = this.configManager.getTopicProperties();
            logger.info("topics.properties has changed, trigger diff publish for {}, old topic set = {}, new topic set = {}, current topicProperties = {}", new Object[]{getName(), set, set2, this.topicProperties});
        }
    }

    public void diffUpdatePulsarClient(PulsarClientService pulsarClientService, Map<String, String> map, Map<String, String> map2) {
        MapDifference difference = Maps.difference(map, map2);
        if (difference.areEqual()) {
            return;
        }
        logger.info("pulsarConfig has changed, close unused url clients and start new url clients");
        HashMap hashMap = new HashMap(difference.entriesOnlyOnLeft());
        HashMap hashMap2 = new HashMap(difference.entriesOnlyOnRight());
        for (String str : difference.entriesDiffering().keySet()) {
            hashMap.put(str, map.get(str));
            hashMap2.put(str, map2.get(str));
        }
        pulsarClientService.updatePulsarClients(this, hashMap, hashMap2, new HashSet(this.topicProperties.values()));
        this.pulsarCluster = this.configManager.getMqClusterUrl2Token();
        if (ConfigManager.getInstance().isMqClusterReady()) {
            return;
        }
        ConfigManager.getInstance().updMqClusterStatus(true);
        logger.info("[{}] MQ Cluster service status ready!", getName());
    }

    public void start() {
        logger.info("[{}] pulsar sink starting...", getName());
        this.sinkCounter.start();
        this.pulsarClientService.initCreateConnection(this, getName());
        int statIntervalSec = this.pulsarConfig.getStatIntervalSec();
        Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
        if (statIntervalSec > 0) {
            this.monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, this.maxMonitorCnt);
            this.monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" + getName(), statIntervalSec, this.maxMonitorCnt);
        }
        super.start();
        try {
            initTopicSet(this.pulsarClientService, new HashSet(this.topicProperties.values()));
        } catch (Exception e) {
            logger.info("pulsar sink start publish topic fail.", e);
        }
        for (int i = 0; i < this.sinkThreadPoolSize; i++) {
            this.sinkThreadPool[i] = new SinkTask(this.pulsarClientService, this, this.eventQueueSize / this.sinkThreadPoolSize, this.badEventQueueSize / this.sinkThreadPoolSize, i, true);
            this.sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
            this.sinkThreadPool[i].start();
        }
        this.metricItemSet = new DataProxyMetricItemSet(ConfigManager.getInstance().getCommonProperties().getOrDefault("proxy.cluster.name", ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME), getName());
        MetricRegister.register(this.metricItemSet);
        this.canTake = true;
        logger.info("[{}] Pulsar sink started", getName());
    }

    public void stop() {
        logger.info("pulsar sink stopping");
        this.canTake = false;
        int i = 0;
        while (isAllSendFinished()) {
            int i2 = i;
            i++;
            if (i2 >= 10) {
                break;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                logger.info("Stop thread has been interrupt!");
            }
        }
        if (this.pulsarConfig.getStatIntervalSec() > 0) {
            try {
                this.monitorIndex.shutDown();
            } catch (Exception e2) {
                logger.warn("stat runner interrupted");
            }
        }
        if (this.pulsarClientService != null) {
            this.pulsarClientService.close();
        }
        if (this.sinkThreadPool != null) {
            for (SinkTask sinkTask : this.sinkThreadPool) {
                if (sinkTask != null) {
                    sinkTask.close();
                    sinkTask.interrupt();
                }
            }
            this.sinkThreadPool = null;
        }
        super.stop();
        if (!SCHEDULED_EXECUTOR_SERVICE.isShutdown()) {
            SCHEDULED_EXECUTOR_SERVICE.shutdown();
        }
        this.sinkCounter.stop();
        logger.debug("pulsar sink stopped. Metrics:{}", this.sinkCounter);
    }

    private boolean isAllSendFinished() {
        for (int i = 0; i < this.sinkThreadPoolSize; i++) {
            if (!this.sinkThreadPool[i].isAllSendFinished()) {
                return false;
            }
        }
        return true;
    }

    public Sink.Status process() throws EventDeliveryException {
        if (!this.canTake) {
            return Sink.Status.BACKOFF;
        }
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                if (take != null) {
                    if (this.diskRateLimiter != null) {
                        this.diskRateLimiter.acquire(take.getBody().length);
                    }
                    if (processEvent(new EventStat(take))) {
                        transaction.commit();
                    } else {
                        logger.info("[{}] Channel --> Queue(has no enough space,current code point) --> pulsar,Check if pulsar server or network is ok.(if this situation last long time it will cause memoryChannel full and fileChannel write.)", getName());
                        transaction.rollback();
                    }
                } else {
                    status = Sink.Status.BACKOFF;
                    transaction.commit();
                }
                transaction.close();
            } catch (Throwable th) {
                logger.error("Process event failed!" + getName(), th);
                try {
                    transaction.rollback();
                } catch (Throwable th2) {
                    logger.error("pulsar sink transaction rollback exception", th2);
                }
                transaction.close();
            }
            return status;
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack
    public void handleCreateClientSuccess(String str) {
        logger.info("createConnection success for url = {}", str);
        this.sinkCounter.incrementConnectionCreatedCount();
    }

    @Override // org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack
    public void handleCreateClientException(String str) {
        logger.info("createConnection has exception for url = {}", str);
        this.sinkCounter.incrementConnectionFailedCount();
    }

    @Override // org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack
    public void handleMessageSendSuccess(String str, Object obj, EventStat eventStat, long j) {
        TOTAL_PULSAR_SUCC_SEND_CNT.incrementAndGet();
        TOTAL_PULSAR_SUCC_SEND_SIZE.addAndGet(eventStat.getEvent().getBody().length);
        this.sinkCounter.incrementEventDrainSuccessCount();
        this.currentInFlightCount.decrementAndGet();
        this.currentSuccessSendCnt.incrementAndGet();
        long j2 = this.currentSuccessSendCnt.get();
        long j3 = this.lastSuccessSendCnt.get();
        long logEveryNEvents = this.pulsarConfig.getLogEveryNEvents();
        Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
        if (j2 % logEveryNEvents == 0 && j2 != this.lastSuccessSendCnt.get()) {
            this.lastSuccessSendCnt.set(j2);
            long currentTimeMillis = System.currentTimeMillis();
            logger.info("Pulsar sink {}, succ put {} events to pulsar in the past {} millsec", new Object[]{getName(), Long.valueOf(j2 - j3), Long.valueOf(currentTimeMillis - this.t1)});
            this.t1 = currentTimeMillis;
        }
        addStatistics(eventStat, true, j);
        if (eventStat.isSinkRspType()) {
            MessageUtils.sinkReturnRspPackage((SinkRspEvent) eventStat.getEvent(), DataProxyErrCode.SUCCESS, "");
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack
    public void handleMessageSendException(String str, EventStat eventStat, Object obj, DataProxyErrCode dataProxyErrCode, String str2) {
        this.currentInFlightCount.decrementAndGet();
        boolean z = true;
        if (obj instanceof PulsarClientException.NotFoundException) {
            logger.error("NotFoundException for topic " + str + ", message will be discard!", obj);
            z = false;
        } else if (obj instanceof TooLongFrameException) {
            logger.error("TooLongFrameException, send failed for " + getName(), obj);
        } else if (obj instanceof PulsarClientException.ProducerQueueIsFullError) {
            logger.error("ProducerQueueIsFullError, send failed for " + getName(), obj);
        } else if (!(obj instanceof PulsarClientException.AlreadyClosedException) && !(obj instanceof PulsarClientException.NotConnectedException) && !(obj instanceof PulsarClientException.TopicTerminatedException) && logPrinterB.shouldPrint()) {
            logger.error("send failed for " + getName(), obj);
        }
        addStatistics(eventStat, false, 0L);
        if (eventStat.isSinkRspType()) {
            MessageUtils.sinkReturnRspPackage((SinkRspEvent) eventStat.getEvent(), dataProxyErrCode, str2);
            return;
        }
        eventStat.incRetryCnt();
        if (z) {
            processResendEvent(eventStat);
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack
    public void handleRequestProcError(String str, EventStat eventStat, boolean z, DataProxyErrCode dataProxyErrCode, String str2) {
        if (logPrinterB.shouldPrint()) {
            logger.error(str2);
        }
        addStatistics(eventStat, false, 0L);
        if (MessageUtils.isSinkRspType(eventStat.getEvent())) {
            MessageUtils.sinkReturnRspPackage((SinkRspEvent) eventStat.getEvent(), dataProxyErrCode, str2);
            return;
        }
        eventStat.incRetryCnt();
        if (z) {
            processResendEvent(eventStat);
        }
    }

    private void addStatistics(EventStat eventStat, boolean z, long j) {
        if (eventStat == null || eventStat.getEvent() == null) {
            return;
        }
        Event event = eventStat.getEvent();
        this.metricItemSet.fillSinkSendMetricItemsByEvent(event, j, z, event.getBody().length);
        if (z) {
            AuditUtils.add(6, event);
        }
        if (this.pulsarConfig.getStatIntervalSec() <= 0) {
            return;
        }
        String str = (String) event.getHeaders().get("topic");
        String str2 = (String) event.getHeaders().get("streamId");
        String str3 = (String) event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
        int parseInt = Integer.parseInt((String) event.getHeaders().get("msgcnt"));
        StringBuilder append = new StringBuilder(512).append(getName()).append("#").append(str).append("#").append(str2).append("#").append(str3).append("#").append(NetworkUtils.getLocalIp()).append("#").append((String) MessageUtils.getEventProcType(event).getRight()).append("#").append(DateTimeUtils.ms2yyyyMMddHHmm(Long.parseLong((String) event.getHeaders().get("dt"))));
        if (z) {
            this.monitorIndex.addAndGet(append.toString(), parseInt, 1, event.getBody().length, 0);
            this.monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
        } else {
            this.monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
            this.monitorIndex.addAndGet(append.toString(), 0, 0, 0L, parseInt);
        }
    }

    private boolean processEvent(EventStat eventStat) {
        if (eventStat == null || eventStat.getEvent() == null) {
            return true;
        }
        boolean z = true;
        Event event = eventStat.getEvent();
        if (eventStat.isOrderMessage()) {
            z = this.sinkThreadPool[Math.abs(((String) event.getHeaders().get("partitionKey")).hashCode()) % this.sinkThreadPoolSize].processEvent(eventStat);
        } else {
            int i = 0;
            do {
                SinkTask sinkTask = this.sinkThreadPool[this.processIndex.getAndIncrement() % this.sinkThreadPoolSize];
                if (sinkTask != null) {
                    z = sinkTask.processEvent(eventStat);
                    if (z) {
                        break;
                    }
                }
                i++;
            } while (i < this.sinkThreadPoolSize);
        }
        return z;
    }

    private void processResendEvent(EventStat eventStat) {
        String str;
        if (eventStat != null) {
            try {
                if (eventStat.getEvent() != null) {
                    if (this.pulsarConfig.getClientIdCache() && (str = (String) eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID)) != null && AGENT_ID_CACHE.asMap().containsKey(str)) {
                        AGENT_ID_CACHE.invalidate(str);
                    }
                    boolean z = false;
                    int i = 0;
                    do {
                        SinkTask sinkTask = this.sinkThreadPool[this.processIndex.getAndIncrement() % this.sinkThreadPoolSize];
                        if (sinkTask != null) {
                            z = sinkTask.processReSendEvent(eventStat);
                            if (z) {
                                break;
                            }
                        }
                        i++;
                    } while (i < this.sinkThreadPoolSize);
                    if (!z) {
                        FailoverChannelProcessorHolder.getChannelProcessor().processEvent(eventStat.getEvent());
                        if (logPrinterC.shouldPrint()) {
                            logger.error(getName() + " Channel --> pulsar --> ResendQueue(full) -->FailOverChannelProcessor(current code point), Resend queue is full,Check if pulsar server or network is ok.");
                        }
                    }
                    return;
                }
            } catch (Throwable th) {
                if (this.pulsarConfig.getStatIntervalSec() > 0) {
                    this.monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
                }
                if (logPrinterC.shouldPrint()) {
                    logger.error(getName() + " Discard msg because put events to both of queue and fileChannel fail", th);
                    return;
                }
                return;
            }
        }
        logger.warn("processResendEvent eventStat is null!");
    }

    public LoadingCache<String, Long> getAgentIdCache() {
        return AGENT_ID_CACHE;
    }

    public Map<String, String> getTopicsProperties() {
        return this.topicProperties;
    }

    public SinkCounter getSinkCounter() {
        return this.sinkCounter;
    }

    public AtomicLong getCurrentInFlightCount() {
        return this.currentInFlightCount;
    }

    public MQClusterConfig getPulsarConfig() {
        return this.pulsarConfig;
    }

    public int getMaxRetrySendCnt() {
        return this.maxRetrySendCnt;
    }

    static {
        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(PULSAR_PERFORMANCE_TASK, 0L, PRINT_INTERVAL.longValue(), TimeUnit.SECONDS);
        logger.info("success to start pulsar performance task");
    }
}
