package org.apache.inlong.dataproxy.source;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
import org.apache.inlong.commons.msg.InLongMsg;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/ServerMessageHandler.class */
public class ServerMessageHandler extends SimpleChannelHandler {
    private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
    private AbstractSource source;
    private final ChannelGroup allChannels;
    private int maxConnections;
    private boolean filterEmptyMsg;
    private final boolean isCompressed;
    private final ChannelProcessor processor;
    private final ServiceDecoder serviceDecoder;
    private final String defaultTopic;
    private String defaultMXAttr;
    private final ChannelBuffer heartbeatBuffer;
    private final String protocolType;
    private MonitorIndex monitorIndex;
    private MonitorIndexExt monitorIndexExt;
    private final DataProxyMetricItemSet metricItemSet;
    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
    private static final ConfigManager configManager = ConfigManager.getInstance();
    private static final Joiner.MapJoiner mapJoiner = Joiner.on(AttributeConstants.SEPARATOR).withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
    private static final Splitter.MapSplitter mapSplitter = Splitter.on(AttributeConstants.SEPARATOR).trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
    private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() { // from class: org.apache.inlong.dataproxy.source.ServerMessageHandler.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyyMMddHHmm");
        }
    };
    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() { // from class: org.apache.inlong.dataproxy.source.ServerMessageHandler.2
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyyMMddHHmmss");
        }
    };

    public ServerMessageHandler(AbstractSource abstractSource, ServiceDecoder serviceDecoder, ChannelGroup channelGroup, String str, String str2, Boolean bool, Integer num, Boolean bool2, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt, String str3) {
        this.maxConnections = Integer.MAX_VALUE;
        this.filterEmptyMsg = false;
        this.defaultMXAttr = "m=3";
        this.source = abstractSource;
        this.processor = abstractSource.getChannelProcessor();
        this.serviceDecoder = serviceDecoder;
        this.allChannels = channelGroup;
        this.defaultTopic = str;
        if (null != str2) {
            this.defaultMXAttr = str2;
        }
        this.filterEmptyMsg = bool.booleanValue();
        this.isCompressed = bool2.booleanValue();
        this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
        this.maxConnections = num.intValue();
        this.protocolType = str3;
        if (abstractSource instanceof SimpleTcpSource) {
            this.metricItemSet = ((SimpleTcpSource) abstractSource).getMetricItemSet();
        } else {
            this.metricItemSet = new DataProxyMetricItemSet(toString());
        }
        this.monitorIndex = monitorIndex;
        this.monitorIndexExt = monitorIndexExt;
    }

    private String getRemoteIp(Channel channel) {
        return getRemoteIp(channel, null);
    }

    private String getRemoteIp(Channel channel, SocketAddress socketAddress) {
        String str = DEFAULT_REMOTE_IP_VALUE;
        SocketAddress remoteAddress = channel.getRemoteAddress();
        if (remoteAddress == null) {
            remoteAddress = socketAddress;
        }
        if (null != remoteAddress) {
            str = remoteAddress.toString();
            try {
                str = str.substring(1, str.indexOf(58));
            } catch (Exception e) {
                logger.warn("fail to get the remote IP, and strIP={},remoteSocketAddress={}", str, remoteAddress);
            }
        }
        return str;
    }

    private byte[] newBinMsg(byte[] bArr, String str) {
        int length;
        String str2;
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        int i = wrap.getInt(0);
        int i2 = wrap.getInt(21);
        short s = wrap.getShort(25 + i2);
        if (s != 0) {
            length = i + str.length() + AttributeConstants.SEPARATOR.length();
            str2 = AttributeConstants.SEPARATOR + str;
        } else {
            length = i + str.length();
            str2 = str;
        }
        ByteBuffer allocate = ByteBuffer.allocate(length + 4);
        allocate.put(wrap.array(), 0, i2 + 27 + s);
        allocate.putShort(i2 + 25, (short) (str2.length() + s));
        System.arraycopy(str2.getBytes(StandardCharsets.UTF_8), 0, allocate.array(), i2 + 27 + s, str2.length());
        allocate.putShort(9, (short) (wrap.getShort(9) | 4));
        allocate.putInt(0, length);
        allocate.putShort((length + 4) - 2, (short) -4607);
        return allocate.array();
    }

    public boolean checkBlackIp(Channel channel) {
        String remoteIp = getRemoteIp(channel);
        if (remoteIp == null || SimpleTcpSource.blacklist == null || !SimpleTcpSource.blacklist.contains(remoteIp)) {
            return false;
        }
        logger.error(remoteIp + " is in blacklist, so refuse it !");
        channel.disconnect();
        channel.close();
        this.allChannels.remove(channel);
        return true;
    }

    public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (this.allChannels.size() - 1 >= this.maxConnections) {
            logger.warn("refuse to connect , and connections=" + (this.allChannels.size() - 1) + ", maxConnections=" + this.maxConnections + ",channel is " + channelStateEvent.getChannel());
            channelStateEvent.getChannel().disconnect();
            channelStateEvent.getChannel().close();
        }
        if (checkBlackIp(channelStateEvent.getChannel())) {
            return;
        }
        logger.info("connections={},maxConnections={}", Integer.valueOf(this.allChannels.size() - 1), Integer.valueOf(this.maxConnections));
        this.allChannels.add(channelStateEvent.getChannel());
        super.channelOpen(channelHandlerContext, channelStateEvent);
    }

    private void checkGroupIdInfo(ProxyMessage proxyMessage, Map<String, String> map, Map<String, String> map2, AtomicReference<String> atomicReference) {
        String groupId = proxyMessage.getGroupId();
        String streamId = proxyMessage.getStreamId();
        if (null != groupId) {
            if ("dc".equals(map.get(AttributeConstants.FROM))) {
                String streamId2 = proxyMessage.getStreamId();
                if (StringUtils.isNotEmpty(streamId2) && configManager.getDcMappingProperties().containsKey(streamId2.trim())) {
                    groupId = configManager.getDcMappingProperties().get(streamId2.trim()).trim();
                    proxyMessage.setGroupId(groupId);
                }
            }
            String topic = getTopic(groupId, streamId);
            if (StringUtils.isNotEmpty(topic)) {
                atomicReference.set(topic.trim());
            }
            Map<String, String> map3 = configManager.getMxPropertiesMaps().get(groupId);
            if (map3 == null || map3.size() == 0) {
                proxyMessage.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
                return;
            } else {
                proxyMessage.getAttributeMap().putAll(map3);
                return;
            }
        }
        String str = map.get(AttributeConstants.NUM2NAME);
        String str2 = map.get(AttributeConstants.GROUPID_NUM);
        String str3 = map.get(AttributeConstants.STREAMID_NUM);
        if (configManager.getGroupIdMappingProperties() == null || configManager.getStreamIdMappingProperties() == null) {
            return;
        }
        String str4 = configManager.getGroupIdMappingProperties().get(str2);
        String str5 = configManager.getStreamIdMappingProperties().get(str2) == null ? null : configManager.getStreamIdMappingProperties().get(str2).get(str3);
        if (str4 == null || str5 == null) {
            return;
        }
        if ("TRUE".equalsIgnoreCase(configManager.getGroupIdEnableMappingProperties() == null ? null : configManager.getGroupIdEnableMappingProperties().get(str2)) && "TRUE".equalsIgnoreCase(str)) {
            proxyMessage.setData(newBinMsg(proxyMessage.getData(), "groupId=" + str4 + AttributeConstants.SEPARATOR + "streamId=" + str5));
        }
        map2.put(AttributeConstants.GROUP_ID, str4);
        map2.put(AttributeConstants.INTERFACE_ID, str5);
        proxyMessage.setGroupId(str4);
        proxyMessage.setStreamId(str5);
        String topic2 = getTopic(str4, str5);
        if (StringUtils.isNotEmpty(topic2)) {
            atomicReference.set(topic2.trim());
        }
    }

    private void updateMsgList(List<ProxyMessage> list, Map<String, String> map, Map<String, HashMap<String, List<ProxyMessage>>> map2, String str, MsgType msgType) {
        for (ProxyMessage proxyMessage : list) {
            Map<String, String> attributeMap = proxyMessage.getAttributeMap();
            AtomicReference<String> atomicReference = new AtomicReference<>(this.defaultTopic);
            checkGroupIdInfo(proxyMessage, map, attributeMap, atomicReference);
            String str2 = atomicReference.get();
            proxyMessage.setTopic(str2);
            map.put(AttributeConstants.NODE_IP, str);
            String groupId = proxyMessage.getGroupId();
            String streamId = proxyMessage.getStreamId();
            if (ConfigConstants.SLA_METRIC_GROUPID.equals(groupId)) {
                map.put(ConfigConstants.SLA_METRIC_DATA, "true");
                proxyMessage.setTopic(ConfigConstants.SLA_METRIC_DATA);
            }
            if (groupId != null && streamId != null) {
                String str3 = groupId + AttributeConstants.SEPARATOR + streamId;
                if (configManager.getTubeSwitchProperties().get(str3) != null && "false".equals(configManager.getTubeSwitchProperties().get(str3).trim())) {
                }
            }
            if (!"pb".equals(attributeMap.get(AttributeConstants.MESSAGE_TYPE)) && !MsgType.MSG_MULTI_BODY.equals(msgType) && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
                byte[] data = proxyMessage.getData();
                if (data[data.length - 1] == 10) {
                    int length = data.length - 1;
                    if (data[data.length - 2] == 13) {
                        length = data.length - 2;
                    }
                    byte[] bArr = new byte[length];
                    System.arraycopy(data, 0, bArr, 0, length);
                    proxyMessage.setData(bArr);
                }
            }
            if (streamId == null) {
                streamId = "";
            }
            map2.computeIfAbsent(str2, str4 -> {
                return new HashMap();
            }).computeIfAbsent(streamId, str5 -> {
                return new ArrayList();
            }).add(proxyMessage);
        }
    }

    private void formatMessagesAndSend(Map<String, String> map, Map<String, HashMap<String, List<ProxyMessage>>> map2, String str, MsgType msgType) throws MessageIDException {
        int i = 1;
        if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
            i = 3;
        } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
            i = 4;
        }
        for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> entry : map2.entrySet()) {
            for (Map.Entry<String, List<ProxyMessage>> entry2 : entry.getValue().entrySet()) {
                InLongMsg newInLongMsg = InLongMsg.newInLongMsg(this.isCompressed, i);
                HashMap hashMap = new HashMap();
                for (ProxyMessage proxyMessage : entry2.getValue()) {
                    if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
                        proxyMessage.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
                        newInLongMsg.addMsg(mapJoiner.join(proxyMessage.getAttributeMap()), proxyMessage.getData());
                    } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                        newInLongMsg.addMsg(proxyMessage.getData());
                    } else {
                        newInLongMsg.addMsg(mapJoiner.join(proxyMessage.getAttributeMap()), proxyMessage.getData());
                    }
                }
                String format = dateFormator.get().format(Long.valueOf(newInLongMsg.getCreatetime()));
                if (i == 4) {
                    format = map.containsKey("msg.pkg.time") ? map.get("msg.pkg.time") : dateFormator.get().format(Long.valueOf(System.currentTimeMillis()));
                }
                if (map.get(AttributeConstants.DATA_TIME) != null) {
                    hashMap.put(AttributeConstants.DATA_TIME, map.get(AttributeConstants.DATA_TIME));
                } else {
                    hashMap.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
                }
                hashMap.put("topic", entry.getKey());
                hashMap.put(AttributeConstants.GROUP_ID, entry2.getValue().get(0).getGroupId());
                hashMap.put(AttributeConstants.INTERFACE_ID, entry2.getKey());
                hashMap.put(ConfigConstants.REMOTE_IP_KEY, str);
                hashMap.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                String str2 = map.get(AttributeConstants.MESSAGE_COUNT);
                hashMap.put("msgcnt", str2);
                byte[] buildArray = newInLongMsg.buildArray();
                hashMap.put(ConfigConstants.TOTAL_LEN, String.valueOf(buildArray.length));
                String str3 = map.get(AttributeConstants.SEQUENCE_ID);
                if (StringUtils.isNotEmpty(str3)) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(entry.getKey()).append(AttributeConstants.SEPARATOR).append(entry2.getKey()).append(AttributeConstants.SEPARATOR).append(str3);
                    hashMap.put(ConfigConstants.SEQUENCE_ID, sb.toString());
                }
                hashMap.put("msg.pkg.time", format);
                Event withBody = EventBuilder.withBody(buildArray, hashMap);
                try {
                    long parseLong = (((Long.parseLong((String) hashMap.get(AttributeConstants.DATA_TIME)) / 1000) / 60) / 10) * 1000 * 60 * 10;
                    StringBuilder sb2 = new StringBuilder();
                    sb2.append(this.protocolType).append(AttributeConstants.SEPARATOR).append(entry.getKey()).append(AttributeConstants.SEPARATOR).append(entry2.getKey()).append(AttributeConstants.SEPARATOR).append(str).append(AttributeConstants.SEPARATOR).append(NetworkUtils.getLocalIp()).append(AttributeConstants.SEPARATOR).append(new SimpleDateFormat("yyyyMMddHHmm").format(Long.valueOf(parseLong))).append(AttributeConstants.SEPARATOR).append(format);
                    try {
                        this.processor.processEvent(withBody);
                        this.monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                        addMetric(true, buildArray.length, withBody);
                        this.monitorIndex.addAndGet(new String(sb2), Integer.parseInt(str2), 1, buildArray.length, 0);
                    } catch (Throwable th) {
                        logger.error("Error writting to channel,data will discard.", th);
                        this.monitorIndexExt.incrementAndGet("EVENT_DROPPED");
                        this.monitorIndex.addAndGet(new String(sb2), 0, 0, 0L, Integer.parseInt(str2));
                        addMetric(false, buildArray.length, withBody);
                        throw new ChannelException("ProcessEvent error can't write event to channel.");
                    }
                } catch (Exception e) {
                    throw new MessageIDException(Long.parseLong(map.get(AttributeConstants.UNIQ_ID)), ErrorCode.DT_ERROR, new Throwable("attribute dt=" + ((String) hashMap.get("dt has error, detail is: topic=" + entry.getKey() + "&streamId=" + entry2.getKey() + "&NodeIP=" + str)), e));
                }
            }
        }
    }

    private void responsePackage(Map<String, String> map, Map<String, Object> map2, Channel channel, SocketAddress socketAddress, MsgType msgType) throws Exception {
        if (!map.containsKey("isAck") || "true".equals(map.get("isAck"))) {
            if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
                byte[] bytes = mapJoiner.join(map).getBytes(StandardCharsets.UTF_8);
                if (bytes == null || new String(bytes, StandardCharsets.UTF_8).isEmpty()) {
                    return;
                }
                byte[] bArr = MsgType.MSG_ORIGINAL_RETURN.equals(msgType) ? (byte[]) map2.get("body") : new byte[]{50};
                int length = 5 + bArr.length + 4 + bytes.length;
                ChannelBuffer buffer = ChannelBuffers.buffer(4 + length);
                buffer.writeInt(length);
                buffer.writeByte(msgType.getValue());
                buffer.writeInt(bArr.length);
                buffer.writeBytes(bArr);
                buffer.writeInt(bytes.length);
                buffer.writeBytes(bytes);
                if (channel.isWritable()) {
                    channel.write(buffer, socketAddress);
                    return;
                } else {
                    String str = new String(bytes, StandardCharsets.UTF_8);
                    logger.warn("the send buffer1 is full, so disconnect it!please check remote client; Connection info:" + channel + ";attr is " + str);
                    throw new Exception(new Throwable("the send buffer1 is full, so disconnect it!please check remote client; Connection info:" + channel + ";attr is " + str));
                }
            }
            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                String str2 = null;
                if (map2.containsKey(ConfigConstants.DECODER_ATTRS)) {
                    str2 = (String) map2.get(ConfigConstants.DECODER_ATTRS);
                }
                int i = 9;
                if (null != str2) {
                    i = 9 + str2.length();
                }
                ChannelBuffer buffer2 = ChannelBuffers.buffer(4 + i);
                buffer2.writeInt(i);
                buffer2.writeByte(msgType.getValue());
                long parseLong = Long.parseLong(map.get(AttributeConstants.UNIQ_ID));
                buffer2.writeBytes(new byte[]{(byte) ((parseLong >> 24) & 255), (byte) ((parseLong >> 16) & 255), (byte) ((parseLong >> 8) & 255), (byte) (parseLong & 255)});
                if (null != str2) {
                    buffer2.writeShort(str2.length());
                    buffer2.writeBytes(str2.getBytes(StandardCharsets.UTF_8));
                } else {
                    buffer2.writeShort(0);
                }
                buffer2.writeShort(60929);
                if (channel.isWritable()) {
                    channel.write(buffer2, socketAddress);
                } else {
                    logger.warn("the send buffer2 is full, so disconnect it!please check remote client; Connection info:" + channel + ";attr is " + str2);
                    throw new Exception(new Throwable("the send buffer2 is full,so disconnect it!please check remote client, Connection info:" + channel + ";attr is " + str2));
                }
            }
        }
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        Event withBody;
        ChannelException channelException;
        logger.debug("message received");
        if (messageEvent == null) {
            logger.error("get null messageevent, just skip");
            addMetric(false, 0L, null);
            return;
        }
        ChannelBuffer channelBuffer = (ChannelBuffer) messageEvent.getMessage();
        String remoteIp = getRemoteIp(messageEvent.getChannel(), messageEvent.getRemoteAddress());
        SocketAddress remoteAddress = messageEvent.getRemoteAddress();
        if (channelBuffer.readableBytes() == 0 && this.filterEmptyMsg) {
            logger.warn("skip empty msg.");
            channelBuffer.clear();
            addMetric(false, 0L, null);
            return;
        }
        Channel channel = messageEvent.getChannel();
        try {
            Map<String, Object> extractData = this.serviceDecoder.extractData(channelBuffer, channel, messageEvent);
            if (extractData == null) {
                logger.info("result is null");
                addMetric(false, 0L, null);
                return;
            }
            MsgType msgType = (MsgType) extractData.get(ConfigConstants.MSG_TYPE);
            if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
                channel.write(this.heartbeatBuffer, remoteAddress);
                addMetric(false, 0L, null);
                return;
            }
            if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
                addMetric(false, 0L, null);
                return;
            }
            Map<String, String> map = (Map) extractData.get(ConfigConstants.COMMON_ATTR_MAP);
            if (map == null) {
                map = new HashMap();
            }
            List<ProxyMessage> list = (List) extractData.get(ConfigConstants.MSG_LIST);
            if (list != null && !map.containsKey(ConfigConstants.FILE_CHECK_DATA) && !map.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
                HashMap hashMap = new HashMap(list.size());
                updateMsgList(list, map, hashMap, remoteIp, msgType);
                formatMessagesAndSend(map, hashMap, remoteIp, msgType);
            } else if (list != null && map.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
                HashMap hashMap2 = new HashMap();
                hashMap2.put("msgtype", "filestatus");
                hashMap2.put(ConfigConstants.FILE_CHECK_DATA, "true");
                Iterator<ProxyMessage> it = list.iterator();
                while (it.hasNext()) {
                    withBody = EventBuilder.withBody(it.next().getData(), hashMap2);
                    try {
                        this.processor.processEvent(withBody);
                        addMetric(true, r0.length, withBody);
                    } finally {
                    }
                }
            } else if (list != null && map.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
                logger.info("i am in MINUTE_CHECK_DATA");
                HashMap hashMap3 = new HashMap();
                hashMap3.put("msgtype", "measure");
                hashMap3.put(ConfigConstants.FILE_CHECK_DATA, "true");
                Iterator<ProxyMessage> it2 = list.iterator();
                while (it2.hasNext()) {
                    withBody = EventBuilder.withBody(it2.next().getData(), hashMap3);
                    try {
                        this.processor.processEvent(withBody);
                        addMetric(true, r0.length, withBody);
                    } finally {
                    }
                }
            }
            responsePackage(map, extractData, channel, remoteAddress, msgType);
        } catch (MessageIDException e) {
            logger.error("MessageIDException ex = {}", e);
            addMetric(false, 0L, null);
            throw new IOException(e.getCause());
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        logger.error("exception caught", exceptionEvent.getCause());
        this.monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
    }

    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        logger.error("channel closed {}", channelHandlerContext.getChannel());
        super.channelClosed(channelHandlerContext, channelStateEvent);
        try {
            channelStateEvent.getChannel().disconnect();
            channelStateEvent.getChannel().close();
        } catch (Exception e) {
        }
        this.allChannels.remove(channelStateEvent.getChannel());
    }

    private String getTopic(String str) {
        return getTopic(str, null);
    }

    private String getTopic(String str, String str2) {
        String str3 = null;
        if (StringUtils.isNotEmpty(str)) {
            if (StringUtils.isNotEmpty(str2)) {
                str3 = configManager.getTopicProperties().get(str + "/" + str2);
            }
            if (StringUtils.isEmpty(str3)) {
                str3 = configManager.getTopicProperties().get(str);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Get topic by groupId = {} , streamId = {}", str, str2);
        }
        return str3;
    }

    private void addMetric(boolean z, long j, Event event) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", "DataProxy");
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_ID, this.source.getName());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, this.source.getName());
        DataProxyMetricItem.fillInlongId(event, hashMap);
        DataProxyMetricItem.fillAuditFormatTime(event, hashMap);
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) this.metricItemSet.findMetricItem(hashMap);
        if (!z) {
            dataProxyMetricItem.readFailCount.incrementAndGet();
            dataProxyMetricItem.readFailSize.addAndGet(j);
        } else {
            dataProxyMetricItem.readSuccessCount.incrementAndGet();
            dataProxyMetricItem.readSuccessSize.addAndGet(j);
            AuditUtils.add(5, event);
        }
    }
}
