package org.apache.inlong.dataproxy.utils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flume.Event;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.source.MsgType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/utils/MessageUtils.class */
/**
 * Static helpers for inspecting event headers and for building / sending
 * response packages back to a client channel.
 *
 * <p>All length fields written into response frames are computed from the
 * UTF-8 encoded attribute bytes, so they always match the number of bytes
 * actually written.
 */
public class MessageUtils {

    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private static final Logger logger = LoggerFactory.getLogger(MessageUtils.class);

    // Header / attribute keys read by the methods below.
    private static final String KEY_SYNC_SEND = "syncSend";
    private static final String KEY_PROXY_SEND = "proxySend";
    private static final String KEY_IS_ACK = "isAck";
    private static final String KEY_ERR_CODE = "errCode";
    private static final String KEY_ERR_MSG = "errMsg";
    private static final String KEY_UNIQ = "uniq";

    // Fixed 2-byte value (0xEE01) appended at the end of binary response frames.
    private static final int BIN_MSG_TAIL_MARK = 60929;

    private static final byte[] EMPTY_BYTES = new byte[0];

    /**
     * Tells whether the event carries a {@code syncSend} header equal to
     * {@code "true"} (case-insensitive).
     *
     * @param event the event whose headers are inspected
     * @return {@code true} if the {@code syncSend} header is "true"
     */
    public static boolean isSyncSendForOrder(Event event) {
        return isTrue(event.getHeaders().get(KEY_SYNC_SEND));
    }

    /**
     * Same check as {@link #isSinkRspType(Map)}, applied to the event's headers.
     *
     * @param event the event whose headers are inspected
     * @return {@code true} if either {@code proxySend} or {@code syncSend} is "true"
     */
    public static boolean isSinkRspType(Event event) {
        return isSinkRspType(event.getHeaders());
    }

    /**
     * Tells whether the headers mark the message with {@code proxySend} or
     * {@code syncSend} set to {@code "true"} (case-insensitive).
     *
     * @param map the header map; must not be {@code null}
     * @return {@code true} if either flag is "true"
     */
    public static boolean isSinkRspType(Map<String, String> map) {
        return isTrue(map.get(KEY_PROXY_SEND)) || isTrue(map.get(KEY_SYNC_SEND));
    }

    /**
     * Derives the processing type from the event's {@code syncSend} and
     * {@code proxySend} headers; see {@link #getEventProcType(String, String)}.
     *
     * @param event the event whose headers are inspected
     * @return the flag / type-name pair
     */
    public static Pair<Boolean, String> getEventProcType(Event event) {
        Map<String, String> headers = event.getHeaders();
        return getEventProcType(headers.get(KEY_SYNC_SEND), headers.get(KEY_PROXY_SEND));
    }

    /**
     * Derives the processing type from the two flag values.
     *
     * <ul>
     *   <li>neither flag "true": {@code (false, "b2b")}</li>
     *   <li>{@code syncSend} "true": {@code (true, "order")}</li>
     *   <li>{@code proxySend} "true": {@code (true, "proxy")}; this wins when both are set</li>
     * </ul>
     *
     * @param syncSend  value of the {@code syncSend} header, may be {@code null}
     * @param proxySend value of the {@code proxySend} header, may be {@code null}
     * @return the flag / type-name pair
     */
    public static Pair<Boolean, String> getEventProcType(String syncSend, String proxySend) {
        boolean flagged = false;
        String procType = "b2b";
        if (isTrue(syncSend)) {
            flagged = true;
            procType = "order";
        }
        // Checked last on purpose: proxySend overrides syncSend when both are set.
        if (isTrue(proxySend)) {
            flagged = true;
            procType = "proxy";
        }
        return Pair.of(Boolean.valueOf(flagged), procType);
    }

