package org.apache.inlong.dataproxy.source;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/ServerMessageHandler.class */
public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
    // Placeholder remote IP; not referenced by any method visible in this class.
    private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
    // Value written under ConfigConstants.REMOTE_IDC_KEY in every event header (see formatMessagesAndSend).
    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
    // Owning source: supplies the channel processor, the metric item set and the name used in log lines.
    private BaseSource source;
    // Channel group that accepted channels are added to (channelActive) and removed from (channelInactive).
    private final ChannelGroup allChannels;
    // Connection limit; channelActive compares it against allChannels.size() - 1.
    private int maxConnections;
    // When true, channelRead skips buffers that have zero readable bytes.
    private boolean filterEmptyMsg;
    // Passed to InLongMsg.newInLongMsg when building outgoing message bodies.
    private final boolean isCompressed;
    // Flume channel processor that built events are handed to.
    private final ChannelProcessor processor;
    // Decoder used by channelRead to turn an inbound ByteBuf into the extracted-data map.
    private final ServiceDecoder serviceDecoder;
    // Fallback topic used by convertMsgList when no topic is resolved and "no topic accept" is enabled.
    private final String defaultTopic;
    // Attribute string split into key/value pairs and applied to a message when its group has no MX properties.
    private String defaultMXAttr;
    // First segment of the monitor-index key built in formatMessagesAndSend.
    private final String protocolType;
    // Per-key monitor statistics, updated after each processEvent attempt.
    private MonitorIndex monitorIndex;
    // Named counters ("EVENT_SUCCESS", "EVENT_DROPPED", "EVENT_OTHEREXP").
    private MonitorIndexExt monitorIndexExt;
    // Metric item set obtained from the source; filled per event in addStatistics.
    private final DataProxyMetricItemSet metricItemSet;
    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
    // Shared configuration accessor used for id mappings, MX properties and topic lookup.
    private static final ConfigManager configManager = ConfigManager.getInstance();
    // Joins an attribute map into "k<KEY_VALUE_SEPARATOR>v<SEPARATOR>k<KEY_VALUE_SEPARATOR>v..." form.
    private static final Joiner.MapJoiner mapJoiner = Joiner.on(AttrConstants.SEPARATOR).withKeyValueSeparator(AttrConstants.KEY_VALUE_SEPARATOR);
    // Inverse of mapJoiner (with trimmed entries); used to parse defaultMXAttr.
    private static final Splitter.MapSplitter mapSplitter = Splitter.on(AttrConstants.SEPARATOR).trimResults().withKeyValueSeparator(AttrConstants.KEY_VALUE_SEPARATOR);

    /**
     * Creates a handler bound to the given source, decoder and channel group.
     *
     * @param baseSource      owning source; its channel processor and metric item set are captured here
     * @param serviceDecoder  decoder used to parse inbound buffers
     * @param channelGroup    group in which active channels are tracked
     * @param str             default topic used when no topic can be resolved for a message
     * @param str2            default MX attribute string; {@code "m=3"} is used when this is {@code null}
     * @param bool            whether empty inbound buffers are skipped (must not be {@code null})
     * @param num             maximum number of connections (must not be {@code null})
     * @param bool2           whether outgoing InLongMsg bodies are compressed (must not be {@code null})
     * @param monitorIndex    per-key monitor statistics
     * @param monitorIndexExt named monitor counters
     * @param str3            protocol type, used as the first segment of the monitor key
     */
    public ServerMessageHandler(
            BaseSource baseSource,
            ServiceDecoder serviceDecoder,
            ChannelGroup channelGroup,
            String str,
            String str2,
            Boolean bool,
            Integer num,
            Boolean bool2,
            MonitorIndex monitorIndex,
            MonitorIndexExt monitorIndexExt,
            String str3) {
        // Collaborators.
        this.source = baseSource;
        this.processor = baseSource.getChannelProcessor();
        this.serviceDecoder = serviceDecoder;
        this.allChannels = channelGroup;

        // Topic / attribute defaults.
        this.defaultTopic = str;
        this.defaultMXAttr = (str2 == null) ? "m=3" : str2;

        // Behaviour switches; unboxing order is kept so a null argument fails at the same point as before.
        this.filterEmptyMsg = bool;
        this.isCompressed = bool2;
        this.maxConnections = num;
        this.protocolType = str3;

        // Metrics and monitoring.
        this.metricItemSet = baseSource.getMetricItemSet();
        this.monitorIndex = monitorIndex;
        this.monitorIndexExt = monitorIndexExt;
    }

    /**
     * Returns a copy of a binary-format message with {@code str} appended to its attribute section.
     *
     * <p>Fields read from / written to the buffer by this method (byte offsets):
     * <ul>
     *   <li>0: 4-byte total length (the buffer is allocated as this value + 4);</li>
     *   <li>9: 2-byte extend field, bit {@code 0x4} is set in the copy
     *       (NOTE(review): presumably flags that id attributes were appended; confirm);</li>
     *   <li>21: 4-byte body length;</li>
     *   <li>25 + bodyLen: 2-byte attribute length, attributes follow at 27 + bodyLen;</li>
     *   <li>last 2 bytes: trailing marker {@code 0xEE01}.</li>
     * </ul>
     *
     * <p>All lengths are computed from the UTF-8 encoded bytes of the appended text rather than from
     * {@link String#length()}, so attributes containing non-ASCII characters are neither truncated nor
     * mis-sized. For pure ASCII input the output is identical to the previous implementation.
     *
     * @param bArr original binary message
     * @param str  attribute text to append; prefixed with {@link AttrConstants#SEPARATOR} when the
     *             message already carries attributes
     * @return a newly allocated array holding the extended message
     */
    private byte[] newBinMsg(byte[] bArr, String str) {
        ByteBuffer original = ByteBuffer.wrap(bArr);
        int totalLen = original.getInt(0);
        int bodyLen = original.getInt(21);
        short attrLen = original.getShort(25 + bodyLen);

        // Only insert a separator when there are existing attributes to separate from.
        String appended = (attrLen != 0) ? AttrConstants.SEPARATOR + str : str;
        byte[] appendedBytes = appended.getBytes(StandardCharsets.UTF_8);

        int newTotalLen = totalLen + appendedBytes.length;
        int attrEnd = bodyLen + 27 + attrLen;

        ByteBuffer result = ByteBuffer.allocate(newTotalLen + 4);
        // Copy everything up to the end of the existing attributes.
        result.put(bArr, 0, attrEnd);
        // Patch the attribute length, then append the new attribute bytes right after the old ones.
        result.putShort(bodyLen + 25, (short) (appendedBytes.length + attrLen));
        System.arraycopy(appendedBytes, 0, result.array(), attrEnd, appendedBytes.length);
        result.putShort(9, (short) (original.getShort(9) | 4));
        result.putInt(0, newTotalLen);
        // Re-write the trailing marker at the new end of the message.
        result.putShort((newTotalLen + 4) - 2, (short) 0xEE01);
        return result.array();
    }

    /**
     * Admits or rejects a newly active channel.
     *
     * <p>The channel is disconnected and closed when the tracked connection count
     * ({@code allChannels.size() - 1}) has reached {@code maxConnections}, or when illegal-IP checking
     * is enabled and the remote IP is reported as illegal. Otherwise it is added to the channel group
     * and the event is propagated down the pipeline.
     *
     * @param channelHandlerContext context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        // Guard 1: connection limit reached.
        if (this.allChannels.size() - 1 >= this.maxConnections) {
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}",
                    this.source.getName(),
                    channelHandlerContext.channel(),
                    this.allChannels.size() - 1,
                    this.maxConnections);
            return;
        }

        // Guard 2: remote address is on the illegal-IP list (only evaluated when the check is enabled).
        if (ConfigManager.getInstance().needChkIllegalIP()) {
            String remoteIp = AddressUtils.getChannelRemoteIP(channelHandlerContext.channel());
            if (remoteIp != null && ConfigManager.getInstance().isIllegalIP(remoteIp)) {
                channelHandlerContext.channel().disconnect();
                channelHandlerContext.channel().close();
                logger.error(remoteIp + " is Illegal IP, so refuse it !");
                return;
            }
        }

        // Accepted: track the channel and pass the event on.
        this.allChannels.add(channelHandlerContext.channel());
        channelHandlerContext.fireChannelActive();
        logger.info("{} added new channel, current connections = {}, maxConnections = {}",
                this.source.getName(),
                this.allChannels.size() - 1,
                this.maxConnections);
    }

    /**
     * Logs the inactive channel, propagates the event and then stops tracking the channel.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        final String sourceName = this.source.getName();
        final Channel inactiveChannel = channelHandlerContext.channel();
        logger.error("{} channel inactive {}", sourceName, inactiveChannel);
        // Downstream handlers are notified before the channel leaves the group.
        channelHandlerContext.fireChannelInactive();
        this.allChannels.remove(inactiveChannel);
    }

    /**
     * Decodes one inbound buffer, validates it and forwards the contained messages to the channel
     * processor, answering the client where a response is due.
     *
     * <p>Processing steps, each of which may end the call early with a response package:
     * <ol>
     *   <li>skip {@code null} messages and (optionally) empty buffers;</li>
     *   <li>decode via {@code serviceDecoder.extractData}; an empty result is silently dropped;</li>
     *   <li>a non-success {@code errCode} already set by the decoder is returned as-is;</li>
     *   <li>heartbeats are acknowledged immediately;</li>
     *   <li>file/minute check data is rejected with {@code UNSUPPORTED_EXTEND_FIELD_VALUE};</li>
     *   <li>a missing message list is rejected with {@code EMPTY_MSG};</li>
     *   <li>an unready MQ cluster is reported with {@code SINK_SERVICE_UNREADY};</li>
     *   <li>otherwise messages are grouped by topic/stream and sent; the response is written here
     *       unless the attributes indicate that the response comes from the sink side.</li>
     * </ol>
     *
     * <p>The buffer is released exactly once, in the {@code finally} block. Releasing it on the
     * individual exit paths as well would decrement the reference count twice and raise
     * {@code IllegalReferenceCountException}.
     *
     * @param channelHandlerContext context of the channel the data arrived on
     * @param obj                   inbound message; expected to be a {@link ByteBuf}
     * @throws IOException when message formatting fails with a {@link MessageIDException}
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj == null) {
            logger.debug("Get null msg, just skip!");
            return;
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            Channel channel = channelHandlerContext.channel();
            String channelRemoteIP = AddressUtils.getChannelRemoteIP(channel);
            if (byteBuf.readableBytes() == 0 && this.filterEmptyMsg) {
                logger.debug("Get empty msg from {}, just skip!", channelRemoteIP);
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            try {
                Map<String, Object> extractData = this.serviceDecoder.extractData(byteBuf, channelRemoteIP, currentTimeMillis, channel);
                if (extractData == null || extractData.isEmpty()) {
                    logger.debug("Parse message result is null, from {}", channelRemoteIP);
                    return;
                }
                MsgType msgType = (MsgType) extractData.get(ConfigConstants.MSG_TYPE);
                @SuppressWarnings("unchecked") // the decoder stores the common attributes as Map<String, String>
                Map<String, String> map = (Map<String, String>) extractData.get(ConfigConstants.COMMON_ATTR_MAP);
                if (map == null) {
                    map = new HashMap<>();
                }

                // The decoder already flagged an error: just report it back.
                String errCode = map.get("errCode");
                if (!StringUtils.isEmpty(errCode) && !DataProxyErrCode.SUCCESS.getErrCodeStr().equals(errCode)) {
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                    return;
                }

                // Heartbeats carry no payload to forward.
                if (MsgType.MSG_HEARTBEAT.equals(msgType) || MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                    return;
                }

                if (map.containsKey(ConfigConstants.FILE_CHECK_DATA) || map.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
                    map.put("errCode", DataProxyErrCode.UNSUPPORTED_EXTEND_FIELD_VALUE.getErrCodeStr());
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                    return;
                }

                @SuppressWarnings("unchecked") // the decoder stores the messages as List<ProxyMessage>
                List<ProxyMessage> list = (List<ProxyMessage>) extractData.get(ConfigConstants.MSG_LIST);
                if (list == null) {
                    map.put("errCode", DataProxyErrCode.EMPTY_MSG.getErrCodeStr());
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                    return;
                }

                if (!ConfigManager.getInstance().isMqClusterReady()) {
                    map.put("errCode", DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCodeStr());
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                    return;
                }

                // topic -> streamId -> messages
                Map<String, HashMap<String, List<ProxyMessage>>> groupedMessages = new HashMap<>(list.size());
                if (!convertMsgList(list, map, groupedMessages, channelRemoteIP)) {
                    // convertMsgList has already stored the error code in the attribute map.
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                    return;
                }

                formatMessagesAndSend(channelHandlerContext, map, extractData, groupedMessages, channelRemoteIP, msgType, currentTimeMillis);
                if (!MessageUtils.isSinkRspType(map)) {
                    MessageUtils.sourceReturnRspPackage(map, extractData, channel, msgType);
                }
            } catch (MessageIDException e) {
                logger.error("MessageIDException ex = {}", e);
                // Keep the underlying cause when there is one; otherwise keep the exception itself
                // so the failure reason is never lost.
                Throwable cause = (e.getCause() != null) ? e.getCause() : e;
                throw new IOException(cause);
            }
        } finally {
            // Single release point for every exit path above.
            byteBuf.release();
        }
    }

    /**
     * Counts the failure under {@code "EVENT_OTHEREXP"}, closes the channel and logs the cause.
     *
     * @param channelHandlerContext context of the channel on which the exception surfaced
     * @param th                    the exception that was caught
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
        // Close first, then log, matching the established ordering of side effects.
        channelHandlerContext.close();
        final String sourceName = this.source.getName();
        logger.error("{} exception caught cause = {}", sourceName, th);
    }

    /**
     * Resolves group id, stream id and topic for each message and buckets the messages by
     * topic and stream id.
     *
     * <p>For messages that already carry a group id, the group's MX properties (or the parsed
     * {@code defaultMXAttr} when none are configured) are merged into the message attributes. For
     * messages without a group id, the numeric ids found in the common attributes are translated via
     * the configured mapping tables; when number-to-name mapping is enabled both for the group and in
     * the request, the binary payload is rewritten to carry the resolved names.
     *
     * @param list messages to convert
     * @param map  common attributes; receives {@code "NodeIP"} and, on failure, {@code "errCode"}
     * @param map2 output: topic -> stream id -> messages
     * @param str  remote IP stored under {@code "NodeIP"}
     * @return {@code false} when a message has no topic and topic-less messages are not accepted,
     *         {@code true} otherwise
     */
    private boolean convertMsgList(List<ProxyMessage> list, Map<String, String> map, Map<String, HashMap<String, List<ProxyMessage>>> map2, String str) {
        for (ProxyMessage message : list) {
            String topic = null;
            String groupId = message.getGroupId();
            String streamId = message.getStreamId();

            if (groupId != null) {
                // Group id supplied by the client: apply MX attributes, then look the topic up.
                Map<String, String> mxProps = configManager.getMxPropertiesMaps().get(groupId);
                if (mxProps != null && mxProps.size() != 0) {
                    message.getAttributeMap().putAll(mxProps);
                } else {
                    message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
                }
                topic = configManager.getTopicName(groupId, streamId);
            } else {
                // No group id: try to translate the numeric ids from the common attributes.
                String num2Name = map.get(AttrConstants.NUM2NAME);
                String groupIdNum = map.get(AttrConstants.GROUPID_NUM);
                String streamIdNum = map.get(AttrConstants.STREAMID_NUM);
                if (configManager.getGroupIdMappingProperties() != null
                        && configManager.getStreamIdMappingProperties() != null) {
                    groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
                    if (configManager.getStreamIdMappingProperties().get(groupIdNum) == null) {
                        streamId = null;
                    } else {
                        streamId = configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
                    }
                    if (groupId != null && streamId != null) {
                        String mappingEnabled;
                        if (configManager.getGroupIdEnableMappingProperties() == null) {
                            mappingEnabled = null;
                        } else {
                            mappingEnabled = configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
                        }
                        if ("TRUE".equalsIgnoreCase(mappingEnabled) && "TRUE".equalsIgnoreCase(num2Name)) {
                            String idAttrs = "groupId=" + groupId + "&streamId=" + streamId;
                            message.setData(newBinMsg(message.getData(), idAttrs));
                        }
                        message.setGroupId(groupId);
                        message.setStreamId(streamId);
                        topic = configManager.getTopicName(groupId, streamId);
                    }
                }
            }

            if (StringUtils.isEmpty(topic)) {
                boolean acceptWithoutTopic = CommonConfigHolder.getInstance().isNoTopicAccept();
                if (!acceptWithoutTopic) {
                    map.put("errCode", DataProxyErrCode.UNCONFIGURED_GROUPID_OR_STREAMID.getErrCodeStr());
                    logger.error("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}", groupId, streamId);
                    return false;
                }
                topic = this.defaultTopic;
            }

            // A missing stream id is normalised to the empty string so it can serve as a map key.
            if (streamId == null) {
                streamId = "";
                message.setStreamId(streamId);
            }
            message.setTopic(topic);
            map.put("NodeIP", str);

            HashMap<String, List<ProxyMessage>> byStream =
                    map2.computeIfAbsent(topic, topicKey -> new HashMap<>());
            List<ProxyMessage> bucket = byStream.computeIfAbsent(streamId, streamKey -> new ArrayList<>());
            bucket.add(message);
        }
        return true;
    }

    /**
     * Packs each topic/stream bucket into one InLongMsg, wraps it in a Flume event with routing
     * headers and hands it to the channel processor, updating monitor and metric counters.
     *
     * <p>The InLongMsg version is 3 for {@code MSG_MULTI_BODY_ATTR}, 4 for {@code MSG_BIN_MULTI_BODY}
     * and 1 otherwise. Binary multi-body messages are added without an attribute string; all other
     * types are added with their joined attribute map, and for {@code MSG_MULTI_BODY_ATTR} /
     * {@code MSG_MULTI_BODY} each message's {@code "cnt"} attribute is first set to {@code "1"}.
     *
     * <p>NOTE(review): {@code map.get("cnt")} and {@code map.get("dt")} are parsed as numbers without
     * a guard; this assumes the decoder always populates them (a {@link NumberFormatException} is
     * raised otherwise). Confirm against the decoder.
     *
     * @param channelHandlerContext context handed to {@link SinkRspEvent} when the sink must respond
     * @param map    common attributes of the request
     * @param map2   raw decoder output; only {@code ConfigConstants.DECODER_ATTRS} is read here
     * @param map3   messages grouped as topic -> stream id -> messages
     * @param str    remote IP of the sender
     * @param msgType type of the inbound request
     * @param j      receive time in milliseconds, used as the last segment of the monitor key
     * @throws ChannelException when the channel processor (or the success bookkeeping) fails; the
     *                          original failure is attached as the cause
     */
    private void formatMessagesAndSend(ChannelHandlerContext channelHandlerContext, Map<String, String> map, Map<String, Object> map2, Map<String, HashMap<String, List<ProxyMessage>>> map3, String str, MsgType msgType, long j) throws MessageIDException {
        int msgVersion = 1;
        if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
            msgVersion = 3;
        } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
            msgVersion = 4;
        }
        // The message type is fixed for the whole call, so classify it once.
        boolean binaryBody = MsgType.MSG_BIN_MULTI_BODY.equals(msgType);
        boolean perMessageCnt = MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType);

        StringBuilder sb = new StringBuilder(512);
        int msgCnt = Integer.parseInt(map.get("cnt"));
        for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : map3.entrySet()) {
            String topic = topicEntry.getKey();
            for (Map.Entry<String, List<ProxyMessage>> streamEntry : topicEntry.getValue().entrySet()) {
                String streamId = streamEntry.getKey();

                // Pack every message of this bucket; the group id is taken from the first message that has one.
                String groupId = null;
                InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, msgVersion);
                for (ProxyMessage message : streamEntry.getValue()) {
                    if (StringUtils.isEmpty(groupId)) {
                        groupId = message.getGroupId();
                    }
                    if (binaryBody) {
                        inLongMsg.addMsg(message.getData());
                    } else {
                        if (perMessageCnt) {
                            message.getAttributeMap().put("cnt", String.valueOf(1));
                        }
                        inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                    }
                }

                // Event headers.
                map.put("cnt", String.valueOf(msgCnt));
                Map<String, String> headers = new HashMap<>();
                headers.put("groupId", groupId);
                headers.put("streamId", streamId);
                headers.put("topic", topic);
                String dataTime = map.get("dt");
                headers.put("dt", dataTime);
                headers.put(ConfigConstants.REMOTE_IP_KEY, str);
                headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                headers.put("msgcnt", map.get("cnt"));
                headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
                headers.put("rt", map.get("rt"));
                headers.put(ConfigConstants.DECODER_ATTRS, (String) map2.get(ConfigConstants.DECODER_ATTRS));
                headers.put("uniq", map.get("uniq"));
                if ("false".equals(map.get("isAck"))) {
                    headers.put("isAck", "false");
                }
                String syncSend = map.get("syncSend");
                if (StringUtils.isNotEmpty(syncSend)) {
                    headers.put("syncSend", syncSend);
                }
                String proxySend = map.get("proxySend");
                if (StringUtils.isNotEmpty(proxySend)) {
                    headers.put("proxySend", proxySend);
                }
                String partitionKey = map.get("partitionKey");
                if (StringUtils.isNotEmpty(partitionKey)) {
                    headers.put("partitionKey", partitionKey);
                }
                String sequenceId = map.get("sid");
                if (StringUtils.isNotEmpty(sequenceId)) {
                    sb.append(topic).append(AttrConstants.SEPARATOR).append(streamId).append(AttrConstants.SEPARATOR).append(sequenceId);
                    headers.put(ConfigConstants.SEQUENCE_ID, sb.toString());
                    sb.setLength(0);
                }

                // Keep the built body in a local: its length feeds the statistics below.
                byte[] body = inLongMsg.buildArray();
                Event event = EventBuilder.withBody(body, headers);
                event.getHeaders().putAll(headers);
                inLongMsg.reset();

                Pair<Boolean, String> eventProcType = MessageUtils.getEventProcType(syncSend, proxySend);
                if (eventProcType.getLeft().booleanValue()) {
                    // The sink answers the client, so it needs the message type and the channel context.
                    event = new SinkRspEvent(event, msgType, channelHandlerContext);
                }

                // Monitor key; the data time is truncated down to a 10-minute boundary.
                long dataTimeBucket = (((Long.parseLong(dataTime) / 1000) / 60) / 10) * 1000 * 60 * 10;
                sb.append(this.protocolType).append(AttrConstants.SEPARATOR)
                        .append(topic).append(AttrConstants.SEPARATOR)
                        .append(streamId).append(AttrConstants.SEPARATOR)
                        .append(str).append(AttrConstants.SEPARATOR)
                        .append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
                        .append(eventProcType.getRight()).append(AttrConstants.SEPARATOR)
                        .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeBucket)).append(AttrConstants.SEPARATOR)
                        .append(DateTimeUtils.ms2yyyyMMddHHmm(j));
                String monitorKey = sb.toString();
                sb.setLength(0);

                try {
                    this.processor.processEvent(event);
                    this.monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                    addStatistics(true, body.length, event);
                    this.monitorIndex.addAndGet(monitorKey, msgCnt, 1, body.length, 0);
                } catch (Throwable th) {
                    logger.error("Error writting to channel, data will discard.", th);
                    this.monitorIndexExt.incrementAndGet("EVENT_DROPPED");
                    this.monitorIndex.addAndGet(monitorKey, 0, 0, 0L, msgCnt);
                    addStatistics(false, body.length, event);
                    // Attach the root cause so callers can see why the write failed.
                    throw new ChannelException("ProcessEvent error can't write event to channel.", th);
                }
            }
        }
    }

    /**
     * Records source-side metrics for an event and, for successful sends, an audit entry.
     *
     * @param z     {@code true} when the event was handed to the channel successfully
     * @param j     size value forwarded to the metric item set (callers pass the body length)
     * @param event the event to account for; nothing is recorded when it is {@code null}
     */
    private void addStatistics(boolean z, long j, Event event) {
        if (event != null) {
            this.metricItemSet.fillSrcMetricItemsByEvent(event, z, j);
            // Only successful events are audited (audit id 5).
            if (z) {
                AuditUtils.add(5, event);
            }
        }
    }
}
