package org.apache.inlong.dataproxy.sink.common;

import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
import org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/common/SinkContext.class */
public class SinkContext {
    public static final String KEY_MAX_THREADS = "maxThreads";
    public static final String KEY_PROCESSINTERVAL = "processInterval";
    public static final String KEY_RELOADINTERVAL = "reloadInterval";
    public static final String KEY_MESSAGE_QUEUE_HANDLER = "messageQueueHandler";
    protected static final Logger logger = LoggerFactory.getLogger(SinkContext.class);
    protected final String sinkName;
    protected final Context sinkContext;
    protected final Channel channel;
    protected final int maxThreads;
    protected final long processInterval;
    protected final long reloadInterval;
    protected final DataProxyMetricItemSet metricItemSet;
    protected MonitorIndex monitorIndex = null;
    private MonitorStats monitorStats = null;
    protected final String clusterId = CommonConfigHolder.getInstance().getClusterName();
    private final boolean enableFileMetric = CommonConfigHolder.getInstance().isEnableFileMetric();

    public SinkContext(String str, Context context, Channel channel) {
        this.sinkName = str;
        this.sinkContext = context;
        this.channel = channel;
        this.maxThreads = this.sinkContext.getInteger(KEY_MAX_THREADS, 10).intValue();
        this.processInterval = this.sinkContext.getInteger(KEY_PROCESSINTERVAL, 100).intValue();
        this.reloadInterval = this.sinkContext.getLong("reloadInterval", 60000L).longValue();
        this.metricItemSet = new DataProxyMetricItemSet(str);
        MetricRegister.register(this.metricItemSet);
    }

    public void start() {
        if (this.enableFileMetric) {
            this.monitorIndex = new MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(), CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000, CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
            this.monitorStats = new MonitorStats(CommonConfigHolder.getInstance().getFileMetricEventOutName() + AttrConstants.SEP_HASHTAG + getSinkName(), CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000, CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
            this.monitorIndex.start();
            this.monitorStats.start();
        }
    }

    public void close() {
        if (this.enableFileMetric) {
            if (this.monitorIndex != null) {
                this.monitorIndex.stop();
            }
            if (this.monitorStats != null) {
                this.monitorStats.stop();
            }
        }
    }

    public void fileMetricIncSumStats(String str) {
        if (this.enableFileMetric) {
            this.monitorStats.incSumStats(str);
        }
    }

    public void fileMetricIncWithDetailStats(String str, String str2) {
        if (this.enableFileMetric) {
            this.monitorStats.incSumStats(str);
            this.monitorStats.incDetailStats(str + AttrConstants.SEP_HASHTAG + str2);
        }
    }

    public void fileMetricAddSuccStats(PackProfile packProfile, String str, String str2) {
        if (this.enableFileMetric && (packProfile instanceof SimplePackProfile)) {
            fileMetricIncStats((SimplePackProfile) packProfile, true, str, str2, StatConstants.EVENT_SINK_SUCCESS, "");
        }
    }

    public void fileMetricAddFailStats(PackProfile packProfile, String str, String str2, String str3) {
        if (this.enableFileMetric && (packProfile instanceof SimplePackProfile)) {
            fileMetricIncStats((SimplePackProfile) packProfile, false, str, str2, StatConstants.EVENT_SINK_FAILURE, str3);
        }
    }

    public void fileMetricAddExceptStats(PackProfile packProfile, String str, String str2, String str3) {
        if (this.enableFileMetric && (packProfile instanceof SimplePackProfile)) {
            fileMetricIncStats((SimplePackProfile) packProfile, false, str, str2, StatConstants.EVENT_SINK_RECEIVEEXCEPT, str3);
        }
    }

    private void fileMetricIncStats(SimplePackProfile simplePackProfile, boolean z, String str, String str2, String str3, String str4) {
        StringBuilder append = new StringBuilder(512).append(this.sinkName).append(AttrConstants.SEP_HASHTAG).append(simplePackProfile.getInlongGroupId()).append(AttrConstants.SEP_HASHTAG).append(simplePackProfile.getInlongStreamId()).append(AttrConstants.SEP_HASHTAG).append(str).append(AttrConstants.SEP_HASHTAG).append(simplePackProfile.getProperties().get("dpIp")).append(AttrConstants.SEP_HASHTAG).append(str2).append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmmTenMins(Long.parseLong(simplePackProfile.getProperties().get(HttpAttrConst.KEY_DATA_TIME)))).append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(Long.parseLong(simplePackProfile.getProperties().get("msg.pkg.time"))));
        if (z) {
            this.monitorIndex.addSuccStats(append.toString(), NumberUtils.toInt(simplePackProfile.getProperties().get("msgcnt"), 1), 1, simplePackProfile.getSize());
            this.monitorStats.incSumStats(str3);
        } else {
            this.monitorIndex.addFailStats(append.toString(), 1);
            this.monitorStats.incSumStats(str3);
            this.monitorStats.incDetailStats(str3 + AttrConstants.SEP_HASHTAG + str4);
        }
    }

    public String getClusterId() {
        return this.clusterId;
    }

    public String getSinkName() {
        return this.sinkName;
    }

    public Context getSinkContext() {
        return this.sinkContext;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public int getMaxThreads() {
        return this.maxThreads;
    }

    public long getProcessInterval() {
        return this.processInterval;
    }

    public long getReloadInterval() {
        return this.reloadInterval;
    }

    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    public EventHandler createEventHandler() {
        String eventHandler = CommonConfigHolder.getInstance().getEventHandler();
        try {
            Object newInstance = ClassUtils.getClass(eventHandler).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof EventHandler) {
                return (EventHandler) newInstance;
            }
            return null;
        } catch (Throwable th) {
            logger.error("{} fail to init EventHandler,handlerClass:{},error:{}", new Object[]{this.sinkName, eventHandler, th.getMessage(), th});
            return null;
        }
    }

    public MessageQueueHandler createMessageQueueHandler(CacheClusterConfig cacheClusterConfig) {
        String orDefault = cacheClusterConfig.getParams().getOrDefault(KEY_MESSAGE_QUEUE_HANDLER, PulsarHandler.class.getName());
        logger.info("{}'s mq handler class = {}", this.sinkName, orDefault);
        try {
            Object newInstance = ClassUtils.getClass(orDefault).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof MessageQueueHandler) {
                return (MessageQueueHandler) newInstance;
            }
            return null;
        } catch (Throwable th) {
            logger.error("{} fail to init MessageQueueHandler,handlerClass:{},error:{}", new Object[]{this.sinkName, orDefault, th.getMessage(), th});
            return null;
        }
    }
}
