package org.apache.inlong.dataproxy.source;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

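/**
 * Netty inbound handler of the DataProxy source.
 *
 * <p>Each inbound {@link ByteBuf} is decoded by the configured {@link ServiceDecoder}; the decoded
 * messages are grouped by topic and stream id, converted to Flume {@link Event}s and handed to the
 * source's {@link ChannelProcessor}. For message types that expect one, a response frame is written
 * back on the same channel.
 */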
public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
    // Not referenced anywhere in this class as visible here.
    private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
    // Value stored under ConfigConstants.REMOTE_IDC_KEY in every event header built by this handler.
    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
    // Owning source; supplies the name used in log lines and metric dimensions.
    private BaseSource source;
    // Group that accepted channels are added to; its size is compared against maxConnections.
    private final ChannelGroup allChannels;
    // Upper bound checked in channelActive() against (allChannels.size() - 1).
    private int maxConnections;
    // When true, inbound buffers with zero readable bytes are dropped in channelRead().
    private boolean filterEmptyMsg;
    // Passed to InLongMsg.newInLongMsg(...) when packing messages.
    private final boolean isCompressed;
    // Flume channel processor that receives every event produced by this handler.
    private final ChannelProcessor processor;
    // Decoder that turns an inbound buffer into the result map consumed by channelRead().
    private final ServiceDecoder serviceProcessor;
    // Topic used when no topic can be resolved for a message's group/stream id.
    private final String defaultTopic;
    // Attribute string ("m=3" unless overridden) applied when a group has no MX properties configured.
    private String defaultMXAttr;
    // Stored by the constructor; not read anywhere in this class as visible here.
    private final String protocolType;
    // Metric registry obtained from the source; used by addMetric().
    private final DataProxyMetricItemSet metricItemSet;
    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageHandler.class);
    private static final ConfigManager configManager = ConfigManager.getInstance();
    // Serializes an attribute map using the AttrConstants separators.
    private static final Joiner.MapJoiner mapJoiner = Joiner.on(AttrConstants.SEPARATOR).withKeyValueSeparator(AttrConstants.KEY_VALUE_SEPARATOR);
    // Parses an attribute string (entries trimmed) using the AttrConstants separators.
    private static final Splitter.MapSplitter mapSplitter = Splitter.on(AttrConstants.SEPARATOR).trimResults().withKeyValueSeparator(AttrConstants.KEY_VALUE_SEPARATOR);
    // Per-thread minute-resolution formatter (SimpleDateFormat is not thread-safe); used for "msg.pkg.time".
    private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() {
        @Override
        public SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyyMMddHHmm");
        }
    };
    // Per-thread second-resolution formatter; not referenced anywhere in this class as visible here.
    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() {
        @Override
        public SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyyMMddHHmmss");
        }
    };

    /**
     * Creates a handler bound to the given source.
     *
     * @param baseSource     owning source; provides the channel processor, metric set and name
     * @param serviceDecoder decoder applied to every inbound buffer
     * @param channelGroup   group that accepted channels are registered in
     * @param str            default topic
     * @param str2           default MX attribute string; {@code null} keeps the built-in {@code "m=3"}
     * @param bool           whether empty inbound buffers are skipped (must not be {@code null})
     * @param num            not used by this constructor
     * @param num2           maximum number of connections (must not be {@code null})
     * @param bool2          whether packed messages are compressed (must not be {@code null})
     * @param str3           protocol type
     */
    public SimpleMessageHandler(BaseSource baseSource, ServiceDecoder serviceDecoder, ChannelGroup channelGroup, String str, String str2, Boolean bool, Integer num, Integer num2, Boolean bool2, String str3) {
        this.source = baseSource;
        this.processor = baseSource.getChannelProcessor();
        this.serviceProcessor = serviceDecoder;
        this.allChannels = channelGroup;
        this.defaultTopic = str;
        // Fall back to the built-in attribute string when the caller supplies none.
        this.defaultMXAttr = (str2 == null) ? "m=3" : str2;
        // The boxed arguments are unboxed here, exactly as before.
        this.filterEmptyMsg = bool;
        this.isCompressed = bool2;
        this.maxConnections = num2;
        this.protocolType = str3;
        this.metricItemSet = baseSource.getMetricItemSet();
    }

    /**
     * Returns a copy of a binary message with {@code str} appended to its attribute section.
     *
     * <p>Offsets read from the input, as used by this method: the int at offset 0 is the frame
     * length, the int at offset 21 is the length of the section that precedes the attributes
     * (presumably the body — TODO confirm against the encoder), and the short right after that
     * section is the current attribute length. The short at offset 9 gets bit {@code 4} set, and
     * the frame ends with the two-byte marker {@code 0xEE01}.
     *
     * <p>All lengths are computed from the UTF-8 encoded bytes of the appended text, so the length
     * fields stay consistent with what is actually written even for non-ASCII attributes.
     *
     * @param bArr the original binary frame
     * @param str  attribute text to append
     * @return a newly allocated frame containing the extra attributes
     */
    private byte[] newBinMsg(byte[] bArr, String str) {
        ByteBuffer original = ByteBuffer.wrap(bArr);
        int totalLen = original.getInt(0);
        int bodyLen = original.getInt(21);
        short attrLen = original.getShort(25 + bodyLen);

        // A separator is only needed when attributes already exist.
        String appended = (attrLen != 0) ? AttrConstants.SEPARATOR + str : str;
        byte[] appendedBytes = appended.getBytes(StandardCharsets.UTF_8);

        int newTotalLen = totalLen + appendedBytes.length;
        // End of the existing attribute section: 25-byte prefix + body + 2-byte attr length + attrs.
        int attrEnd = bodyLen + 27 + attrLen;

        ByteBuffer rebuilt = ByteBuffer.allocate(newTotalLen + 4);
        rebuilt.put(original.array(), 0, attrEnd);
        rebuilt.putShort(bodyLen + 25, (short) (appendedBytes.length + attrLen));
        System.arraycopy(appendedBytes, 0, rebuilt.array(), attrEnd, appendedBytes.length);
        rebuilt.putShort(9, (short) (original.getShort(9) | 4));
        rebuilt.putInt(0, newTotalLen);
        // Trailing marker 0xEE01 occupies the last two bytes of the frame.
        rebuilt.putShort((newTotalLen + 4) - 2, (short) -4607);
        return rebuilt.array();
    }

    /**
     * Admits or rejects a newly connected channel.
     *
     * <p>The channel is disconnected and closed when the connection limit has been reached, or when
     * illegal-IP checking is enabled and the remote IP is flagged as illegal. Otherwise it is added
     * to the channel group and the active event is propagated down the pipeline.
     *
     * @param channelHandlerContext context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        // Connection limit reached: refuse.
        if (this.allChannels.size() - 1 >= this.maxConnections) {
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}",
                    this.source.getName(), channelHandlerContext.channel(),
                    this.allChannels.size() - 1, this.maxConnections);
            return;
        }

        // Optional blacklist check on the remote address.
        if (ConfigManager.getInstance().needChkIllegalIP()) {
            String remoteIp = AddressUtils.getChannelRemoteIP(channelHandlerContext.channel());
            if (remoteIp != null && ConfigManager.getInstance().isIllegalIP(remoteIp)) {
                channelHandlerContext.channel().disconnect();
                channelHandlerContext.channel().close();
                logger.error(remoteIp + " is Illegal IP, so refuse it !");
                return;
            }
        }

        // Accepted: register the channel and let downstream handlers see the event.
        this.allChannels.add(channelHandlerContext.channel());
        channelHandlerContext.fireChannelActive();
        logger.info("{} added new channel, current connections = {}, maxConnections = {}",
                this.source.getName(), this.allChannels.size() - 1, this.maxConnections);
    }

    /**
     * Resolves the topic for a message and completes its group/stream identification.
     *
     * <p>When the message already carries a group id, the topic is looked up directly and the
     * group's MX properties (or the default MX attributes) are merged into the message attributes.
     * Otherwise the numeric group/stream ids from the common attributes are translated through the
     * configured mapping tables; on success the resolved names are written to {@code map2} and to
     * the message, and the binary payload is rewritten when both the per-group switch and the
     * request's num-to-name flag are {@code "TRUE"} (case-insensitive).
     *
     * @param proxyMessage    message being inspected; may be modified
     * @param map             common attributes of the request
     * @param map2            attribute map that receives the resolved {@code groupId}/{@code streamId}
     * @param atomicReference holder for the resolved topic; left untouched when no topic is found
     */
    private void checkGroupIdInfo(ProxyMessage proxyMessage, Map<String, String> map, Map<String, String> map2, AtomicReference<String> atomicReference) {
        final String declaredGroupId = proxyMessage.getGroupId();
        final String declaredStreamId = proxyMessage.getStreamId();

        if (declaredGroupId != null) {
            String topic = configManager.getTopicName(declaredGroupId, declaredStreamId);
            if (StringUtils.isNotEmpty(topic)) {
                atomicReference.set(topic.trim());
            }
            Map<String, String> mxProps = configManager.getMxPropertiesMaps().get(declaredGroupId);
            boolean hasMxProps = mxProps != null && mxProps.size() != 0;
            proxyMessage.getAttributeMap().putAll(hasMxProps ? mxProps : mapSplitter.split(this.defaultMXAttr));
            return;
        }

        // No group id on the message: try the numeric id mapping tables.
        final String num2Name = map.get(AttrConstants.NUM2NAME);
        final String groupIdNum = map.get(AttrConstants.GROUPID_NUM);
        final String streamIdNum = map.get(AttrConstants.STREAMID_NUM);
        if (configManager.getGroupIdMappingProperties() == null
                || configManager.getStreamIdMappingProperties() == null) {
            return;
        }

        String mappedGroupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
        String mappedStreamId = null;
        if (configManager.getStreamIdMappingProperties().get(groupIdNum) != null) {
            mappedStreamId = configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
        }
        if (mappedGroupId == null || mappedStreamId == null) {
            return;
        }

        String mappingEnabled = null;
        if (configManager.getGroupIdEnableMappingProperties() != null) {
            mappingEnabled = configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
        }
        if ("TRUE".equalsIgnoreCase(mappingEnabled) && "TRUE".equalsIgnoreCase(num2Name)) {
            // Embed the resolved names into the binary payload itself.
            proxyMessage.setData(newBinMsg(proxyMessage.getData(), "groupId=" + mappedGroupId + "&streamId=" + mappedStreamId));
        }

        map2.put("groupId", mappedGroupId);
        map2.put("streamId", mappedStreamId);
        proxyMessage.setGroupId(mappedGroupId);
        proxyMessage.setStreamId(mappedStreamId);

        String mappedTopic = configManager.getTopicName(mappedGroupId, mappedStreamId);
        if (StringUtils.isNotEmpty(mappedTopic)) {
            atomicReference.set(mappedTopic.trim());
        }
    }

    /**
     * Assigns a topic to every message and groups the messages by topic and stream id.
     *
     * <p>For each message the topic is resolved via {@link #checkGroupIdInfo}, the remote IP is
     * recorded in the common attributes under {@code "NodeIP"}, SLA-metric groups are redirected to
     * the SLA topic, and — unless the message is protobuf ({@code mt=pb}) or the request is a
     * multi-body type — one trailing {@code "\n"} or {@code "\r\n"} is stripped from the body.
     *
     * <p>The trailing-newline check is bounds-safe: empty or {@code null} bodies are left as they
     * are, and a body consisting of a single {@code "\n"} becomes an empty array.
     *
     * @param list    decoded messages of one request
     * @param map     common attributes of the request; updated in place
     * @param map2    output: topic -> stream id -> messages
     * @param str     remote IP of the sender
     * @param msgType message type of the request
     */
    private void updateMsgList(List<ProxyMessage> list, Map<String, String> map, Map<String, HashMap<String, List<ProxyMessage>>> map2, String str, MsgType msgType) {
        for (ProxyMessage proxyMessage : list) {
            Map<String, String> attributeMap = proxyMessage.getAttributeMap();
            AtomicReference<String> topicHolder = new AtomicReference<>(this.defaultTopic);
            checkGroupIdInfo(proxyMessage, map, attributeMap, topicHolder);
            String topic = topicHolder.get();
            proxyMessage.setTopic(topic);
            map.put("NodeIP", str);

            // Read the ids after checkGroupIdInfo, which may have filled them in.
            String groupId = proxyMessage.getGroupId();
            String streamId = proxyMessage.getStreamId();
            if (ConfigConstants.SLA_METRIC_GROUPID.equals(groupId)) {
                map.put(ConfigConstants.SLA_METRIC_DATA, "true");
                proxyMessage.setTopic(ConfigConstants.SLA_METRIC_DATA);
            }

            boolean textBody = !"pb".equals(attributeMap.get("mt"))
                    && !MsgType.MSG_MULTI_BODY.equals(msgType)
                    && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType);
            if (textBody) {
                byte[] data = proxyMessage.getData();
                if (data != null && data.length > 0 && data[data.length - 1] == '\n') {
                    int trimmedLen = data.length - 1;
                    // Only look for '\r' when a byte actually precedes the '\n'.
                    if (trimmedLen > 0 && data[trimmedLen - 1] == '\r') {
                        trimmedLen--;
                    }
                    byte[] trimmed = new byte[trimmedLen];
                    System.arraycopy(data, 0, trimmed, 0, trimmedLen);
                    proxyMessage.setData(trimmed);
                }
            }

            if (streamId == null) {
                streamId = "";
            }
            // Grouping uses the resolved topic, not the SLA override applied to the message above.
            map2.computeIfAbsent(topic, key -> new HashMap<>())
                    .computeIfAbsent(streamId, key -> new ArrayList<>())
                    .add(proxyMessage);
        }
    }

    /**
     * Builds the shared event headers for every (topic, stream id) group and forwards the group's
     * messages to the channel processor.
     *
     * <p>The messages of a group are packed into an {@link InLongMsg} whose serialized size is
     * recorded under {@code ConfigConstants.TOTAL_LEN} and whose creation time supplies
     * {@code "msg.pkg.time"} (for binary multi-body requests the request's own
     * {@code "msg.pkg.time"} attribute, or the current time, is used instead).
     *
     * @param map     common attributes of the request
     * @param map2    messages grouped as topic -> stream id -> messages
     * @param str     remote IP of the sender
     * @param msgType message type of the request
     */
    private void formatMessagesAndSend(Map<String, String> map, Map<String, HashMap<String, List<ProxyMessage>>> map2, String str, MsgType msgType) throws MessageIDException {
        final boolean multiBodyAttr = MsgType.MSG_MULTI_BODY_ATTR.equals(msgType);
        final boolean multiBody = MsgType.MSG_MULTI_BODY.equals(msgType);
        final boolean binMultiBody = MsgType.MSG_BIN_MULTI_BODY.equals(msgType);
        final int packVersion = multiBodyAttr ? 3 : (binMultiBody ? 4 : 1);

        for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : map2.entrySet()) {
            final String topic = topicEntry.getKey();
            for (Map.Entry<String, List<ProxyMessage>> streamEntry : topicEntry.getValue().entrySet()) {
                final String streamId = streamEntry.getKey();
                final List<ProxyMessage> messages = streamEntry.getValue();

                InLongMsg packed = InLongMsg.newInLongMsg(this.isCompressed, packVersion);
                for (ProxyMessage message : messages) {
                    if (binMultiBody) {
                        // Binary bodies are packed without attributes.
                        packed.addMsg(message.getData());
                        continue;
                    }
                    if (multiBodyAttr || multiBody) {
                        message.getAttributeMap().put("cnt", "1");
                    }
                    packed.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                }

                String pkgTime;
                if (packVersion == 4) {
                    pkgTime = map.containsKey("msg.pkg.time")
                            ? map.get("msg.pkg.time")
                            : dateFormator.get().format(Long.valueOf(System.currentTimeMillis()));
                } else {
                    pkgTime = dateFormator.get().format(Long.valueOf(packed.getCreatetime()));
                }

                // Insertion order is kept as-is in case any constant key coincides with a literal one.
                Map<String, String> headers = new HashMap<>();
                headers.put("dt", String.valueOf(NumberUtils.toLong(map.get("dt"), System.currentTimeMillis())));
                headers.put("topic", topic);
                headers.put("streamId", streamId);
                headers.put(ConfigConstants.REMOTE_IP_KEY, str);
                headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
                headers.put("msgcnt", map.get("cnt"));
                headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(packed.buildArray().length));
                String sequenceId = map.get("sid");
                if (StringUtils.isNotEmpty(sequenceId)) {
                    headers.put(ConfigConstants.SEQUENCE_ID,
                            topic + AttrConstants.SEPARATOR + streamId + AttrConstants.SEPARATOR + sequenceId);
                }
                headers.put("msg.pkg.time", pkgTime);

                processProxyMessageList(headers, messages);
            }
        }
    }

    /**
     * Converts each message to a Flume event and hands it to the channel processor.
     *
     * <p>A success or failure metric is recorded per event. Processing stops at the first failure;
     * the failure is rethrown as a {@link ChannelException} that carries the original error as its
     * cause, so the root problem is not lost to callers and logs further up.
     *
     * @param map  headers shared by all events of this batch
     * @param list messages to forward
     * @throws ChannelException if an event cannot be written to the channel
     */
    private void processProxyMessageList(Map<String, String> map, List<ProxyMessage> list) {
        for (ProxyMessage proxyMessage : list) {
            Event event = parseProxyMessage2Event(map, proxyMessage);
            try {
                this.processor.processEvent(event);
                addMetric(true, event.getBody().length, event);
            } catch (Throwable th) {
                logger.error("Error writting to channel,data will discard.", th);
                addMetric(false, event.getBody().length, event);
                throw new ChannelException("ProcessEvent error can't write event to channel.", th);
            }
        }
    }

    /**
     * Builds the Flume event for a single message.
     *
     * <p>Headers are layered: the message's own attributes first, then the shared headers, then the
     * per-message fields set here (count, group/stream id, topic, message time, source IP, encoding
     * version), each layer overriding the previous one on key clashes.
     *
     * @param map          headers shared by the batch
     * @param proxyMessage message supplying body, attributes, ids and topic
     * @return event whose body is the message data
     */
    private Event parseProxyMessage2Event(Map<String, String> map, ProxyMessage proxyMessage) {
        Map<String, String> headers = new HashMap<>();
        Map<String, String> messageAttrs = proxyMessage.getAttributeMap();
        if (messageAttrs != null) {
            headers.putAll(messageAttrs);
        }
        headers.putAll(map);

        headers.put("cnt", "1");
        headers.put("inlongGroupId", proxyMessage.getGroupId());
        headers.put("inlongStreamId", proxyMessage.getStreamId());
        headers.put("topic", proxyMessage.getTopic());
        headers.put("msgTime", map.get("dt"));
        // NOTE(review): the shared headers built in formatMessagesAndSend do not contain "NodeIP"
        // (it is stored in the request's common attributes), so this value looks like it is always
        // null on that path — confirm which map was intended.
        headers.put(Constants.HEADER_KEY_SOURCE_IP, map.get("NodeIP"));
        headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V1.getName());

        return EventBuilder.withBody(proxyMessage.getData(), headers);
    }

    /**
     * Writes the response frame for a processed request, if the request asks for one.
     *
     * <p>Nothing is sent when the common attributes contain {@code isAck} with a value other than
     * {@code "true"}.
     *
     * <ul>
     *   <li>ACK-service, original-return and (attr) multi-body requests get a frame of
     *       {@code int length, byte msgType, int bodyLen, body, int attrLen, attrs}; the body is the
     *       decoded {@code "body"} entry for original-return and the single byte {@code 50}
     *       otherwise.</li>
     *   <li>Binary multi-body requests get {@code int length, byte msgType, 4-byte uniq,
     *       short attrLen, attrs, short 0xEE01}.</li>
     * </ul>
     *
     * <p>All inputs that can fail to parse are evaluated before the response buffer is allocated,
     * and attribute lengths are taken from the UTF-8 encoded bytes that are actually written.
     *
     * @param map           common attributes of the request
     * @param map2          decoder result map
     * @param channel       channel to respond on
     * @param socketAddress remote address (not used by this method)
     * @param msgType       message type of the request
     * @throws Exception if the channel is not writable, or the {@code uniq} attribute is not a number
     */
    private void responsePackage(Map<String, String> map, Map<String, Object> map2, Channel channel, SocketAddress socketAddress, MsgType msgType) throws Exception {
        if (map.containsKey("isAck") && !"true".equals(map.get("isAck"))) {
            return;
        }

        if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN.equals(msgType)
                || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
            String attrs = mapJoiner.join(map);
            if (attrs.isEmpty()) {
                return;
            }
            byte[] attrBytes = attrs.getBytes(StandardCharsets.UTF_8);
            byte[] body = MsgType.MSG_ORIGINAL_RETURN.equals(msgType) ? (byte[]) map2.get("body") : new byte[]{50};
            // 1 byte msgType + 4 bytes bodyLen + body + 4 bytes attrLen + attrs.
            int length = 5 + body.length + 4 + attrBytes.length;
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + length);
            buffer.writeInt(length);
            buffer.writeByte(msgType.getValue());
            buffer.writeInt(body.length);
            buffer.writeBytes(body);
            buffer.writeInt(attrBytes.length);
            buffer.writeBytes(attrBytes);
            if (channel.isWritable()) {
                // NOTE(review): write() does not flush; confirm another handler flushes the channel.
                channel.write(buffer);
                return;
            }
            logger.warn("the send buffer1 is full, so disconnect it!please check remote client; Connection info:" + channel + ";attr is " + attrs);
            buffer.release();
            throw new Exception(new Throwable("the send buffer1 is full, so disconnect it!please check remote client; Connection info:" + channel + ";attr is " + attrs));
        }

        if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
            // A missing key yields null, same as the explicit containsKey check it replaces.
            String decoderAttrs = (String) map2.get(ConfigConstants.DECODER_ATTRS);
            byte[] attrBytes = (decoderAttrs == null) ? new byte[0] : decoderAttrs.getBytes(StandardCharsets.UTF_8);
            // Parse before allocating so a bad or missing "uniq" cannot leak the buffer.
            long uniq = Long.parseLong(map.get("uniq"));

            // 1 byte msgType + 4 bytes uniq + 2 bytes attrLen + attrs + 2 bytes trailing marker.
            int length = 9 + attrBytes.length;
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + length);
            buffer.writeInt(length);
            buffer.writeByte(msgType.getValue());
            // Low 32 bits of uniq, big-endian.
            buffer.writeInt((int) uniq);
            buffer.writeShort(attrBytes.length);
            buffer.writeBytes(attrBytes);
            buffer.writeShort(60929);
            if (channel.isWritable()) {
                // NOTE(review): write() does not flush; confirm another handler flushes the channel.
                channel.write(buffer);
            } else {
                logger.warn("the send buffer2 is full, so disconnect it!please check remote client; Connection info:" + channel + ";attr is " + decoderAttrs);
                buffer.release();
                throw new Exception(new Throwable("the send buffer2 is full,so disconnect it!please check remote client, Connection info:" + channel + ";attr is " + decoderAttrs));
            }
        }
    }

    /**
     * Decodes one inbound buffer and forwards its content to the channel processor.
     *
     * <p>Flow: empty buffers are skipped when filtering is enabled; the decoder result determines
     * the message type; heartbeats are answered (or ignored for binary heartbeats) without further
     * processing; file-check and minute-check data are forwarded as raw events; everything else is
     * grouped by topic/stream and forwarded with full headers. Finally the response frame, if any,
     * is written.
     *
     * <p>The inbound buffer is released exactly once on every path via {@code finally}.
     *
     * @param channelHandlerContext context of the receiving channel
     * @param obj                   inbound message, expected to be a {@link ByteBuf}
     * @throws IOException if message handling fails with a {@link MessageIDException}
     */
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        logger.info("message received");
        if (obj == null) {
            logger.error("get null messageevent, just skip");
            addMetric(false, 0L, null);
            return;
        }
        Channel channel = channelHandlerContext.channel();
        String channelRemoteIP = AddressUtils.getChannelRemoteIP(channel);
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            if (byteBuf.readableBytes() == 0 && this.filterEmptyMsg) {
                logger.warn("skip empty msg.");
                byteBuf.clear();
                addMetric(false, 0L, null);
                return;
            }

            Map<String, Object> extractData = this.serviceProcessor.extractData(byteBuf, channelRemoteIP, System.currentTimeMillis(), channel);
            if (extractData == null) {
                logger.info("result is null");
                addMetric(false, 0L, null);
                return;
            }

            MsgType msgType = (MsgType) extractData.get(ConfigConstants.MSG_TYPE);
            if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
                ByteBuf heartbeatReply = ByteBufAllocator.DEFAULT.buffer(5);
                heartbeatReply.writeBytes(new byte[]{0, 0, 0, 1, 1});
                // NOTE(review): write() does not flush; confirm another handler flushes the channel.
                channel.write(heartbeatReply);
                addMetric(false, 0L, null);
                return;
            }
            if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
                addMetric(false, 0L, null);
                return;
            }

            Map<String, String> commonAttrs = (Map<String, String>) extractData.get(ConfigConstants.COMMON_ATTR_MAP);
            if (commonAttrs == null) {
                commonAttrs = new HashMap<>();
            }
            List<ProxyMessage> list = (List<ProxyMessage>) extractData.get(ConfigConstants.MSG_LIST);
            if (list != null) {
                if (commonAttrs.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
                    forwardCheckData(list, "filestatus");
                } else if (commonAttrs.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
                    logger.info("i am in MINUTE_CHECK_DATA");
                    forwardCheckData(list, "measure");
                } else {
                    Map<String, HashMap<String, List<ProxyMessage>>> grouped = new HashMap<>(list.size());
                    updateMsgList(list, commonAttrs, grouped, channelRemoteIP, msgType);
                    formatMessagesAndSend(commonAttrs, grouped, channelRemoteIP, msgType);
                }
            }
            responsePackage(commonAttrs, extractData, channel, channel.remoteAddress(), msgType);
        } catch (MessageIDException e) {
            addMetric(false, 0L, null);
            throw new IOException(e.getCause());
        } finally {
            byteBuf.release();
        }
    }

    /**
     * Forwards check data (file status or minute measure) as raw events.
     *
     * <p>Every event carries {@code msgtype=<msgTypeValue>} and
     * {@code ConfigConstants.FILE_CHECK_DATA=true}. The first failure is recorded as a failed
     * metric and rethrown as a {@link ChannelException} with the original error as cause.
     */
    private void forwardCheckData(List<ProxyMessage> list, String msgTypeValue) {
        Map<String, String> headers = new HashMap<>();
        headers.put("msgtype", msgTypeValue);
        // NOTE(review): minute-check data is also tagged with FILE_CHECK_DATA, as in the original
        // code — confirm whether MINUTE_CHECK_DATA was intended for that case.
        headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
        for (ProxyMessage proxyMessage : list) {
            Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
            try {
                this.processor.processEvent(event);
                addMetric(true, event.getBody().length, event);
            } catch (Throwable th) {
                logger.error("Error writing check data to channel, data will be discarded.", th);
                addMetric(false, event.getBody().length, event);
                throw new ChannelException("ProcessEvent error can't write check data event to channel.", th);
            }
        }
    }

    /**
     * Logs an error raised in the pipeline, propagates it to the next handler and closes the
     * channel.
     *
     * <p>The throwable is passed as the trailing logger argument so that its stack trace is logged;
     * the placeholder in the message is filled with the affected channel rather than being printed
     * literally.
     *
     * @param channelHandlerContext context of the failing channel
     * @param th                    the error that was raised
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        logger.error("exception caught, channel = {}", channelHandlerContext.channel(), th);
        channelHandlerContext.fireExceptionCaught(th);
        channelHandlerContext.close();
    }

    /**
     * Logs that a channel went inactive and passes the event on to the next handler.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        // Resolve the channel once; it is only needed for the log line.
        final Object inactiveChannel = channelHandlerContext.channel();
        logger.error("channel inactive {}", inactiveChannel);

        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Records a read metric for one event (or for a request that produced no event).
     *
     * <p>The metric item is looked up by cluster id, source id/data id (both the source name) and
     * the inlong id / audit time dimensions filled in from the event. Successful reads are also
     * reported to the audit utility.
     *
     * @param z     {@code true} for a successful read, {@code false} for a failure
     * @param j     number of bytes to add to the size counter
     * @param event the event concerned; callers pass {@code null} when there is none
     */
    private void addMetric(boolean z, long j, Event event) {
        final String sourceName = this.source.getName();

        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("clusterId", "DataProxy");
        dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, sourceName);
        dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, sourceName);
        DataProxyMetricItem.fillInlongId(event, dimensions);
        DataProxyMetricItem.fillAuditFormatTime(event, dimensions);

        DataProxyMetricItem metricItem = (DataProxyMetricItem) this.metricItemSet.findMetricItem(dimensions);
        if (z) {
            metricItem.readSuccessCount.incrementAndGet();
            metricItem.readSuccessSize.addAndGet(j);
            AuditUtils.add(5, event);
        } else {
            metricItem.readFailCount.incrementAndGet();
            metricItem.readFailSize.addAndGet(j);
        }
    }
}
