package org.apache.inlong.dataproxy.source;

import com.google.common.base.Splitter;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/DefaultServiceDecoder.class */
public class DefaultServiceDecoder implements ServiceDecoder {
    private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
    private static final int BIN_MSG_TOTALLEN_SIZE = 4;
    private static final int BIN_MSG_MSGTYPE_OFFSET = 4;
    private static final int BIN_MSG_EXTEND_OFFSET = 9;
    private static final int BIN_MSG_EXTEND_SIZE = 2;
    private static final int BIN_MSG_SET_SNAPPY = 32;
    private static final int BIN_MSG_BODYLEN_SIZE = 4;
    private static final int BIN_MSG_BODYLEN_OFFSET = 21;
    private static final int BIN_MSG_BODY_OFFSET = 25;
    private static final int BIN_MSG_ATTRLEN_SIZE = 2;
    private static final int BIN_MSG_FORMAT_SIZE = 29;
    private static final int BIN_MSG_MAGIC_SIZE = 2;
    private static final int BIN_MSG_MAGIC = 60929;
    private static final int BIN_HB_TOTALLEN_SIZE = 4;
    private static final int BIN_HB_BODYLEN_OFFSET = 10;
    private static final int BIN_HB_BODYLEN_SIZE = 4;
    private static final int BIN_HB_BODY_OFFSET = 14;
    private static final int BIN_HB_ATTRLEN_SIZE = 2;
    private static final int BIN_HB_FORMAT_SIZE = 17;
    private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceDecoder.class);
    private static final Splitter.MapSplitter mapSplitter = Splitter.on(AttributeConstants.SEPARATOR).trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);

    private Map<String, Object> extractNewBinHB(Map<String, Object> map, ChannelBuffer channelBuffer, Channel channel, MessageEvent messageEvent, int i) throws Exception {
        int readerIndex = channelBuffer.readerIndex() - 5;
        int i2 = channelBuffer.getInt(readerIndex + BIN_HB_BODYLEN_OFFSET);
        int i3 = channelBuffer.getShort(readerIndex + BIN_HB_BODY_OFFSET + i2);
        int unsignedShort = channelBuffer.getUnsignedShort(readerIndex + BIN_HB_BODY_OFFSET + i2 + 2 + i3);
        if (i + 4 < i2 + i3 + BIN_HB_FORMAT_SIZE || unsignedShort != BIN_MSG_MAGIC) {
            LOG.error("err msg, bodyLen + attrLen > totalDataLen, and bodyLen={},attrLen={},totalDataLen={},magic={};Connection info:{}", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i), Integer.toHexString(unsignedShort), channel.toString()});
            return map;
        }
        channelBuffer.skipBytes(BIN_MSG_EXTEND_OFFSET + i2 + 2);
        if (i3 != 0) {
            byte[] bArr = new byte[i3];
            channelBuffer.readBytes(bArr, BIN_MSG_TOTALLEN_OFFSET, i3);
            map.put(ConfigConstants.DECODER_ATTRS, new String(bArr, StandardCharsets.UTF_8));
        }
        map.put(ConfigConstants.VERSION_TYPE, Byte.valueOf(channelBuffer.getByte(readerIndex + BIN_MSG_EXTEND_OFFSET)));
        return map;
    }

    private void handleDateTime(Map<String, String> map, Channel channel, MessageEvent messageEvent, long j, long j2, int i) {
        map.put(AttributeConstants.UNIQ_ID, String.valueOf(j));
        String valueOf = map.containsKey("msg.pkg.time") ? map.get("msg.pkg.time") : String.valueOf(j2);
        StringBuilder sb = new StringBuilder();
        String str = "";
        if (channel != null && channel.getRemoteAddress() != null) {
            str = channel.getRemoteAddress().toString();
        } else if (messageEvent != null && messageEvent.getRemoteAddress() != null) {
            str = messageEvent.getRemoteAddress().toString();
        }
        sb.append(str).append("#").append(valueOf).append("#").append(j);
        map.put(AttributeConstants.SEQUENCE_ID, new String(sb));
        map.put(AttributeConstants.DATA_TIME, String.valueOf(j2));
        map.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
        map.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(i != 0 ? i : 1));
    }

    private boolean handleExtMap(Map<String, String> map, ChannelBuffer channelBuffer, Map<String, Object> map2, int i, int i2) {
        boolean z = BIN_MSG_TOTALLEN_OFFSET;
        if ((i & 8) == 8) {
            z = true;
            int i3 = channelBuffer.getInt(i2 + BIN_MSG_BODYLEN_OFFSET + 4);
            byte[] bArr = new byte[i3];
            channelBuffer.getBytes(i2 + BIN_MSG_BODY_OFFSET + 4, bArr, BIN_MSG_TOTALLEN_OFFSET, i3);
            map2.put(ConfigConstants.FILE_BODY, bArr);
            map.put(ConfigConstants.FILE_CHECK_DATA, "true");
        } else if ((i & 16) == 16) {
            z = true;
            int i4 = channelBuffer.getInt(i2 + BIN_MSG_BODYLEN_OFFSET + 4);
            byte[] bArr2 = new byte[i4];
            channelBuffer.getBytes(i2 + BIN_MSG_BODY_OFFSET + 4, bArr2, BIN_MSG_TOTALLEN_OFFSET, i4);
            map2.put(ConfigConstants.FILE_BODY, bArr2);
            map.put(ConfigConstants.MINUTE_CHECK_DATA, "true");
        }
        return z;
    }

    private ByteBuffer handleTrace(Channel channel, ChannelBuffer channelBuffer, int i, int i2, int i3, int i4, String str, int i5) {
        int length;
        String str2;
        ByteBuffer allocate;
        if (((i & 2) >> 1) == 1) {
            String str3 = BIN_MSG_TOTALLEN_OFFSET;
            SocketAddress localAddress = channel.getLocalAddress();
            if (BIN_MSG_TOTALLEN_OFFSET != localAddress) {
                str3 = localAddress.toString();
                try {
                    str3 = str3.substring(1, str3.indexOf(58));
                } catch (Exception e) {
                    LOG.warn("fail to get the local IP, and strIP={},localSocketAddress={}", str3, localAddress);
                }
            }
            String str4 = "node2ip=" + str3 + "&rtime2=" + System.currentTimeMillis();
            if (i4 != 0) {
                length = i3 + str4.length() + AttributeConstants.SEPARATOR.length();
                str2 = str + AttributeConstants.SEPARATOR + str4;
            } else {
                length = i3 + str4.length();
                str2 = str4;
            }
            allocate = ByteBuffer.allocate(length + 4);
            channelBuffer.getBytes(i2, allocate.array(), BIN_MSG_TOTALLEN_OFFSET, i5 + BIN_MSG_BODY_OFFSET);
            allocate.putShort(i5 + BIN_MSG_BODY_OFFSET, (short) str2.length());
            System.arraycopy(str2.getBytes(StandardCharsets.UTF_8), BIN_MSG_TOTALLEN_OFFSET, allocate.array(), i5 + 27, str2.length());
            allocate.putInt(BIN_MSG_TOTALLEN_OFFSET, length);
            allocate.putShort((length + 4) - 2, (short) -4607);
        } else {
            allocate = ByteBuffer.allocate(i3 + 4);
            channelBuffer.getBytes(i2, allocate.array(), BIN_MSG_TOTALLEN_OFFSET, i3 + 4);
        }
        return allocate;
    }

    private Map<String, Object> extractNewBinData(Map<String, Object> map, ChannelBuffer channelBuffer, Channel channel, MessageEvent messageEvent, int i, MsgType msgType) throws Exception {
        int readerIndex = channelBuffer.readerIndex() - 5;
        int i2 = channelBuffer.getInt(readerIndex + BIN_MSG_BODYLEN_OFFSET);
        int i3 = channelBuffer.getShort(readerIndex + BIN_MSG_BODY_OFFSET + i2);
        int unsignedShort = channelBuffer.getUnsignedShort(readerIndex + BIN_MSG_BODY_OFFSET + i2 + 2 + i3);
        if (i2 == 0) {
            throw new Exception(new Throwable("err msg,  bodyLen is empty;Connection info:" + channel.toString()));
        }
        if (i + 4 < i2 + i3 + BIN_MSG_FORMAT_SIZE || unsignedShort != BIN_MSG_MAGIC) {
            throw new Exception(new Throwable("err msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen=" + i2 + ",totalDataLen=" + i + ",attrLen=" + i3 + ";magic=" + Integer.toHexString(unsignedShort) + ";Connection info:" + channel.toString()));
        }
        int readUnsignedShort = channelBuffer.readUnsignedShort();
        int readUnsignedShort2 = channelBuffer.readUnsignedShort();
        int readUnsignedShort3 = channelBuffer.readUnsignedShort();
        long readUnsignedInt = channelBuffer.readUnsignedInt();
        int readUnsignedShort4 = channelBuffer.readUnsignedShort();
        long readUnsignedInt2 = channelBuffer.readUnsignedInt();
        long j = readUnsignedInt * 1000;
        HashMap hashMap = new HashMap();
        channelBuffer.skipBytes(4 + i2 + 2);
        map.put(ConfigConstants.COMMON_ATTR_MAP, hashMap);
        map.put(ConfigConstants.EXTRA_ATTR, (readUnsignedShort3 & 1) == 1 ? "true" : "false");
        byte[] bArr = new byte[i2];
        channelBuffer.getBytes(readerIndex + BIN_MSG_BODY_OFFSET, bArr, BIN_MSG_TOTALLEN_OFFSET, i2);
        map.put("body", bArr);
        String str = BIN_MSG_TOTALLEN_OFFSET;
        if (i3 != 0) {
            byte[] bArr2 = new byte[i3];
            channelBuffer.readBytes(bArr2, BIN_MSG_TOTALLEN_OFFSET, i3);
            str = new String(bArr2, StandardCharsets.UTF_8);
            map.put(ConfigConstants.DECODER_ATTRS, str);
            try {
                hashMap.putAll(mapSplitter.split(str));
            } catch (Exception e) {
                channelBuffer.clear();
                throw new MessageIDException(readUnsignedInt2, ErrorCode.ATTR_ERROR, new Throwable("[Parse Error]new six segment protocol ,attr is " + str + " , channel info:" + channel.toString()));
            }
        }
        try {
            handleDateTime(hashMap, channel, messageEvent, readUnsignedInt2, j, readUnsignedShort4);
            boolean handleExtMap = handleExtMap(hashMap, channelBuffer, map, readUnsignedShort3, readerIndex);
            ByteBuffer handleTrace = handleTrace(channel, channelBuffer, readUnsignedShort3, readerIndex, i, i3, str, i2);
            String str2 = BIN_MSG_TOTALLEN_OFFSET;
            String str3 = BIN_MSG_TOTALLEN_OFFSET;
            if (hashMap.containsKey(AttributeConstants.GROUP_ID)) {
                str2 = hashMap.get(AttributeConstants.GROUP_ID);
            }
            if (hashMap.containsKey(AttributeConstants.INTERFACE_ID)) {
                str3 = hashMap.get(AttributeConstants.INTERFACE_ID);
            }
            if (str2 == null || str3 == null) {
                if ((((readUnsignedShort3 & 4) >> 2) == 0) && BIN_MSG_TOTALLEN_OFFSET != readUnsignedShort && BIN_MSG_TOTALLEN_OFFSET != readUnsignedShort2) {
                    hashMap.put(AttributeConstants.NUM2NAME, "TRUE");
                    hashMap.put(AttributeConstants.GROUPID_NUM, String.valueOf(readUnsignedShort));
                    hashMap.put(AttributeConstants.STREAMID_NUM, String.valueOf(readUnsignedShort2));
                }
            } else {
                hashMap.put(AttributeConstants.NUM2NAME, "FALSE");
                handleTrace.putShort(BIN_MSG_EXTEND_OFFSET, (short) (readUnsignedShort3 | 4));
            }
            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType) && !handleExtMap) {
                ArrayList arrayList = new ArrayList(1);
                arrayList.add(new ProxyMessage(str2, str3, hashMap, handleTrace.array()));
                map.put(ConfigConstants.MSG_LIST, arrayList);
            } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                ArrayList arrayList2 = new ArrayList(1);
                arrayList2.add(new ProxyMessage(str2, str3, hashMap, (byte[]) map.get(ConfigConstants.FILE_BODY)));
                map.put(ConfigConstants.MSG_LIST, arrayList2);
            }
            return map;
        } catch (Exception e2) {
            LOG.error("extractNewBinData has error! ex = {}", e2);
            channelBuffer.clear();
            throw new MessageIDException(readUnsignedInt2, ErrorCode.OTHER_ERROR, e2.getCause());
        }
    }

    private Map<String, Object> extractDefaultData(Map<String, Object> map, ChannelBuffer channelBuffer, Channel channel, MessageEvent messageEvent, int i, MsgType msgType) throws Exception {
        ArrayList arrayList;
        int readInt = channelBuffer.readInt();
        if (readInt == 0) {
            throw new Exception(new Throwable("err msg,  bodyLen is empty;Connection info:" + channel.toString()));
        }
        if (readInt > i - 5) {
            throw new Exception(new Throwable("err msg, firstLen > totalDataLen, and bodyLen=" + readInt + ",totalDataLen=" + i + ";Connection info:" + channel.toString()));
        }
        byte[] bArr = new byte[readInt];
        channelBuffer.readBytes(bArr, BIN_MSG_TOTALLEN_OFFSET, readInt);
        map.put("body", bArr);
        int readInt2 = channelBuffer.readInt();
        if (i != BIN_MSG_EXTEND_OFFSET + readInt2 + readInt) {
            throw new Exception(new Throwable("err msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + readInt + ",totalDataLen=" + i + ",attrDataLen=" + readInt2 + ";Connection info:" + channel.toString()));
        }
        byte[] bArr2 = new byte[readInt2];
        channelBuffer.readBytes(bArr2, BIN_MSG_TOTALLEN_OFFSET, readInt2);
        String str = new String(bArr2, StandardCharsets.UTF_8);
        map.put(ConfigConstants.DECODER_ATTRS, str);
        try {
            HashMap hashMap = new HashMap(mapSplitter.split(str));
            map.put(ConfigConstants.COMMON_ATTR_MAP, hashMap);
            String str2 = (String) hashMap.get(AttributeConstants.COMPRESS_TYPE);
            map.put(ConfigConstants.COMPRESS_TYPE, str2);
            if (StringUtils.isNotBlank(str2)) {
                byte[] processUnCompress = processUnCompress(bArr, str2);
                if (processUnCompress == null || processUnCompress.length == 0) {
                    throw new Exception(new Throwable("Uncompress data error!compress type:" + str2 + ";data:" + new String(bArr, StandardCharsets.UTF_8) + ";attr:" + str + ";channel:" + channel.toString()));
                }
                bArr = processUnCompress;
            }
            hashMap.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
            String str3 = (String) hashMap.get(AttributeConstants.GROUP_ID);
            String str4 = (String) hashMap.get(AttributeConstants.INTERFACE_ID);
            String str5 = (String) hashMap.get(AttributeConstants.MESSAGE_COUNT);
            int parseInt = str5 != null ? Integer.parseInt(str5) : 1;
            hashMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(parseInt));
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            if (MsgType.MSG_MULTI_BODY.equals(msgType)) {
                arrayList = new ArrayList(parseInt);
                while (wrap.remaining() > 0) {
                    int i2 = wrap.getInt();
                    if (i2 <= 0 || i2 > wrap.remaining()) {
                        throw new Exception(new Throwable("[Malformed Data]Invalid data len!channel is " + channel.toString()));
                    }
                    byte[] bArr3 = new byte[i2];
                    wrap.get(bArr3);
                    arrayList.add(new ProxyMessage(str3, str4, hashMap, bArr3));
                }
            } else {
                arrayList = new ArrayList(1);
                arrayList.add(new ProxyMessage(str3, str4, hashMap, bArr));
            }
            map.put(ConfigConstants.MSG_LIST, arrayList);
            return map;
        } catch (Exception e) {
            throw new Exception(new Throwable("Parse commonAttrMap error.commonAttrString is: " + str + " ,channel is :" + channel.toString()));
        }
    }

    private byte[] processUnCompress(byte[] bArr, String str) {
        try {
            byte[] bArr2 = new byte[Snappy.uncompressedLength(bArr, BIN_MSG_TOTALLEN_OFFSET, bArr.length)];
            Snappy.uncompress(bArr, BIN_MSG_TOTALLEN_OFFSET, bArr.length, bArr2, BIN_MSG_TOTALLEN_OFFSET);
            return bArr2;
        } catch (IOException e) {
            LOG.error("Uncompress data error!", e);
            return null;
        }
    }

    @Override // org.apache.inlong.dataproxy.source.ServiceDecoder
    public Map<String, Object> extractData(ChannelBuffer channelBuffer, Channel channel, MessageEvent messageEvent) throws Exception {
        Map<String, Object> hashMap = new HashMap<>();
        if (BIN_MSG_TOTALLEN_OFFSET == channelBuffer) {
            LOG.error("cb == null");
            return hashMap;
        }
        int readableBytes = channelBuffer.readableBytes();
        if (20971520 < readableBytes) {
            throw new Exception(new Throwable("err msg, ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen, and  totalLen=" + readableBytes));
        }
        channelBuffer.markReaderIndex();
        int readInt = channelBuffer.readInt();
        if (readInt + 4 > readableBytes) {
            channelBuffer.resetReaderIndex();
            return null;
        }
        byte readByte = channelBuffer.readByte();
        int i = (readByte & 224) >> 5;
        MsgType valueOf = MsgType.valueOf(readByte);
        hashMap.put(ConfigConstants.MSG_TYPE, valueOf);
        if (MsgType.MSG_HEARTBEAT.equals(valueOf) || MsgType.MSG_UNKNOWN.equals(valueOf)) {
            return hashMap;
        }
        if (MsgType.MSG_BIN_HEARTBEAT.equals(valueOf)) {
            return extractNewBinHB(hashMap, channelBuffer, channel, messageEvent, readInt);
        }
        if (valueOf.getValue() < MsgType.MSG_BIN_MULTI_BODY.getValue()) {
            return extractDefaultData(hashMap, channelBuffer, channel, messageEvent, readInt, valueOf);
        }
        hashMap.put(ConfigConstants.COMPRESS_TYPE, i != 0 ? "snappy" : "");
        return extractNewBinData(hashMap, channelBuffer, channel, messageEvent, readInt, valueOf);
    }
}
