package org.apache.inlong.dataproxy.http;

import com.google.common.base.Splitter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.inlong.commons.monitor.CounterGroup;
import org.apache.inlong.commons.monitor.CounterGroupExt;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.commons.util.NetworkUtils;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/http/SimpleMessageHandler.class */
public class SimpleMessageHandler implements MessageHandler {
    private final CounterGroup counterGroup;
    private final CounterGroupExt counterGroupExt;
    private static final String SEPARATOR = "#";
    private int maxMsgLength;
    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
    private final ChannelProcessor processor;
    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class);
    private static final ConfigManager configManager = ConfigManager.getInstance();
    private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() { // from class: org.apache.inlong.dataproxy.http.SimpleMessageHandler.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyyMMddHHmm");
        }
    };
    private final boolean isNewMetricOn = true;
    private final MonitorIndex monitorIndex = new MonitorIndex("Source", 60, 300000);
    private final MonitorIndexExt monitorIndexExt = new MonitorIndexExt("DataProxy_monitors#http", 60, 100000);
    private long logCounter = 0;
    private long channelTrace = 0;

    public SimpleMessageHandler(ChannelProcessor channelProcessor, CounterGroup counterGroup, CounterGroupExt counterGroupExt, ServiceDecoder serviceDecoder) {
        this.processor = channelProcessor;
        this.counterGroup = counterGroup;
        this.counterGroupExt = counterGroupExt;
        init();
    }

    @Override // org.apache.inlong.dataproxy.http.MessageHandler
    public void init() {
    }

    @Override // org.apache.inlong.dataproxy.http.MessageHandler
    public void destroy() {
    }

    @Override // org.apache.inlong.dataproxy.http.MessageHandler
    public void processMessage(Context context) throws MessageProcessException {
        String str = "test";
        StringBuffer stringBuffer = new StringBuffer("m=0");
        String str2 = (String) context.get(AttributeConstants.GROUP_ID);
        String str3 = (String) context.get(AttributeConstants.INTERFACE_ID);
        String str4 = (String) context.get(AttributeConstants.DATA_TIME);
        String topic = getTopic(str2, str3);
        if (null != topic && !"".equals(topic)) {
            str = topic.trim();
        }
        String str5 = configManager.getMxProperties().get(str2);
        if (null != str5) {
            stringBuffer = new StringBuffer(str5.trim());
        }
        stringBuffer.append("&groupId=").append(str2).append("&streamId=").append(str3).append("&dt=").append(str4);
        HttpServletRequest httpServletRequest = (HttpServletRequest) context.get(HttpSourceConstants.HTTP_REQUEST);
        String remoteAddr = httpServletRequest.getRemoteAddr();
        stringBuffer.append("&NodeIP=").append(remoteAddr);
        String parameter = httpServletRequest.getParameter(AttributeConstants.MESSAGE_COUNT);
        if (parameter == null || "".equals(parameter)) {
            parameter = "1";
        }
        InLongMsg newInLongMsg = InLongMsg.newInLongMsg(true);
        String str6 = (String) context.get(HttpSourceConstants.CHARSET);
        if (str6 == null || "".equals(str6)) {
            str6 = HttpSourceConstants.CHARSET;
        }
        try {
            newInLongMsg.addMsg(stringBuffer.toString(), ((String) context.get("body")).getBytes(str6));
            HashMap hashMap = new HashMap();
            hashMap.put(AttributeConstants.DATA_TIME, str4);
            hashMap.put("topic", str);
            hashMap.put(AttributeConstants.INTERFACE_ID, str3);
            hashMap.put(ConfigConstants.REMOTE_IP_KEY, remoteAddr);
            hashMap.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
            hashMap.put("msgcnt", parameter);
            byte[] buildArray = newInLongMsg.buildArray();
            hashMap.put(ConfigConstants.TOTAL_LEN, String.valueOf(buildArray.length));
            String format = dateFormator.get().format(Long.valueOf(newInLongMsg.getCreatetime()));
            hashMap.put("msg.pkg.time", format);
            Event withBody = EventBuilder.withBody(buildArray, hashMap);
            this.counterGroupExt.addAndGet(str + SEPARATOR + 0 + SEPARATOR + remoteAddr + "#time#" + format, Long.valueOf(parameter));
            try {
                long parseLong = (((Long.parseLong(str4) / 1000) / 60) / 10) * 1000 * 60 * 10;
                StringBuilder sb = new StringBuilder();
                sb.append("http").append(SEPARATOR).append(str).append(SEPARATOR).append(str3).append(SEPARATOR).append(remoteAddr).append(SEPARATOR).append(NetworkUtils.getLocalIp()).append(SEPARATOR).append(new SimpleDateFormat("yyyyMMddHHmm").format(Long.valueOf(parseLong))).append(SEPARATOR).append(format);
                this.monitorIndex.addAndGet(new String(sb), Integer.parseInt(parameter), 1, buildArray.length, 0);
                newInLongMsg.reset();
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    this.processor.processEvent(withBody);
                    this.counterGroup.incrementAndGet(StatConstants.EVENT_SUCCESS);
                    this.monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                    this.channelTrace++;
                    if (this.channelTrace % 600000 == 0) {
                        LOG.info("processor.processEvent spend time={} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    }
                    if (this.channelTrace > 9223372036854775797L) {
                        this.channelTrace = 0L;
                        LOG.info("channelTrace will reverse.");
                    }
                } catch (ChannelException e) {
                    this.counterGroup.incrementAndGet(StatConstants.EVENT_DROPPED);
                    this.monitorIndexExt.incrementAndGet("EVENT_DROPPED");
                    this.monitorIndex.addAndGet(new String(sb), 0, 0, 0L, Integer.parseInt(parameter));
                    this.logCounter++;
                    if (this.logCounter == 1 || this.logCounter % 1000 == 0) {
                        LOG.error("Error writting to channel,and will retry after 1s,ex={},logCounter = {}, spend time={} ms", new Object[]{e.toString(), Long.valueOf(this.logCounter), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
                        if (this.logCounter > 9223372036854775797L) {
                            this.logCounter = 0L;
                            LOG.info("logCounter will reverse.");
                        }
                    }
                    throw e;
                }
            } catch (NumberFormatException e2) {
                throw new MessageProcessException(new Throwable("attribute dt=" + str4 + " has error, detail is: " + ((Object) stringBuffer)));
            }
        } catch (UnsupportedEncodingException e3) {
            throw new MessageProcessException(e3);
        }
    }

    public void configure(org.apache.flume.Context context) {
    }

    private Map<String, String> getAttributeMap(String str) {
        if (null == str || "".equals(str)) {
            return null;
        }
        try {
            return Splitter.on(AttributeConstants.SEPARATOR).trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR).split(str);
        } catch (Exception e) {
            LOG.error("fail to fillTopicKeyandAttr,attr:{}", str);
            LOG.error("error!", e);
            return null;
        }
    }

    private Map<String, String> loadProperties(String str) {
        InputStream openStream;
        HashMap hashMap = new HashMap();
        if (null == str) {
            LOG.error("fail to loadTopics, null == fileName.");
            return hashMap;
        }
        InputStream inputStream = null;
        try {
            try {
                URL resource = getClass().getClassLoader().getResource(str);
                openStream = resource != null ? resource.openStream() : null;
            } catch (UnsupportedEncodingException e) {
                LOG.error("fail to loadPropery, file ={}, and e= {}", str, e);
                if (0 != 0) {
                    try {
                        inputStream.close();
                    } catch (IOException e2) {
                        LOG.error("fail to loadTopics, inStream.close ,and e= {}", str, e2);
                    }
                }
            } catch (Exception e3) {
                LOG.error("fail to loadProperty, file ={}, and e= {}", str, e3);
                if (0 != 0) {
                    try {
                        inputStream.close();
                    } catch (IOException e4) {
                        LOG.error("fail to loadTopics, inStream.close ,and e= {}", str, e4);
                    }
                }
            }
            if (openStream == null) {
                LOG.error("InputStream {} is null!", str);
                if (null != openStream) {
                    try {
                        openStream.close();
                    } catch (IOException e5) {
                        LOG.error("fail to loadTopics, inStream.close ,and e= {}", str, e5);
                    }
                }
                return hashMap;
            }
            Properties properties = new Properties();
            properties.load(openStream);
            for (Map.Entry entry : properties.entrySet()) {
                hashMap.put((String) entry.getKey(), (String) entry.getValue());
            }
            if (null != openStream) {
                try {
                    openStream.close();
                } catch (IOException e6) {
                    LOG.error("fail to loadTopics, inStream.close ,and e= {}", str, e6);
                }
            }
            return hashMap;
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    inputStream.close();
                } catch (IOException e7) {
                    LOG.error("fail to loadTopics, inStream.close ,and e= {}", str, e7);
                }
            }
            throw th;
        }
    }

    private String getTopic(String str, String str2) {
        String str3 = null;
        if (StringUtils.isNotEmpty(str)) {
            if (StringUtils.isNotEmpty(str2)) {
                str3 = configManager.getTopicProperties().get(str + "/" + str2);
            }
            if (StringUtils.isEmpty(str3)) {
                str3 = configManager.getTopicProperties().get(str);
            }
        }
        LOG.debug("Get topic by groupId/streamId = {}, topic = {}", str + "/" + str2, str3);
        return str3;
    }
}
