package org.apache.inlong.dataproxy.http;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/http/SimpleMessageHandler.class */
/**
 * {@link MessageHandler} that turns one HTTP-submitted message into a Flume {@link Event}
 * and hands it to the configured {@link ChannelProcessor}.
 *
 * <p>For every message the handler validates the mandatory fields, wraps the body into an
 * {@link InLongMsg}, attaches routing headers (group id, stream id, topic, data time, remote IP,
 * message count, receive time) and then records monitor, metric and audit statistics for the
 * delivery outcome.
 *
 * <p>NOTE(review): {@code logCounter} and {@code channelTrace} are plain {@code long} fields
 * updated without synchronization. If one handler instance is shared between request threads
 * the counters are only approximate - confirm whether that is acceptable.
 */
public class SimpleMessageHandler implements MessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class);
    private static final ConfigManager configManager = ConfigManager.getInstance();
    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
    /** Topic name used when no topic is configured for the group/stream. */
    private static final String DEFAULT_TOPIC = "test";
    /** Leading message attribute used when no "mx" property is configured for the group. */
    private static final String DEFAULT_MX_VALUE = "m=0";
    /** A channel latency sample is logged once every this many successfully processed events. */
    private static final long CHANNEL_TRACE_LOG_INTERVAL = 600000L;
    /** A channel failure is logged on the first occurrence and then once every this many failures. */
    private static final long ERROR_LOG_INTERVAL = 1000L;
    /** Counters are reset to zero once they exceed this value, shortly before they would overflow. */
    private static final long COUNTER_RESET_THRESHOLD = Long.MAX_VALUE - 10;

    private final MonitorIndex monitorIndex;
    private final MonitorIndexExt monitorIndexExt;
    private final DataProxyMetricItemSet metricItemSet;
    private final ChannelProcessor processor;
    private int maxMsgLength;
    /** Number of channel write failures seen so far; drives sampling of the error log. */
    private long logCounter = 0;
    /** Number of successful channel writes seen so far; drives sampling of the latency log. */
    private long channelTrace = 0;

    /**
     * Creates a handler bound to the given channel processor and statistics sinks.
     *
     * @param channelProcessor       processor that receives every built event
     * @param monitorIndex           monitor sink; when {@code null} no monitor data is recorded
     * @param monitorIndexExt        extended monitor sink; skipped when {@code null}
     * @param dataProxyMetricItemSet metric item set updated for every processed event
     * @param serviceDecoder         accepted for interface compatibility; not used by this class
     */
    public SimpleMessageHandler(ChannelProcessor channelProcessor, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt, DataProxyMetricItemSet dataProxyMetricItemSet, ServiceDecoder serviceDecoder) {
        this.processor = channelProcessor;
        this.monitorIndex = monitorIndex;
        this.monitorIndexExt = monitorIndexExt;
        this.metricItemSet = dataProxyMetricItemSet;
        init();
    }

    /** No initialization is required for this handler. */
    @Override
    public void init() {
    }

    /** No resources are held by this handler, so there is nothing to release. */
    @Override
    public void destroy() {
    }

    /**
     * Validates the message carried by {@code context}, converts it into a Flume event and
     * pushes it to the channel processor.
     *
     * @param context request context holding the "groupId", "streamId", "dt" and "body" values,
     *                the optional charset and the originating {@link HttpServletRequest}
     * @throws MessageProcessException if group id or stream id is blank, the body is empty,
     *                                 or the requested charset is not supported
     * @throws ChannelException        rethrown unchanged when the channel rejects the event
     */
    @Override
    public void processMessage(Context context) throws MessageProcessException {
        String rawGroupId = (String) context.get("groupId");
        String rawStreamId = (String) context.get("streamId");
        if (StringUtils.isBlank(rawGroupId) || StringUtils.isBlank(rawStreamId)) {
            throw new MessageProcessException("Field groupId or streamId must exist and not blank!");
        }
        String groupId = rawGroupId.trim();
        String streamId = rawStreamId.trim();
        String configuredTopic = getTopic(groupId, streamId);
        String topic = StringUtils.isNotBlank(configuredTopic) ? configuredTopic.trim() : DEFAULT_TOPIC;

        // The data time falls back to the receive time when "dt" is missing or not a number.
        long receiveTime = System.currentTimeMillis();
        long dataTime = NumberUtils.toLong((String) context.get("dt"), receiveTime);
        String dataTimeText = String.valueOf(dataTime);

        // NOTE(review): the same constant is used as the context lookup key and as the fallback
        // charset name. Confirm AttrConstants.CHARSET really holds a valid charset name;
        // otherwise a missing charset always ends in UnsupportedEncodingException below.
        String charset = (String) context.get(AttrConstants.CHARSET);
        if (StringUtils.isBlank(charset)) {
            charset = AttrConstants.CHARSET;
        }

        String body = (String) context.get("body");
        if (StringUtils.isEmpty(body)) {
            throw new MessageProcessException("Field body must exist and not empty!");
        }

        String configuredMx = configManager.getMxProperties().get(groupId);
        String mxValue = StringUtils.isNotEmpty(configuredMx) ? configuredMx.trim() : DEFAULT_MX_VALUE;

        HttpServletRequest request = (HttpServletRequest) context.get(AttrConstants.HTTP_REQUEST);
        String remoteIp = request.getRemoteAddr();
        int msgCount = NumberUtils.toInt(request.getParameter("cnt"), 1);
        String msgCountText = String.valueOf(msgCount);

        // Attribute string stored next to the body inside the InLongMsg.
        StringBuilder strBuff = new StringBuilder(512);
        strBuff.append(mxValue)
                .append("&groupId=").append(groupId)
                .append("&streamId=").append(streamId)
                .append("&dt=").append(dataTimeText)
                .append("&NodeIP=").append(remoteIp)
                .append("&cnt=").append(msgCountText)
                .append("&rt=").append(receiveTime);
        InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
        try {
            inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
        } catch (UnsupportedEncodingException e) {
            throw new MessageProcessException(e);
        }
        strBuff.setLength(0);

        HashMap<String, String> headers = new HashMap<>();
        headers.put("groupId", groupId);
        headers.put("streamId", streamId);
        headers.put("topic", topic);
        headers.put("dt", dataTimeText);
        headers.put(ConfigConstants.REMOTE_IP_KEY, remoteIp);
        headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
        headers.put("msgcnt", msgCountText);
        headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
        headers.put("rt", String.valueOf(receiveTime));
        byte[] payload = inLongMsg.buildArray();
        Event event = EventBuilder.withBody(payload, headers);
        inLongMsg.reset();

        // Monitor key; the data time is rounded down to its 10-minute bucket.
        long dataTimeBucket = (((dataTime / 1000) / 60) / 10) * 1000 * 60 * 10;
        strBuff.append("http").append(AttrConstants.SEP_HASHTAG)
                .append(topic).append(AttrConstants.SEP_HASHTAG)
                .append(streamId).append(AttrConstants.SEP_HASHTAG)
                .append(remoteIp).append(AttrConstants.SEP_HASHTAG)
                .append(NetworkUtils.getLocalIp()).append(AttrConstants.SEP_HASHTAG)
                .append((String) MessageUtils.getEventProcType("", "").getRight()).append(AttrConstants.SEP_HASHTAG)
                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeBucket)).append(AttrConstants.SEP_HASHTAG)
                .append(DateTimeUtils.ms2yyyyMMddHHmm(receiveTime));
        String monitorKey = strBuff.toString();

        long channelStart = System.currentTimeMillis();
        try {
            this.processor.processEvent(event);
            reportMonitor(monitorKey, msgCount, 1, payload.length, 0, "EVENT_SUCCESS");
            addStatistics(true, payload.length, event);
            traceChannelSuccess(channelStart);
        } catch (ChannelException e) {
            reportMonitor(monitorKey, 0, 0, 0L, msgCount, "EVENT_DROPPED");
            addStatistics(false, payload.length, event);
            traceChannelFailure(e, channelStart);
            throw e;
        }
    }

    /** No Flume configuration is consumed by this handler. */
    public void configure(org.apache.flume.Context context) {
    }

    /**
     * Records one delivery outcome in the monitor sinks; does nothing when no monitor index is set.
     * The extended index is checked on its own so that a missing extended sink can neither fail
     * an already delivered event nor mask a channel failure.
     */
    private void reportMonitor(String key, int msgCount, int packCount, long packSize, int failCount, String extKey) {
        if (this.monitorIndex == null) {
            return;
        }
        this.monitorIndex.addAndGet(key, msgCount, packCount, packSize, failCount);
        if (this.monitorIndexExt != null) {
            this.monitorIndexExt.incrementAndGet(extKey);
        }
    }

    /** Counts a successful channel write and logs a latency sample every {@link #CHANNEL_TRACE_LOG_INTERVAL} writes. */
    private void traceChannelSuccess(long startMillis) {
        this.channelTrace++;
        if (this.channelTrace % CHANNEL_TRACE_LOG_INTERVAL == 0) {
            LOG.info("processor.processEvent spend time={} ms", System.currentTimeMillis() - startMillis);
        }
        if (this.channelTrace > COUNTER_RESET_THRESHOLD) {
            this.channelTrace = 0L;
            LOG.info("channelTrace will reverse");
        }
    }

    /** Counts a failed channel write and logs the first failure and then every {@link #ERROR_LOG_INTERVAL}-th one. */
    private void traceChannelFailure(ChannelException cause, long startMillis) {
        this.logCounter++;
        if (this.logCounter == 1 || this.logCounter % ERROR_LOG_INTERVAL == 0) {
            // The exception is passed a second time as the trailing argument so that SLF4J
            // also prints its stack trace; the first occurrence only fills the ex={} placeholder.
            LOG.error("Error writing to channel, and will retry after 1s, ex={},logCounter={}, spend time={} ms",
                    cause, this.logCounter, System.currentTimeMillis() - startMillis, cause);
        }
        // Checked outside the sampling branch: no value above the threshold is a multiple of the
        // sampling interval, so the reset could never fire while it was nested in that branch.
        if (this.logCounter > COUNTER_RESET_THRESHOLD) {
            this.logCounter = 0L;
            LOG.info("logCounter will reverse");
        }
    }

    /**
     * Looks up the topic configured for a group/stream pair.
     *
     * <p>The "groupId/streamId" entry is tried first; when it is absent or empty the entry
     * for the bare group id is used.
     *
     * @param groupId  group id, may be empty
     * @param streamId stream id, may be empty
     * @return the configured topic, or {@code null} when the group id is empty or nothing is configured
     */
    private String getTopic(String groupId, String streamId) {
        String topic = null;
        if (StringUtils.isNotEmpty(groupId)) {
            if (StringUtils.isNotEmpty(streamId)) {
                topic = configManager.getTopicProperties().get(groupId + "/" + streamId);
            }
            if (StringUtils.isEmpty(topic)) {
                topic = configManager.getTopicProperties().get(groupId);
            }
        }
        // Guarded so the key is not concatenated when debug logging is disabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId, topic);
        }
        return topic;
    }

    /**
     * Updates the source metrics for an event and, on success, adds it to the audit data.
     *
     * @param isSuccess whether the event was accepted by the channel
     * @param size      size in bytes of the event body
     * @param event     the processed event; ignored when {@code null}
     */
    private void addStatistics(boolean isSuccess, long size, Event event) {
        if (event == null) {
            return;
        }
        this.metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size);
        if (isSuccess) {
            // NOTE(review): 5 is presumably the DataProxy "read success" audit id - confirm against AuditUtils.
            AuditUtils.add(5, event);
        }
    }
}
