package org.apache.inlong.dataproxy.source;

import com.google.common.base.Splitter;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/DefaultServiceDecoder.class */
public class DefaultServiceDecoder implements ServiceDecoder {
    private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
    private static final int BIN_MSG_TOTALLEN_SIZE = 4;
    private static final int BIN_MSG_MSGTYPE_OFFSET = 4;
    private static final int BIN_MSG_EXTEND_OFFSET = 9;
    private static final int BIN_MSG_EXTEND_SIZE = 2;
    private static final int BIN_MSG_SET_SNAPPY = 32;
    private static final int BIN_MSG_BODYLEN_SIZE = 4;
    private static final int BIN_MSG_BODYLEN_OFFSET = 21;
    private static final int BIN_MSG_BODY_OFFSET = 25;
    private static final int BIN_MSG_ATTRLEN_SIZE = 2;
    private static final int BIN_MSG_FORMAT_SIZE = 29;
    private static final int BIN_MSG_MAGIC_SIZE = 2;
    private static final int BIN_MSG_MAGIC = 60929;
    private static final int BIN_HB_TOTALLEN_SIZE = 4;
    private static final int BIN_HB_BODYLEN_OFFSET = 10;
    private static final int BIN_HB_BODYLEN_SIZE = 4;
    private static final int BIN_HB_BODY_OFFSET = 14;
    private static final int BIN_HB_ATTRLEN_SIZE = 2;
    private static final int BIN_HB_FORMAT_SIZE = 17;
    private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceDecoder.class);
    private static final Splitter.MapSplitter mapSplitter = Splitter.on(AttrConstants.SEPARATOR).trimResults().withKeyValueSeparator(AttrConstants.KEY_VALUE_SEPARATOR);

    private Map<String, Object> extractNewBinHB(Map<String, Object> map, ByteBuf byteBuf, Channel channel, int i) throws Exception {
        int readerIndex = byteBuf.readerIndex() - 5;
        int i2 = byteBuf.getInt(readerIndex + BIN_HB_BODYLEN_OFFSET);
        int i3 = byteBuf.getShort(readerIndex + BIN_HB_BODY_OFFSET + i2);
        int unsignedShort = byteBuf.getUnsignedShort(readerIndex + BIN_HB_BODY_OFFSET + i2 + 2 + i3);
        if (i + 4 < i2 + i3 + BIN_HB_FORMAT_SIZE || unsignedShort != BIN_MSG_MAGIC) {
            LOG.error("err msg, bodyLen + attrLen > totalDataLen, and bodyLen={},attrLen={},totalDataLen={},magic={};Connection info:{}", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i), Integer.toHexString(unsignedShort), channel.toString()});
            return map;
        }
        byteBuf.skipBytes(BIN_MSG_EXTEND_OFFSET + i2 + 2);
        if (i3 != 0) {
            byte[] bArr = new byte[i3];
            byteBuf.readBytes(bArr, 0, i3);
            map.put(ConfigConstants.DECODER_ATTRS, new String(bArr, StandardCharsets.UTF_8));
        }
        map.put(ConfigConstants.VERSION_TYPE, Byte.valueOf(byteBuf.getByte(readerIndex + BIN_MSG_EXTEND_OFFSET)));
        return map;
    }

    private void handleDateTime(Map<String, String> map, long j, long j2, int i, String str, long j3) {
        map.put("uniq", String.valueOf(j));
        map.put("sid", new StringBuilder(256).append(str).append(AttrConstants.SEP_HASHTAG).append(String.valueOf(j2)).append(AttrConstants.SEP_HASHTAG).append(j).toString());
        map.put("dt", String.valueOf(j2));
        map.put("rt", String.valueOf(j3));
        map.put("cnt", String.valueOf(i));
    }

    private boolean handleExtMap(Map<String, String> map, ByteBuf byteBuf, Map<String, Object> map2, int i, int i2) {
        boolean z = false;
        if ((i & 8) == 8) {
            z = true;
            int i3 = byteBuf.getInt(i2 + BIN_MSG_BODYLEN_OFFSET + 4);
            byte[] bArr = new byte[i3];
            byteBuf.getBytes(i2 + BIN_MSG_BODY_OFFSET + 4, bArr, 0, i3);
            map2.put(ConfigConstants.FILE_BODY, bArr);
            map.put(ConfigConstants.FILE_CHECK_DATA, "true");
        } else if ((i & 16) == 16) {
            z = true;
            int i4 = byteBuf.getInt(i2 + BIN_MSG_BODYLEN_OFFSET + 4);
            byte[] bArr2 = new byte[i4];
            byteBuf.getBytes(i2 + BIN_MSG_BODY_OFFSET + 4, bArr2, 0, i4);
            map2.put(ConfigConstants.FILE_BODY, bArr2);
            map.put(ConfigConstants.MINUTE_CHECK_DATA, "true");
        }
        return z;
    }

    private ByteBuffer handleExtraAppendAttrInfo(Map<String, String> map, Channel channel, ByteBuf byteBuf, int i, int i2, int i3, int i4, String str, int i5, long j) {
        ByteBuffer allocate;
        boolean z = false;
        String str2 = "";
        if (StringUtils.isBlank(map.get("rtms"))) {
            z = true;
            str2 = "rtms=" + System.currentTimeMillis();
        }
        String str3 = "";
        if (((i & 2) >> 1) == 1) {
            z = true;
            String str4 = null;
            SocketAddress localAddress = channel.localAddress();
            if (null != localAddress) {
                str4 = localAddress.toString();
                try {
                    str4 = str4.substring(1, str4.indexOf(58));
                } catch (Exception e) {
                    LOG.warn("fail to get the local IP, and strIP={},localSocketAddress={}", str4, localAddress);
                }
            }
            str3 = "node2ip=" + str4 + AttrConstants.SEPARATOR + "rtime2" + AttrConstants.KEY_VALUE_SEPARATOR + j;
        }
        if (z) {
            int i6 = i3;
            if (StringUtils.isNotEmpty(str2)) {
                if (StringUtils.isEmpty(str)) {
                    i6 += str2.length();
                    str = str2;
                } else {
                    i6 += AttrConstants.SEPARATOR.length() + str2.length();
                    str = str + AttrConstants.SEPARATOR + str2;
                }
            }
            if (StringUtils.isNotEmpty(str3)) {
                if (StringUtils.isEmpty(str)) {
                    i6 += str3.length();
                    str = str3;
                } else {
                    i6 += AttrConstants.SEPARATOR.length() + str3.length();
                    str = str + AttrConstants.SEPARATOR + str3;
                }
            }
            allocate = ByteBuffer.allocate(i6 + 4);
            byteBuf.getBytes(i2, allocate.array(), 0, i5 + BIN_MSG_BODY_OFFSET);
            allocate.putShort(i5 + BIN_MSG_BODY_OFFSET, (short) str.length());
            System.arraycopy(str.getBytes(StandardCharsets.UTF_8), 0, allocate.array(), i5 + 27, str.length());
            allocate.putInt(0, i6);
            allocate.putShort((i6 + 4) - 2, (short) -4607);
        } else {
            allocate = ByteBuffer.allocate(i3 + 4);
            byteBuf.getBytes(i2, allocate.array(), 0, i3 + 4);
        }
        return allocate;
    }

    private Map<String, Object> extractNewBinData(Map<String, Object> map, ByteBuf byteBuf, Channel channel, int i, MsgType msgType, String str, long j) throws Exception {
        int readerIndex = byteBuf.readerIndex() - 5;
        int i2 = byteBuf.getInt(readerIndex + BIN_MSG_BODYLEN_OFFSET);
        if (i2 == 0) {
            throw new Exception("Error msg,  bodyLen is empty; connection info:" + str);
        }
        int i3 = byteBuf.getShort(readerIndex + BIN_MSG_BODY_OFFSET + i2);
        int unsignedShort = byteBuf.getUnsignedShort(readerIndex + BIN_MSG_BODY_OFFSET + i2 + 2 + i3);
        if (unsignedShort != BIN_MSG_MAGIC || i + 4 < i2 + i3 + BIN_MSG_FORMAT_SIZE) {
            throw new Exception("Error msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen=" + i2 + ",attrLen=" + i3 + ",totalDataLen=" + i + ";magic=" + Integer.toHexString(unsignedShort) + "; connection info:" + str);
        }
        int readUnsignedShort = byteBuf.readUnsignedShort();
        int readUnsignedShort2 = byteBuf.readUnsignedShort();
        int readUnsignedShort3 = byteBuf.readUnsignedShort();
        long readUnsignedInt = byteBuf.readUnsignedInt() * 1000;
        int readUnsignedShort4 = byteBuf.readUnsignedShort();
        int i4 = readUnsignedShort4 != 0 ? readUnsignedShort4 : 1;
        long readUnsignedInt2 = byteBuf.readUnsignedInt();
        byteBuf.skipBytes(4 + i2 + 2);
        byte[] bArr = new byte[i2];
        byteBuf.getBytes(readerIndex + BIN_MSG_BODY_OFFSET, bArr, 0, i2);
        String str2 = null;
        HashMap hashMap = new HashMap();
        if (i3 != 0) {
            byte[] bArr2 = new byte[i3];
            byteBuf.readBytes(bArr2, 0, i3);
            str2 = new String(bArr2, StandardCharsets.UTF_8);
            map.put(ConfigConstants.DECODER_ATTRS, str2);
            try {
                hashMap.putAll(mapSplitter.split(str2));
            } catch (Exception e) {
                byteBuf.clear();
                throw new MessageIDException(readUnsignedInt2, ErrorCode.ATTR_ERROR, new Throwable("[Parse Error]new six segment protocol ,attr is " + str2 + " , channel info:" + str));
            }
        }
        map.put(ConfigConstants.COMMON_ATTR_MAP, hashMap);
        map.put(ConfigConstants.EXTRA_ATTR, (readUnsignedShort3 & 1) == 1 ? "true" : "false");
        map.put("body", bArr);
        try {
            handleDateTime(hashMap, readUnsignedInt2, readUnsignedInt, i4, str, j);
            boolean handleExtMap = handleExtMap(hashMap, byteBuf, map, readUnsignedShort3, readerIndex);
            ByteBuffer handleExtraAppendAttrInfo = handleExtraAppendAttrInfo(hashMap, channel, byteBuf, readUnsignedShort3, readerIndex, i, i3, str2, i2, j);
            String str3 = hashMap.get("groupId");
            String str4 = hashMap.get("streamId");
            if (str3 == null || str4 == null) {
                if ((((readUnsignedShort3 & 4) >> 2) == 0) && 0 != readUnsignedShort && 0 != readUnsignedShort2) {
                    hashMap.put(AttrConstants.NUM2NAME, "TRUE");
                    hashMap.put(AttrConstants.GROUPID_NUM, String.valueOf(readUnsignedShort));
                    hashMap.put(AttrConstants.STREAMID_NUM, String.valueOf(readUnsignedShort2));
                }
            } else {
                hashMap.put(AttrConstants.NUM2NAME, "FALSE");
                handleExtraAppendAttrInfo.putShort(BIN_MSG_EXTEND_OFFSET, (short) (readUnsignedShort3 | 4));
            }
            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                ArrayList arrayList = new ArrayList(1);
                if (handleExtMap) {
                    arrayList.add(new ProxyMessage(str3, str4, hashMap, (byte[]) map.get(ConfigConstants.FILE_BODY)));
                } else {
                    arrayList.add(new ProxyMessage(str3, str4, hashMap, handleExtraAppendAttrInfo.array()));
                }
                map.put(ConfigConstants.MSG_LIST, arrayList);
            }
            return map;
        } catch (Exception e2) {
            LOG.error("extractNewBinData has error: ", e2);
            byteBuf.clear();
            throw new MessageIDException(readUnsignedInt2, ErrorCode.OTHER_ERROR, e2.getCause());
        }
    }

    private Map<String, Object> extractDefaultData(Map<String, Object> map, ByteBuf byteBuf, int i, MsgType msgType, String str, long j) throws Exception {
        ArrayList arrayList;
        int readInt = byteBuf.readInt();
        if (readInt == 0) {
            throw new Exception("Error msg: bodyLen is empty, connection info:" + str);
        }
        if (readInt > i - 5) {
            throw new Exception("Error msg, firstLen > totalDataLen, and bodyLen=" + readInt + ",totalDataLen=" + i + ", connection info:" + str);
        }
        byte[] bArr = new byte[readInt];
        byteBuf.readBytes(bArr, 0, readInt);
        map.put("body", bArr);
        int readInt2 = byteBuf.readInt();
        if (i != BIN_MSG_EXTEND_OFFSET + readInt2 + readInt) {
            throw new Exception("Error msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + readInt + ",totalDataLen=" + i + ",attrDataLen=" + readInt2 + ", connection info:" + str);
        }
        byte[] bArr2 = new byte[readInt2];
        byteBuf.readBytes(bArr2, 0, readInt2);
        String str2 = new String(bArr2, StandardCharsets.UTF_8);
        try {
            HashMap hashMap = new HashMap(mapSplitter.split(str2));
            map.put(ConfigConstants.DECODER_ATTRS, str2);
            map.put(ConfigConstants.COMMON_ATTR_MAP, hashMap);
            String str3 = (String) hashMap.get("cp");
            if (StringUtils.isNotBlank(str3)) {
                map.put(ConfigConstants.COMPRESS_TYPE, str3);
                byte[] processUnCompress = processUnCompress(bArr, str3);
                if (processUnCompress == null || processUnCompress.length == 0) {
                    throw new Exception("Uncompressed data error! compress type:" + str3 + ";attr:" + str2 + " , connection info:" + str);
                }
                bArr = processUnCompress;
            }
            String str4 = (String) hashMap.get("groupId");
            String str5 = (String) hashMap.get("streamId");
            hashMap.put("dt", String.valueOf(NumberUtils.toLong((String) hashMap.get("dt"), j)));
            if (StringUtils.isBlank((CharSequence) hashMap.get("rtms"))) {
                hashMap.put("rtms", String.valueOf(j));
            }
            hashMap.put("rt", String.valueOf(j));
            int i2 = NumberUtils.toInt((String) hashMap.get("cnt"), 1);
            hashMap.put("cnt", String.valueOf(i2));
            int i3 = 0;
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            if (MsgType.MSG_MULTI_BODY.equals(msgType)) {
                arrayList = new ArrayList(i2);
                while (wrap.remaining() > 0) {
                    int i4 = wrap.getInt();
                    if (i4 <= 0 || i4 > wrap.remaining()) {
                        throw new Exception("[Malformed Data]Invalid data len! channel is " + str);
                    }
                    byte[] bArr3 = new byte[i4];
                    wrap.get(bArr3);
                    arrayList.add(new ProxyMessage(str4, str5, hashMap, bArr3));
                    i3++;
                }
            } else {
                arrayList = new ArrayList(1);
                arrayList.add(new ProxyMessage(str4, str5, hashMap, bArr));
                i3 = 0 + 1;
            }
            if (i3 != i2) {
                hashMap.put("cnt", String.valueOf(i3));
            }
            map.put(ConfigConstants.MSG_LIST, arrayList);
            return map;
        } catch (Exception e) {
            throw new Exception("Parse commonAttrMap error.commonAttrString is: " + str2 + " , connection info:" + str);
        }
    }

    private byte[] processUnCompress(byte[] bArr, String str) {
        try {
            byte[] bArr2 = new byte[Snappy.uncompressedLength(bArr, 0, bArr.length)];
            Snappy.uncompress(bArr, 0, bArr.length, bArr2, 0);
            return bArr2;
        } catch (IOException e) {
            LOG.error("Uncompressed data error: ", e);
            return null;
        }
    }

    @Override // org.apache.inlong.dataproxy.source.ServiceDecoder
    public Map<String, Object> extractData(ByteBuf byteBuf, String str, long j, Channel channel) throws Exception {
        Map<String, Object> hashMap = new HashMap<>();
        if (null == byteBuf) {
            LOG.error("cb == null");
            return hashMap;
        }
        int readableBytes = byteBuf.readableBytes();
        if (20971520 < readableBytes) {
            throw new Exception("Error msg, ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen, and  totalLen=" + readableBytes);
        }
        byteBuf.markReaderIndex();
        int readInt = byteBuf.readInt();
        if (readInt + 4 > readableBytes) {
            byteBuf.resetReaderIndex();
            return null;
        }
        byte readByte = byteBuf.readByte();
        int i = (readByte & 224) >> 5;
        MsgType valueOf = MsgType.valueOf(readByte);
        hashMap.put(ConfigConstants.MSG_TYPE, valueOf);
        if (MsgType.MSG_HEARTBEAT.equals(valueOf) || MsgType.MSG_UNKNOWN.equals(valueOf)) {
            return hashMap;
        }
        if (MsgType.MSG_BIN_HEARTBEAT.equals(valueOf)) {
            return extractNewBinHB(hashMap, byteBuf, channel, readInt);
        }
        if (valueOf.getValue() < MsgType.MSG_BIN_MULTI_BODY.getValue()) {
            return extractDefaultData(hashMap, byteBuf, readInt, valueOf, str, j);
        }
        hashMap.put(ConfigConstants.COMPRESS_TYPE, i != 0 ? "snappy" : "");
        return extractNewBinData(hashMap, byteBuf, channel, readInt, valueOf, str, j);
    }
}