    /**
     * Builds the response package matching {@code msgType} and writes it to
     * {@code channel}.
     *
     * <p>Nothing is sent when the channel is {@code null}, the type is
     * {@code MSG_UNKNOWN}, or (for non-heartbeat types) the {@code isAck}
     * attribute is "false".
     *
     * @param attrMap   string attributes; read keys are {@code isAck},
     *                  {@code errCode}, {@code errMsg} and {@code uniq}
     * @param resultMap object attributes; read keys are
     *                  {@code ConfigConstants.DECODER_ATTRS} and
     *                  {@code ConfigConstants.VERSION_TYPE}
     * @param channel   the channel to answer on, may be {@code null}
     * @param msgType   the type of the message being answered
     * @throws Exception if the channel is not writable, or if building the
     *                   package fails (e.g. a non-numeric {@code uniq})
     */
    public static void sourceReturnRspPackage(Map<String, String> attrMap, Map<String, Object> resultMap,
            Channel channel, MsgType msgType) throws Exception {
        if (channel == null || MsgType.MSG_UNKNOWN.equals(msgType)) {
            if (logCounter.shouldPrint()) {
                if (channel == null) {
                    logger.warn("remoteChannel == null, discard it!");
                } else {
                    logger.warn("Unknown msgType message from {}, discard it!", channel);
                }
            }
            return;
        }
        String origAttrs = null;
        ByteBuf rspPackage;
        if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
            rspPackage = buildHeartBeatMsgRspPackage();
        } else {
            if ("false".equalsIgnoreCase(attrMap.get(KEY_IS_ACK))) {
                return;
            }
            origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
            // Check writability before allocating anything.
            if (!channel.isWritable()) {
                throw new Exception(warnSendBufferFull(1, channel, origAttrs));
            }
            StringBuilder rspAttrs = new StringBuilder(512);
            rspAttrs.append(ConfigConstants.DATAPROXY_IP_KEY)
                    .append(AttrConstants.KEY_VALUE_SEPARATOR)
                    .append(NetworkUtils.getLocalIp());
            String errCode = attrMap.get(KEY_ERR_CODE);
            if (StringUtils.isNotEmpty(errCode)) {
                rspAttrs.append(AttrConstants.SEPARATOR).append(KEY_ERR_CODE)
                        .append(AttrConstants.KEY_VALUE_SEPARATOR).append(errCode);
                String errMsg = attrMap.get(KEY_ERR_MSG);
                if (StringUtils.isNotEmpty(errMsg)) {
                    rspAttrs.append(AttrConstants.SEPARATOR).append(KEY_ERR_MSG)
                            .append(AttrConstants.KEY_VALUE_SEPARATOR).append(errMsg);
                }
            }
            if (StringUtils.isNotEmpty(origAttrs)) {
                rspAttrs.append(AttrConstants.SEPARATOR).append(origAttrs);
            }
            String attrs = rspAttrs.toString();
            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
                rspPackage = buildBinMsgRspPackage(attrs, attrMap.get(KEY_UNIQ));
            } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
                byte version = ((Byte) resultMap.get(ConfigConstants.VERSION_TYPE)).byteValue();
                rspPackage = buildHBRspPackage(attrs, version, 0);
            } else {
                rspPackage = buildDefMsgRspPackage(msgType, attrs);
            }
        }
        if (channel.isWritable()) {
            channel.writeAndFlush(rspPackage);
            return;
        }
        // The package will never be written, so release it here.
        rspPackage.release();
        throw new Exception(warnSendBufferFull(2, channel, origAttrs));
    }

    /**
     * Builds a response for a sink-side result and writes it to the channel
     * held by {@code sinkRspEvent}.
     *
     * <p>Returns silently without sending when the context or channel is
     * missing or inactive, when the message type is {@code MSG_UNKNOWN},
     * {@code MSG_ACK_SERVICE}, {@code MSG_HEARTBEAT} or
     * {@code MSG_BIN_HEARTBEAT}, when the {@code isAck} header is "false", or
     * when the channel is not writable. Failures while writing are logged,
     * not rethrown.
     *
     * @param sinkRspEvent     the event carrying the channel context, message type and headers
     * @param dataProxyErrCode the result code; anything other than {@code SUCCESS}
     *                         is reported in the response attributes
     * @param errMsg           optional error message, only used for non-success codes
     */
    public static void sinkReturnRspPackage(SinkRspEvent sinkRspEvent, DataProxyErrCode dataProxyErrCode,
            String errMsg) {
        ChannelHandlerContext ctx = sinkRspEvent.getCtx();
        if (ctx == null || ctx.channel() == null || !ctx.channel().isActive()) {
            return;
        }
        Channel channel = ctx.channel();
        MsgType msgType = sinkRspEvent.getMsgType();
        if (MsgType.MSG_UNKNOWN.equals(msgType)
                || MsgType.MSG_ACK_SERVICE.equals(msgType)
                || MsgType.MSG_HEARTBEAT.equals(msgType)
                || MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
            return;
        }
        Map<String, String> headers = sinkRspEvent.getHeaders();
        if ("false".equalsIgnoreCase(headers.get(KEY_IS_ACK))) {
            return;
        }
        String origAttrs = headers.get(ConfigConstants.DECODER_ATTRS);
        if (!channel.isWritable()) {
            warnSendBufferFull(3, channel, origAttrs);
            return;
        }
        StringBuilder rspAttrs = new StringBuilder(512);
        rspAttrs.append(ConfigConstants.DATAPROXY_IP_KEY)
                .append(AttrConstants.KEY_VALUE_SEPARATOR)
                .append(NetworkUtils.getLocalIp());
        if (DataProxyErrCode.SUCCESS != dataProxyErrCode) {
            rspAttrs.append(AttrConstants.SEPARATOR).append(KEY_ERR_CODE)
                    .append(AttrConstants.KEY_VALUE_SEPARATOR).append(dataProxyErrCode.getErrCode());
            if (StringUtils.isNotEmpty(errMsg)) {
                rspAttrs.append(AttrConstants.SEPARATOR).append(KEY_ERR_MSG)
                        .append(AttrConstants.KEY_VALUE_SEPARATOR).append(errMsg);
            }
        }
        if (StringUtils.isNotEmpty(origAttrs)) {
            rspAttrs.append(AttrConstants.SEPARATOR).append(origAttrs);
        }
        String attrs = rspAttrs.toString();
        ByteBuf rspPackage;
        if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
            rspPackage = buildBinMsgRspPackage(attrs, headers.get(KEY_UNIQ));
        } else {
            rspPackage = buildDefMsgRspPackage(msgType, attrs);
        }
        try {
            if (channel.isWritable()) {
                channel.writeAndFlush(rspPackage);
            } else {
                // The package will never be written, so release it here.
                rspPackage.release();
                warnSendBufferFull(4, channel, origAttrs);
            }
        } catch (Throwable th) {
            if (logCounter.shouldPrint()) {
                logger.error("Write data to channel exception, channel=" + channel
                        + ", attr is " + origAttrs, th);
            }
        }
    }

    /**
     * Looks up the topic configured for a group / stream.
     *
     * <p>The key {@code groupId + "/" + streamId} is tried first (when
     * {@code streamId} is non-empty); if that yields nothing, {@code groupId}
     * alone is used.
     *
     * @param map      the topic lookup map, may be {@code null}
     * @param groupId  the group id, may be {@code null} or empty
     * @param streamId the stream id, may be {@code null} or empty
     * @return the topic found, or {@code null} if {@code map} is {@code null},
     *         {@code groupId} is empty, or no entry matches
     */
    public static String getTopic(Map<String, String> map, String groupId, String streamId) {
        String topic = null;
        if (map != null && StringUtils.isNotEmpty(groupId)) {
            if (StringUtils.isNotEmpty(streamId)) {
                topic = map.get(groupId + "/" + streamId);
            }
            if (StringUtils.isEmpty(topic)) {
                topic = map.get(groupId);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}", groupId, streamId, topic);
        }
        return topic;
    }

    /**
     * Copies a fixed set of header values into a new attribute map. Which
     * source keys are read depends on whether {@code pkgVersion} equals
     * {@code InLongMsgVer.INLONG_V1.getName()} (case-insensitive).
     *
     * @param headers    the source headers; must not be {@code null}
     * @param pkgVersion the version name, stored under {@code ConfigConstants.MSG_ENCODE_VER}
     * @return a new mutable map; values missing from {@code headers} are stored as {@code null}
     */
    public static Map<String, String> getXfsAttrs(Map<String, String> headers, String pkgVersion) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
        if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
            attrs.put("dataproxyip", NetworkUtils.getLocalIp());
            attrs.put("inlongGroupId", headers.get("inlongGroupId"));
            attrs.put("inlongStreamId", headers.get("inlongStreamId"));
            attrs.put("topic", headers.get("topic"));
            attrs.put("msgTime", headers.get("msgTime"));
            attrs.put(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
        } else {
            attrs.put("inlongGroupId", headers.get("groupId"));
            attrs.put("inlongStreamId", headers.get("streamId"));
            attrs.put("topic", headers.get("topic"));
            attrs.put("msgTime", headers.get("dt"));
            attrs.put(Constants.HEADER_KEY_SOURCE_IP, headers.get(ConfigConstants.REMOTE_IP_KEY));
            attrs.put(Constants.HEADER_KEY_SOURCE_TIME, headers.get("rt"));
            attrs.put(ConfigConstants.DATAPROXY_IP_KEY, NetworkUtils.getLocalIp());
        }
        return attrs;
    }

    /** Returns {@code true} only for a non-null value equal to "true", ignoring case. */
    private static boolean isTrue(String value) {
        return "true".equalsIgnoreCase(value);
    }

    /** Encodes the text as UTF-8; {@code null} is treated as empty. */
    private static byte[] toUtf8(String text) {
        return text == null ? EMPTY_BYTES : text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Builds the "Send buffer is full" message, logs it at WARN level when
     * the rate-limiting counter allows, and returns it.
     */
    private static String warnSendBufferFull(int seq, Channel channel, String attrs) {
        String msg = "Send buffer is full" + seq + " by channel " + channel + ", attr is " + attrs;
        if (logCounter.shouldPrint()) {
            logger.warn(msg);
        }
        return msg;
    }

    /** Builds the fixed 5-byte heartbeat response {@code 00 00 00 01 01}. */
    private static ByteBuf buildHeartBeatMsgRspPackage() {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
        buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
        return buffer;
    }

    /**
     * Builds a default response frame:
     * {@code int frameLen | byte msgType | int 0 | int attrLen | attr bytes}.
     */
    private static ByteBuf buildDefMsgRspPackage(MsgType msgType, String attrs) {
        byte[] attrBytes = toUtf8(attrs);
        // 1 (msgType) + 4 (zero int) + 4 (attrLen) + attribute bytes
        int frameLen = 9 + attrBytes.length;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + frameLen);
        buffer.writeInt(frameLen);
        buffer.writeByte(msgType.getValue());
        buffer.writeInt(0);
        buffer.writeInt(attrBytes.length);
        if (attrBytes.length > 0) {
            buffer.writeBytes(attrBytes);
        }
        return buffer;
    }

    /**
     * Builds a binary multi-body response frame:
     * {@code int frameLen | byte msgType | 4 bytes uniq | short attrLen | attr bytes | short tail}.
     *
     * @throws NumberFormatException if {@code uniq} is {@code null} or not a valid long
     */
    private static ByteBuf buildBinMsgRspPackage(String attrs, String uniq) {
        // Parse before allocating so a bad value cannot leak a buffer.
        long uniqValue = Long.parseLong(uniq);
        byte[] attrBytes = toUtf8(attrs);
        // 1 (msgType) + 4 (uniq) + 2 (attrLen) + 2 (tail) + attribute bytes
        int frameLen = 9 + attrBytes.length;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + frameLen);
        buffer.writeInt(frameLen);
        buffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
        // Only the low 32 bits of uniq are sent, big-endian.
        buffer.writeInt((int) uniqValue);
        buffer.writeShort(attrBytes.length);
        buffer.writeBytes(attrBytes);
        buffer.writeShort(BIN_MSG_TAIL_MARK);
        return buffer;
    }

    /**
     * Builds a binary heartbeat response frame:
     * {@code int frameLen | byte msgType | int seconds | byte version | int 2 | short load
     * | short attrLen | attr bytes | short tail}.
     *
     * @param attrs   attribute text, may be {@code null}
     * @param version the version byte echoed back
     * @param load    the load value; 0 and -1 are both sent as 65535
     */
    private static ByteBuf buildHBRspPackage(String attrs, byte version, int load) {
        byte[] attrBytes = toUtf8(attrs);
        // 1 + 4 + 1 + 4 + 2 + 2 + 2 fixed bytes + attribute bytes
        int frameLen = 16 + attrBytes.length;
        int loadValue = (load == 0 || load == -1) ? 65535 : load;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + frameLen);
        buffer.writeInt(frameLen);
        buffer.writeByte(MsgType.MSG_BIN_HEARTBEAT.getValue());
        buffer.writeInt((int) (System.currentTimeMillis() / 1000));
        buffer.writeByte(version);
        buffer.writeInt(2);
        buffer.writeShort(loadValue);
        buffer.writeShort(attrBytes.length);
        buffer.writeBytes(attrBytes);
        buffer.writeShort(BIN_MSG_TAIL_MARK);
        return buffer;
    }
}
