package org.apache.inlong.sdk.sort.impl.decode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.Deserializer;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.inlong.sdk.sort.util.Utils;

/* loaded from: input_file:org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.class */
public class MessageDeserializer implements Deserializer {
    private static final int MESSAGE_VERSION_NONE = 0;
    private static final int MESSAGE_VERSION_PB = 1;
    private static final int MESSAGE_VERSION_INLONG_MSG = 2;
    private static final int COMPRESS_TYPE_NONE = 0;
    private static final int COMPRESS_TYPE_GZIP = 1;
    private static final int COMPRESS_TYPE_SNAPPY = 2;
    private static final String VERSION_KEY = "version";
    private static final String COMPRESS_TYPE_KEY = "compressType";
    private static final String MSG_TIME_KEY = "msgTime";
    private static final String SOURCE_IP_KEY = "sourceIp";
    private static final String INLONG_GROUPID_KEY = "inlongGroupId";
    private static final String INLONG_STREAMID_KEY = "inlongStreamId";
    private static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
    private static final String INLONGMSG_ATTR_GROUP_ID = "groupId";
    private static final String INLONGMSG_ATTR_TIME_T = "t";
    private static final String INLONGMSG_ATTR_TIME_DT = "dt";
    private static final String INLONGMSG_ATTR_NODE_IP = "NodeIP";
    private static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&';
    private static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
    private static final String DEFAULT_IP = "127.0.0.1";
    private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!";

    @Override // org.apache.inlong.sdk.sort.api.Deserializer
    public List<InLongMessage> deserialize(ClientContext clientContext, InLongTopic inLongTopic, Map<String, String> map, byte[] bArr) throws Exception {
        int parseInt = Integer.parseInt(map.getOrDefault(VERSION_KEY, Integer.toString(2)));
        switch (parseInt) {
            case 0:
                return decode(clientContext, inLongTopic, bArr, map);
            case 1:
                return decodePB(clientContext, inLongTopic, bArr, map);
            case 2:
                return decodeInlongMsg(clientContext, inLongTopic, bArr, map);
            default:
                throw new IllegalArgumentException("Unknown version type:" + parseInt);
        }
    }

    private List<InLongMessage> decode(ClientContext clientContext, InLongTopic inLongTopic, byte[] bArr, Map<String, String> map) {
        return Collections.singletonList(new InLongMessage(map.getOrDefault(INLONG_GROUPID_KEY, ""), map.getOrDefault(INLONG_STREAMID_KEY, ""), Long.parseLong(map.getOrDefault(MSG_TIME_KEY, "0")), map.getOrDefault(SOURCE_IP_KEY, ""), bArr, map));
    }

    private List<InLongMessage> decodePB(ClientContext clientContext, InLongTopic inLongTopic, byte[] bArr, Map<String, String> map) throws IOException {
        int parseInt = Integer.parseInt(map.getOrDefault(COMPRESS_TYPE_KEY, "0"));
        String orDefault = map.getOrDefault(INLONG_GROUPID_KEY, "");
        String orDefault2 = map.getOrDefault(INLONG_STREAMID_KEY, "");
        switch (parseInt) {
            case 0:
                return transformMessageObjs(clientContext, inLongTopic, ProxySdk.MessageObjs.parseFrom(bArr), orDefault, orDefault2);
            case 1:
                return transformMessageObjs(clientContext, inLongTopic, ProxySdk.MessageObjs.parseFrom(Utils.gzipDecompress(bArr, 0, bArr.length)), orDefault, orDefault2);
            case 2:
                return transformMessageObjs(clientContext, inLongTopic, ProxySdk.MessageObjs.parseFrom(Utils.snappyDecompress(bArr, 0, bArr.length)), orDefault, orDefault2);
            default:
                throw new IllegalArgumentException("Unknown compress type:" + parseInt);
        }
    }

    private List<InLongMessage> transformMessageObjs(ClientContext clientContext, InLongTopic inLongTopic, ProxySdk.MessageObjs messageObjs, String str, String str2) {
        if (null == messageObjs) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (ProxySdk.MessageObj messageObj : messageObjs.getMsgsList()) {
            List<ProxySdk.MapFieldEntry> paramsList = messageObj.getParamsList();
            HashMap hashMap = new HashMap();
            for (ProxySdk.MapFieldEntry mapFieldEntry : paramsList) {
                hashMap.put(mapFieldEntry.getKey(), mapFieldEntry.getValue());
            }
            arrayList.add(new InLongMessage(str, str2, messageObj.getMsgTime(), messageObj.getSourceIp(), messageObj.getBody().toByteArray(), hashMap));
        }
        return arrayList;
    }

    private List<InLongMessage> decodeInlongMsg(ClientContext clientContext, InLongTopic inLongTopic, byte[] bArr, Map<String, String> map) {
        long parseLong;
        ArrayList arrayList = new ArrayList();
        InLongMsg parseFrom = InLongMsg.parseFrom(bArr);
        for (String str : parseFrom.getAttrs()) {
            Map<String, String> splitKv = StringUtil.splitKv(str, '&', '=', null, null);
            String str2 = (String) Optional.ofNullable(splitKv.get(INLONGMSG_ATTR_GROUP_ID)).orElseThrow(() -> {
                return new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, INLONGMSG_ATTR_GROUP_ID));
            });
            String str3 = (String) Optional.ofNullable(splitKv.get(INLONGMSG_ATTR_STREAM_ID)).orElseThrow(() -> {
                return new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, INLONGMSG_ATTR_STREAM_ID));
            });
            if (splitKv.containsKey(INLONGMSG_ATTR_TIME_T)) {
                parseLong = StringUtil.parseDateTime(splitKv.get(INLONGMSG_ATTR_TIME_T).trim());
            } else {
                if (!splitKv.containsKey(INLONGMSG_ATTR_TIME_DT)) {
                    throw new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, "t or dt"));
                }
                parseLong = Long.parseLong(splitKv.get(INLONGMSG_ATTR_TIME_DT).trim());
            }
            String str4 = (String) Optional.ofNullable(splitKv.get(INLONGMSG_ATTR_NODE_IP)).orElse(DEFAULT_IP);
            Iterator iterator = parseFrom.getIterator(str);
            while (iterator.hasNext()) {
                byte[] bArr2 = (byte[]) iterator.next();
                if (!Objects.isNull(bArr2)) {
                    arrayList.add(new InLongMessage(str2, str3, parseLong, str4, bArr2, splitKv));
                }
            }
        }
        return arrayList;
    }
}
