package org.apache.inlong.dataproxy.sink.tubezone;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/tubezone/TubeClusterProducer.class */
public class TubeClusterProducer implements LifecycleAware {
    public static final Logger LOG = LoggerFactory.getLogger(TubeClusterProducer.class);
    private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
    protected final String workerName;
    private final CacheClusterConfig config;
    private final TubeZoneSinkContext sinkContext;
    private final Context producerContext;
    private final String cacheClusterName;
    private String masterHostAndPortList;
    private long linkMaxAllowedDelayedMsgCount;
    private long sessionWarnDelayedMsgCount;
    private long sessionMaxAllowedDelayedMsgCount;
    private long nettyWriteBufferHighWaterMark;
    private TubeMultiSessionFactory sessionFactory;
    private MessageProducer producer;
    private Set<String> topicSet = new HashSet();
    private LifecycleState state = LifecycleState.IDLE;

    public TubeClusterProducer(String str, CacheClusterConfig cacheClusterConfig, TubeZoneSinkContext tubeZoneSinkContext) {
        this.workerName = str;
        this.config = cacheClusterConfig;
        this.sinkContext = tubeZoneSinkContext;
        this.producerContext = tubeZoneSinkContext.getProducerContext();
        this.cacheClusterName = cacheClusterConfig.getClusterName();
    }

    public void start() {
        this.state = LifecycleState.START;
        try {
            TubeClientConfig initTubeConfig = initTubeConfig();
            LOG.info("try to create producer:{}", initTubeConfig.toJsonString());
            this.sessionFactory = new TubeMultiSessionFactory(initTubeConfig);
            this.producer = this.sessionFactory.createProducer();
            LOG.info("create new producer success:{}", this.producer);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    private TubeClientConfig initTubeConfig() throws Exception {
        Context context = new Context(this.producerContext.getParameters());
        context.putAll(this.config.getParams());
        this.masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
        this.linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, 80000L).longValue();
        this.sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT, 2000000L).longValue();
        this.sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT, 4000000L).longValue();
        this.nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK, 15728640L).longValue();
        TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList);
        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(this.linkMaxAllowedDelayedMsgCount);
        tubeClientConfig.setSessionWarnDelayedMsgCount(this.sessionWarnDelayedMsgCount);
        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(this.sessionMaxAllowedDelayedMsgCount);
        tubeClientConfig.setNettyWriteBufferHighWaterMark(this.nettyWriteBufferHighWaterMark);
        tubeClientConfig.setHeartbeatPeriodMs(15000L);
        tubeClientConfig.setRpcTimeoutMs(20000L);
        return tubeClientConfig;
    }

    public void stop() {
        this.state = LifecycleState.STOP;
        if (this.producer != null) {
            try {
                this.producer.shutdown();
            } catch (Throwable th) {
                LOG.error(th.getMessage(), th);
            }
        }
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (TubeClientException e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }

    public LifecycleState getLifecycleState() {
        return this.state;
    }

    public boolean send(final DispatchProfile dispatchProfile) {
        try {
            final String topic = this.sinkContext.getIdTopicHolder().getTopic(dispatchProfile.getUid());
            if (topic == null) {
                this.sinkContext.addSendResultMetric(dispatchProfile, dispatchProfile.getUid(), false, 0L);
                return false;
            }
            if (!this.topicSet.contains(topic)) {
                this.producer.publish(topic);
                this.topicSet.add(topic);
            }
            if (this.producer == null) {
                this.sinkContext.processSendFail(dispatchProfile, topic, 0L);
                return false;
            }
            Map<String, String> encodeCacheMessageHeaders = encodeCacheMessageHeaders(dispatchProfile);
            Message message = new Message(topic, EventUtils.encodeCacheMessageBody(this.sinkContext.getCompressType(), dispatchProfile.getEvents()));
            encodeCacheMessageHeaders.forEach((str, str2) -> {
                message.setAttrKeyVal(str, str2);
            });
            final long currentTimeMillis = System.currentTimeMillis();
            this.producer.sendMessage(message, new MessageSentCallback() { // from class: org.apache.inlong.dataproxy.sink.tubezone.TubeClusterProducer.1
                public void onMessageSent(MessageSentResult messageSentResult) {
                    TubeClusterProducer.this.sinkContext.addSendResultMetric(dispatchProfile, topic, true, currentTimeMillis);
                    dispatchProfile.ack();
                }

                public void onException(Throwable th) {
                    TubeClusterProducer.LOG.error("Send fail:{}", th.getMessage());
                    TubeClusterProducer.LOG.error(th.getMessage(), th);
                    TubeClusterProducer.this.sinkContext.processSendFail(dispatchProfile, topic, currentTimeMillis);
                }
            });
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            this.sinkContext.processSendFail(dispatchProfile, dispatchProfile.getUid(), 0L);
            return false;
        }
    }

    public Map<String, String> encodeCacheMessageHeaders(DispatchProfile dispatchProfile) {
        HashMap hashMap = new HashMap();
        hashMap.put(ConfigConstants.VERSION_TYPE, "1");
        hashMap.put("inlongGroupId", dispatchProfile.getInlongGroupId());
        hashMap.put("inlongStreamId", dispatchProfile.getInlongStreamId());
        hashMap.put("proxyName", this.sinkContext.getNodeId());
        hashMap.put("packTime", String.valueOf(System.currentTimeMillis()));
        hashMap.put("msgCount", String.valueOf(dispatchProfile.getEvents().size()));
        hashMap.put("srcLength", String.valueOf(dispatchProfile.getSize()));
        hashMap.put("compressType", String.valueOf(this.sinkContext.getCompressType().getNumber()));
        return hashMap;
    }

    public String getCacheClusterName() {
        return this.cacheClusterName;
    }
}
