package org.apache.inlong.dataproxy.sink.mq.tube;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.class */
public class TubeHandler implements MessageQueueHandler {
    private static final Logger logger = LoggerFactory.getLogger(TubeHandler.class);
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
    private CacheClusterConfig config;
    private String clusterName;
    private MessageQueueZoneSinkContext sinkContext;
    private String masterHostAndPortList;
    private long linkMaxAllowedDelayedMsgCount;
    private long sessionWarnDelayedMsgCount;
    private long sessionMaxAllowedDelayedMsgCount;
    private long nettyWriteBufferHighWaterMark;
    private TubeMultiSessionFactory sessionFactory;
    private MessageProducer producer;
    private final Set<String> topicSet = new HashSet();
    private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void init(CacheClusterConfig cacheClusterConfig, MessageQueueZoneSinkContext messageQueueZoneSinkContext) {
        this.config = cacheClusterConfig;
        this.clusterName = cacheClusterConfig.getClusterName();
        this.sinkContext = messageQueueZoneSinkContext;
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void start() {
        try {
            TubeClientConfig initTubeConfig = initTubeConfig();
            logger.info("try to create producer:{}", initTubeConfig.toJsonString());
            this.sessionFactory = new TubeMultiSessionFactory(initTubeConfig);
            this.producer = this.sessionFactory.createProducer();
            logger.info("create new producer success:{}", this.producer);
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void publishTopic(Set<String> set) {
        if (this.producer == null || set == null || set.isEmpty()) {
            return;
        }
        try {
            Set publish = this.producer.publish(set);
            this.topicSet.addAll(set);
            logger.info("Publish topics to {}, need publish are {}, published are {}", new Object[]{this.clusterName, set, publish});
        } catch (Throwable th) {
            logger.warn("Publish topics to {} failure", this.clusterName, th);
        }
    }

    private TubeClientConfig initTubeConfig() throws Exception {
        Context context = new Context(this.sinkContext.getProducerContext().getParameters());
        context.putAll(this.config.getParams());
        this.masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
        this.linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, 80000L).longValue();
        this.sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT, 2000000L).longValue();
        this.sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT, 4000000L).longValue();
        this.nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK, 15728640L).longValue();
        TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList);
        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(this.linkMaxAllowedDelayedMsgCount);
        tubeClientConfig.setSessionWarnDelayedMsgCount(this.sessionWarnDelayedMsgCount);
        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(this.sessionMaxAllowedDelayedMsgCount);
        tubeClientConfig.setNettyWriteBufferHighWaterMark(this.nettyWriteBufferHighWaterMark);
        tubeClientConfig.setHeartbeatPeriodMs(15000L);
        tubeClientConfig.setRpcTimeoutMs(20000L);
        return tubeClientConfig;
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void stop() {
        if (this.producer != null) {
            try {
                this.producer.shutdown();
            } catch (Throwable th) {
                logger.error(th.getMessage(), th);
            }
        }
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (TubeClientException e) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info("tube handler stopped");
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public boolean send(PackProfile packProfile) {
        String topicName;
        try {
            IdTopicConfig sinkIdTopicConfig = ConfigManager.getInstance().getSinkIdTopicConfig(packProfile.getInlongGroupId(), packProfile.getInlongStreamId());
            if (sinkIdTopicConfig != null) {
                topicName = sinkIdTopicConfig.getTopicName();
            } else {
                if (!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
                    this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, packProfile.getUid());
                    this.sinkContext.addSendResultMetric(packProfile, this.clusterName, packProfile.getUid(), false, 0L);
                    this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(packProfile);
                    packProfile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                    return false;
                }
                topicName = CommonConfigHolder.getInstance().getRandDefTopics();
                if (StringUtils.isEmpty(topicName)) {
                    this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, packProfile.getUid());
                    this.sinkContext.addSendResultMetric(packProfile, this.clusterName, packProfile.getUid(), false, 0L);
                    this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(packProfile);
                    packProfile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                    return false;
                }
                this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED, packProfile.getUid());
            }
            if (this.producer == null) {
                this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_PRODUCER_NULL, topicName);
                this.sinkContext.processSendFail(packProfile, this.clusterName, topicName, 0L, DataProxyErrCode.PRODUCER_IS_NULL, "");
                return false;
            }
            if (!this.topicSet.contains(topicName)) {
                this.producer.publish(topicName);
                this.topicSet.add(topicName);
            }
            if (packProfile instanceof SimplePackProfile) {
                sendSimplePackProfile((SimplePackProfile) packProfile, sinkIdTopicConfig, topicName);
                return true;
            }
            sendBatchPackProfile((BatchPackProfile) packProfile, sinkIdTopicConfig, topicName);
            return true;
        } catch (Throwable th) {
            this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION, null);
            this.sinkContext.processSendFail(packProfile, this.clusterName, packProfile.getUid(), 0L, DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, th.getMessage());
            if (!logCounter.shouldPrint()) {
                return false;
            }
            logger.error("Send Message to Tube failure", th);
            return false;
        }
    }

    private void sendBatchPackProfile(final BatchPackProfile batchPackProfile, IdTopicConfig idTopicConfig, final String str) throws Exception {
        EventHandler eventHandler = this.handlerLocal.get();
        if (eventHandler == null) {
            eventHandler = this.sinkContext.createEventHandler();
            this.handlerLocal.set(eventHandler);
        }
        Map<String, String> parseHeader = eventHandler.parseHeader(idTopicConfig, batchPackProfile, this.sinkContext.getNodeId(), this.sinkContext.getCompressType());
        byte[] parseBody = eventHandler.parseBody(idTopicConfig, batchPackProfile, this.sinkContext.getCompressType());
        Message message = new Message(str, parseBody);
        message.putSystemHeader(batchPackProfile.getInlongStreamId(), DateTimeUtils.ms2yyyyMMddHHmm(Long.parseLong(parseHeader.get("packTime"))));
        message.getClass();
        parseHeader.forEach(message::setAttrKeyVal);
        this.sinkContext.addSendMetric(batchPackProfile, this.clusterName, str, parseBody.length);
        final long currentTimeMillis = System.currentTimeMillis();
        this.producer.sendMessage(message, new MessageSentCallback() { // from class: org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler.1
            public void onMessageSent(MessageSentResult messageSentResult) {
                if (messageSentResult.isSuccess()) {
                    TubeHandler.this.sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                    TubeHandler.this.sinkContext.addSendResultMetric(batchPackProfile, TubeHandler.this.clusterName, str, true, currentTimeMillis);
                    TubeHandler.this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchPackProfile);
                    batchPackProfile.ack();
                    return;
                }
                TubeHandler.this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE, str + "." + messageSentResult.getErrCode());
                TubeHandler.this.sinkContext.processSendFail(batchPackProfile, TubeHandler.this.clusterName, str, currentTimeMillis, DataProxyErrCode.MQ_RETURN_ERROR, messageSentResult.getErrMsg());
                if (TubeHandler.logCounter.shouldPrint()) {
                    TubeHandler.logger.error("Send ProfileV1 to tube failure {}", messageSentResult.getErrMsg());
                }
            }

            public void onException(Throwable th) {
                TubeHandler.this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT, str);
                TubeHandler.this.sinkContext.processSendFail(batchPackProfile, TubeHandler.this.clusterName, str, currentTimeMillis, DataProxyErrCode.MQ_RETURN_ERROR, th.getMessage());
                if (TubeHandler.logCounter.shouldPrint()) {
                    TubeHandler.logger.error("Send ProfileV1 to tube exception", th);
                }
            }
        });
    }

    private void sendSimplePackProfile(final SimplePackProfile simplePackProfile, IdTopicConfig idTopicConfig, final String str) throws Exception {
        this.sinkContext.addSendMetric(simplePackProfile, this.clusterName, str, simplePackProfile.getEvent().getBody().length);
        Message message = new Message(str, simplePackProfile.getEvent().getBody());
        message.putSystemHeader(simplePackProfile.getInlongStreamId(), DateTimeUtils.ms2yyyyMMddHHmm(Long.parseLong(simplePackProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY))));
        final long currentTimeMillis = System.currentTimeMillis();
        Map<String, String> propsToMQ = simplePackProfile.getPropsToMQ(currentTimeMillis);
        message.getClass();
        propsToMQ.forEach(message::setAttrKeyVal);
        this.producer.sendMessage(message, new MessageSentCallback() { // from class: org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler.2
            public void onMessageSent(MessageSentResult messageSentResult) {
                if (messageSentResult.isSuccess()) {
                    TubeHandler.this.sinkContext.fileMetricAddSuccStats(simplePackProfile, str, messageSentResult.getPartition().getHost());
                    TubeHandler.this.sinkContext.addSendResultMetric(simplePackProfile, TubeHandler.this.clusterName, str, true, currentTimeMillis);
                    TubeHandler.this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simplePackProfile);
                    simplePackProfile.ack();
                    return;
                }
                TubeHandler.this.sinkContext.fileMetricAddFailStats(simplePackProfile, str, messageSentResult.getPartition().getHost(), str + "." + messageSentResult.getErrCode());
                TubeHandler.this.sinkContext.processSendFail(simplePackProfile, TubeHandler.this.clusterName, str, currentTimeMillis, DataProxyErrCode.MQ_RETURN_ERROR, messageSentResult.getErrMsg());
                if (TubeHandler.logCounter.shouldPrint()) {
                    TubeHandler.logger.error("Send SimpleProfileV0 to tube failure: {}", messageSentResult.getErrMsg());
                }
            }

            public void onException(Throwable th) {
                TubeHandler.this.sinkContext.fileMetricAddExceptStats(simplePackProfile, str, "", str);
                TubeHandler.this.sinkContext.processSendFail(simplePackProfile, TubeHandler.this.clusterName, str, currentTimeMillis, DataProxyErrCode.MQ_RETURN_ERROR, th.getMessage());
                if (TubeHandler.logCounter.shouldPrint()) {
                    TubeHandler.logger.error("Send Message to {} tube exception", str, th);
                }
            }
        });
    }
}
