package org.apache.inlong.dataproxy.source;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.exception.ChannelUnWritableException;
import org.apache.inlong.dataproxy.exception.PkgParseException;
import org.apache.inlong.dataproxy.source.v0msg.AbsV0MsgCodec;
import org.apache.inlong.dataproxy.source.v0msg.CodecBinMsg;
import org.apache.inlong.dataproxy.source.v0msg.CodecTextMsg;
import org.apache.inlong.dataproxy.source.v0msg.MsgFieldConsts;
import org.apache.inlong.dataproxy.source.v1msg.InlongTcpSourceCallback;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/ServerMessageHandler.class */
public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
    private static final int INLONG_MSG_V1 = 1;
    private final BaseSource source;
    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private static final LogCounter exceptLogCounter = new LogCounter(10, 50000, 20000);
    private static final ConfigManager configManager = ConfigManager.getInstance();

    public ServerMessageHandler(BaseSource baseSource) {
        this.source = baseSource;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        AbsV0MsgCodec codecTextMsg;
        if (obj == null) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_EMPTY);
            return;
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            int readableBytes = byteBuf.readableBytes();
            if (readableBytes == 0 && this.source.isFilterEmptyMsg()) {
                byteBuf.clear();
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_EMPTY);
                byteBuf.release();
                return;
            }
            if (readableBytes > this.source.getMaxMsgLength()) {
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_OVERMAX);
                throw new PkgParseException("Error msg, readableLength(" + readableBytes + ") > max allowed message length (" + this.source.getMaxMsgLength() + ")");
            }
            byteBuf.markReaderIndex();
            int readInt = byteBuf.readInt();
            if (readableBytes < readInt + 4) {
                byteBuf.resetReaderIndex();
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_UNFILLED);
                byteBuf.release();
                return;
            }
            byte readByte = byteBuf.readByte();
            if (readByte == 0) {
                byte readByte2 = byteBuf.readByte();
                if (readByte2 != 1) {
                    this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MSGTYPE_V1_INVALID);
                    throw new PkgParseException("Unknown V1 message version, version = " + ((int) readByte2));
                }
                processV1Msg(channelHandlerContext, byteBuf, readInt - 2);
            } else {
                Channel channel = channelHandlerContext.channel();
                MsgType valueOf = MsgType.valueOf(readByte);
                long currentTimeMillis = System.currentTimeMillis();
                if (MsgType.MSG_UNKNOWN == valueOf) {
                    this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MSGTYPE_V0_INVALID);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received unknown message, channel {}", channel);
                    }
                    throw new PkgParseException("Unknown V0 message type, type = " + ((int) readByte));
                }
                if (MsgType.MSG_HEARTBEAT == valueOf) {
                    flushV0MsgPackage(this.source, channel, buildHeartBeatMsgRspPackage(), MsgType.MSG_HEARTBEAT.name());
                    byteBuf.release();
                    return;
                }
                if (MsgType.MSG_BIN_HEARTBEAT == valueOf) {
                    procBinHeartbeatMsg(this.source, channel, byteBuf, readInt);
                    byteBuf.release();
                    return;
                }
                String channelRemoteIP = AddressUtils.getChannelRemoteIP(channel);
                if (MsgType.MSG_BIN_MULTI_BODY == valueOf) {
                    if (readInt < 25) {
                        this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BIN_TOTALLEN_BELOWMIN);
                        String format = String.format("Malformed msg, totalDataLen(%d) < min bin7-msg length(%d)", Integer.valueOf(readInt), 25);
                        if (logger.isDebugEnabled()) {
                            logger.debug(format + ", channel {}", channel);
                        }
                        throw new PkgParseException(format);
                    }
                    codecTextMsg = new CodecBinMsg(readInt, readByte, currentTimeMillis, channelRemoteIP);
                } else {
                    if (readInt < 9) {
                        this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_TOTALLEN_BELOWMIN);
                        String format2 = String.format("Malformed msg, totalDataLen(%d) < min txt-msg length(%d)", Integer.valueOf(readInt), 9);
                        if (logger.isDebugEnabled()) {
                            logger.debug(format2 + ", channel {}", channel);
                        }
                        throw new PkgParseException(format2);
                    }
                    codecTextMsg = new CodecTextMsg(readInt, readByte, currentTimeMillis, channelRemoteIP);
                }
                processV0Msg(channel, byteBuf, codecTextMsg);
            }
        } finally {
            byteBuf.release();
        }
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        String channelRemoteIP;
        if (ConfigManager.getInstance().needChkIllegalIP() && (channelRemoteIP = AddressUtils.getChannelRemoteIP(channelHandlerContext.channel())) != null && ConfigManager.getInstance().isIllegalIP(channelRemoteIP)) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_ILLEGAL);
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            logger.error(channelRemoteIP + " is Illegal IP, so refuse it !");
            return;
        }
        if (this.source.getAllChannels().size() >= this.source.getMaxConnections()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_OVERMAX);
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}", new Object[]{this.source.getCachedSrcName(), channelHandlerContext.channel(), Integer.valueOf(this.source.getAllChannels().size()), Integer.valueOf(this.source.getMaxConnections())});
            return;
        }
        this.source.getAllChannels().add(channelHandlerContext.channel());
        channelHandlerContext.fireChannelActive();
        this.source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
        logger.info("{} added new channel {}, current connections = {}, maxConnections = {}", new Object[]{this.source.getCachedSrcName(), channelHandlerContext.channel(), Integer.valueOf(this.source.getAllChannels().size()), Integer.valueOf(this.source.getMaxConnections())});
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        this.source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKOUT);
        channelHandlerContext.fireChannelInactive();
        this.source.getAllChannels().remove(channelHandlerContext.channel());
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (!(th instanceof PkgParseException) && !(th instanceof ChannelUnWritableException)) {
            if (th instanceof ReadTimeoutException) {
                this.source.fileMetricIncSumStats(StatConstants.EVENT_LINK_READ_TIMEOUT);
            } else if (th instanceof TooLongFrameException) {
                this.source.fileMetricIncSumStats(StatConstants.EVENT_LINK_FRAME_OVERMAX);
            } else if (th instanceof CorruptedFrameException) {
                this.source.fileMetricIncSumStats(StatConstants.EVENT_LINK_FRAME_CORRPUTED);
            } else if (th instanceof IOException) {
                this.source.fileMetricIncSumStats(StatConstants.EVENT_LINK_IO_EXCEPTION);
            } else {
                this.source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNKNOWN_EXCEPTION);
            }
            if (exceptLogCounter.shouldPrint()) {
                logger.warn("{} received an exception from channel {}", new Object[]{this.source.getCachedSrcName(), channelHandlerContext.channel(), th});
            }
        }
        if (channelHandlerContext.channel() != null) {
            this.source.getAllChannels().remove(channelHandlerContext.channel());
            try {
                channelHandlerContext.channel().disconnect();
                channelHandlerContext.channel().close();
            } catch (Exception e) {
            }
        }
        channelHandlerContext.close();
    }

    private void processV0Msg(Channel channel, ByteBuf byteBuf, AbsV0MsgCodec absV0MsgCodec) throws Exception {
        StringBuilder sb = new StringBuilder(512);
        if (!absV0MsgCodec.descMsg(this.source, byteBuf)) {
            responseV0Msg(channel, absV0MsgCodec, sb);
            return;
        }
        if (this.source.isRejectService()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
            absV0MsgCodec.setFailureInfo(DataProxyErrCode.SERVICE_CLOSED);
            responseV0Msg(channel, absV0MsgCodec, sb);
            return;
        }
        if (!ConfigManager.getInstance().isMqClusterReady()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_SINK_UNREADY);
            absV0MsgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY);
            responseV0Msg(channel, absV0MsgCodec, sb);
            return;
        }
        if (!absV0MsgCodec.validAndFillFields(this.source, sb)) {
            responseV0Msg(channel, absV0MsgCodec, sb);
            return;
        }
        Event encEventPackage = absV0MsgCodec.encEventPackage(this.source, channel);
        try {
            this.source.getCachedChProcessor().processEvent(encEventPackage);
            this.source.fileMetricAddSuccStats(sb, absV0MsgCodec.getGroupId(), absV0MsgCodec.getStreamId(), absV0MsgCodec.getTopicName(), absV0MsgCodec.getStrRemoteIP(), absV0MsgCodec.getMsgProcType(), absV0MsgCodec.getDataTimeMs(), absV0MsgCodec.getMsgPkgTime(), absV0MsgCodec.getMsgCount(), 1, encEventPackage.getBody().length);
            this.source.addMetric(true, encEventPackage.getBody().length, encEventPackage);
            if (absV0MsgCodec.isNeedResp() && !absV0MsgCodec.isOrderOrProxy()) {
                absV0MsgCodec.setSuccessInfo();
                responseV0Msg(channel, absV0MsgCodec, sb);
            }
        } catch (Throwable th) {
            this.source.fileMetricAddFailStats(sb, absV0MsgCodec.getGroupId(), absV0MsgCodec.getStreamId(), absV0MsgCodec.getTopicName(), absV0MsgCodec.getStrRemoteIP(), absV0MsgCodec.getMsgProcType(), absV0MsgCodec.getDataTimeMs(), absV0MsgCodec.getMsgPkgTime(), 1);
            this.source.addMetric(false, encEventPackage.getBody().length, encEventPackage);
            if (absV0MsgCodec.isNeedResp()) {
                absV0MsgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE, sb.append("Put msg event to channel failure: ").append(th.getMessage()).toString());
                sb.delete(0, sb.length());
                responseV0Msg(channel, absV0MsgCodec, sb);
            }
            if (logCounter.shouldPrint()) {
                logger.error("Error writing msg event to channel failure, attrs={}", absV0MsgCodec.getAttr(), th);
            }
        }
    }

    private void processV1Msg(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, int i) throws Exception {
        byte[] bArr = new byte[i];
        byteBuf.readBytes(bArr);
        ProxySdk.MessagePack parseFrom = ProxySdk.MessagePack.parseFrom(bArr);
        if (this.source.isRejectService()) {
            this.source.addMetric(false, 0L, null);
            this.source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
            responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, parseFrom);
            return;
        }
        List<ProxyEvent> decodeSdkPack = EventUtils.decodeSdkPack(parseFrom);
        if (decodeSdkPack.size() == 0) {
            responsePackage(channelHandlerContext, ProxySdk.ResultCode.SUCCUSS, parseFrom);
        }
        if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
            processAndWaitingSave(channelHandlerContext, parseFrom, decodeSdkPack);
        } else {
            processAndResponse(channelHandlerContext, parseFrom, decodeSdkPack);
        }
    }

    private void responsePackage(ChannelHandlerContext channelHandlerContext, ProxySdk.ResultCode resultCode, ProxySdk.MessagePack messagePack) throws Exception {
        ProxySdk.ResponseInfo.Builder newBuilder = ProxySdk.ResponseInfo.newBuilder();
        newBuilder.setResult(resultCode);
        newBuilder.setPackId(messagePack.getHeader().getPackId());
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(newBuilder.build().toByteArray());
        Channel channel = channelHandlerContext.channel();
        if (channel.isWritable()) {
            channel.write(wrappedBuffer);
        } else {
            wrappedBuffer.release();
            logger.warn("Send buffer2 is not writable, disconnect {}", channel);
            throw new ChannelUnWritableException("Send buffer2 is not writable, disconnect " + channel);
        }
    }

    private void processAndWaitingSave(ChannelHandlerContext channelHandlerContext, ProxySdk.MessagePack messagePack, List<ProxyEvent> list) throws Exception {
        ProxySdk.MessagePackHeader header = messagePack.getHeader();
        InlongTcpSourceCallback inlongTcpSourceCallback = new InlongTcpSourceCallback(channelHandlerContext, header);
        try {
            this.source.getCachedChProcessor().processEvent(new ProxyPackEvent(header.getInlongGroupId(), header.getInlongStreamId(), list, inlongTcpSourceCallback));
            list.forEach(proxyEvent -> {
                this.source.addMetric(true, proxyEvent.getBody().length, proxyEvent);
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
            });
            if (!inlongTcpSourceCallback.getLatch().await(CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(), TimeUnit.MILLISECONDS) && !inlongTcpSourceCallback.getHasResponsed().getAndSet(true)) {
                responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, messagePack);
            }
        } catch (Throwable th) {
            logger.error("Process Controller Event error can't write event to channel.", th);
            list.forEach(proxyEvent2 -> {
                this.source.addMetric(false, proxyEvent2.getBody().length, proxyEvent2);
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_DROPPED);
            });
            if (inlongTcpSourceCallback.getHasResponsed().getAndSet(true)) {
                return;
            }
            responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, messagePack);
        }
    }

    private void processAndResponse(ChannelHandlerContext channelHandlerContext, ProxySdk.MessagePack messagePack, List<ProxyEvent> list) throws Exception {
        Iterator<ProxyEvent> it = list.iterator();
        while (it.hasNext()) {
            Event event = (ProxyEvent) it.next();
            String topicName = configManager.getTopicName(event.getInlongGroupId(), event.getInlongStreamId());
            if (StringUtils.isEmpty(topicName)) {
                this.source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, event.getInlongGroupId());
                this.source.addMetric(false, event.getBody().length, event);
                responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_ID_ERROR, messagePack);
                return;
            }
            event.setTopic(topicName);
            try {
                this.source.getCachedChProcessor().processEvent(event);
                this.source.addMetric(true, event.getBody().length, event);
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
            } catch (Throwable th) {
                logger.error("Process Controller Event error can't write event to channel.", th);
                this.source.addMetric(false, event.getBody().length, event);
                responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, messagePack);
                this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_DROPPED);
                return;
            }
        }
        responsePackage(channelHandlerContext, ProxySdk.ResultCode.SUCCUSS, messagePack);
    }

    private void responseV0Msg(Channel channel, AbsV0MsgCodec absV0MsgCodec, StringBuilder sb) throws Exception {
        if (channel == null || !channel.isWritable()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNWRITABLE);
            if (logCounter.shouldPrint()) {
                logger.warn("Prepare send msg but channel full, msgType={}, attr={}, channel={}", new Object[]{Byte.valueOf(absV0MsgCodec.getMsgType()), absV0MsgCodec.getAttr(), channel});
            }
            throw new ChannelUnWritableException("Prepare send msg but channel full");
        }
        if (absV0MsgCodec.isNeedResp()) {
            sb.append("dpIp").append(AttrConstants.KEY_VALUE_SEPARATOR).append(this.source.getSrcHost());
            if (absV0MsgCodec.getErrCode() != DataProxyErrCode.SUCCESS) {
                sb.append(AttrConstants.SEPARATOR).append("errCode").append(AttrConstants.KEY_VALUE_SEPARATOR).append(absV0MsgCodec.getErrCode().getErrCodeStr());
                if (StringUtils.isNotEmpty(absV0MsgCodec.getErrMsg())) {
                    sb.append(AttrConstants.SEPARATOR).append("errMsg").append(AttrConstants.KEY_VALUE_SEPARATOR).append(absV0MsgCodec.getErrMsg());
                }
            }
            if (StringUtils.isNotEmpty(absV0MsgCodec.getAttr())) {
                sb.append(AttrConstants.SEPARATOR).append(absV0MsgCodec.getAttr());
            }
            MsgType valueOf = MsgType.valueOf(absV0MsgCodec.getMsgType());
            ByteBuf buildBinMsgRspPackage = MsgType.MSG_BIN_MULTI_BODY.equals(valueOf) ? buildBinMsgRspPackage(sb.toString(), absV0MsgCodec.getUniq()) : buildTxtMsgRspPackage(valueOf, sb.toString(), absV0MsgCodec);
            sb.delete(0, sb.length());
            flushV0MsgPackage(this.source, channel, buildBinMsgRspPackage, absV0MsgCodec.getAttr());
        }
    }

    private void procBinHeartbeatMsg(BaseSource baseSource, Channel channel, ByteBuf byteBuf, int i) throws Exception {
        if (i < 14) {
            baseSource.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_TOTALLEN_BELOWMIN);
            String format = String.format("Malformed msg, totalDataLen(%d) < min hb-msg length(%d)", Integer.valueOf(i), 14);
            if (logger.isDebugEnabled()) {
                logger.debug(format + ", channel {}", channel);
            }
            throw new PkgParseException(format);
        }
        int readerIndex = byteBuf.readerIndex() - 5;
        int i2 = byteBuf.getInt(readerIndex + 10);
        int i3 = byteBuf.getShort(readerIndex + 14 + i2);
        int unsignedShort = byteBuf.getUnsignedShort(readerIndex + 14 + i2 + 2 + i3);
        if (unsignedShort != 60929) {
            baseSource.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_MAGIC_UNEQUAL);
            String format2 = String.format("Malformed msg, msgMagic(%d) != %d", Integer.valueOf(unsignedShort), Integer.valueOf(MsgFieldConsts.BIN_MSG_MAGIC));
            if (logger.isDebugEnabled()) {
                logger.debug(format2 + ", channel {}", channel);
            }
            throw new PkgParseException(format2);
        }
        if (i + 4 < i2 + i3 + 18) {
            baseSource.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_LEN_MALFORMED);
            String format3 = String.format("Malformed msg, bodyLen(%d) + attrLen(%d) > totalDataLen(%d)", Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i));
            if (logger.isDebugEnabled()) {
                logger.debug(format3 + ", channel {}", channel);
            }
            throw new PkgParseException(format3);
        }
        byte b = byteBuf.getByte(readerIndex + 9);
        byte[] bArr = null;
        if (i3 > 0) {
            bArr = new byte[i3];
            byteBuf.getBytes(readerIndex + 14 + i2 + 2, bArr, 0, i3);
        }
        flushV0MsgPackage(baseSource, channel, buildHBRspPackage(bArr, b, 0), MsgType.MSG_BIN_HEARTBEAT.name());
    }

    public static ByteBuf buildBinMsgRspPackage(String str, long j) {
        int i = 9;
        if (null != str) {
            i = 9 + str.length();
        }
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + i);
        buffer.writeInt(i);
        buffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
        buffer.writeBytes(new byte[]{(byte) ((j >> 24) & 255), (byte) ((j >> 16) & 255), (byte) ((j >> 8) & 255), (byte) (j & 255)});
        if (null != str) {
            buffer.writeShort(str.length());
            buffer.writeBytes(str.getBytes(StandardCharsets.UTF_8));
        } else {
            buffer.writeShort(0);
        }
        buffer.writeShort(MsgFieldConsts.BIN_MSG_MAGIC);
        return buffer;
    }

    public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String str) {
        int i = 0;
        if (str != null) {
            i = str.length();
        }
        int i2 = 5 + 0 + 4 + i;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + i2);
        buffer.writeInt(i2);
        buffer.writeByte(msgType.getValue());
        buffer.writeInt(0);
        buffer.writeInt(i);
        if (i > 0) {
            buffer.writeBytes(str.getBytes(StandardCharsets.UTF_8));
        }
        return buffer;
    }

    private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String str, AbsV0MsgCodec absV0MsgCodec) {
        int i = 0;
        int i2 = 0;
        byte[] bArr = null;
        if (str != null) {
            i = str.length();
        }
        if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
            bArr = absV0MsgCodec.getOrigBody();
            if (bArr != null) {
                i2 = bArr.length;
            }
        }
        int i3 = 5 + i2 + 4 + i;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + i3);
        buffer.writeInt(i3);
        buffer.writeByte(msgType.getValue());
        buffer.writeInt(i2);
        if (i2 > 0) {
            buffer.writeBytes(bArr);
        }
        buffer.writeInt(i);
        if (i > 0) {
            buffer.writeBytes(str.getBytes(StandardCharsets.UTF_8));
        }
        return buffer;
    }

    private ByteBuf buildHBRspPackage(byte[] bArr, byte b, int i) {
        int i2 = 16;
        if (null != bArr) {
            i2 = 16 + bArr.length;
        }
        if (i == 0 || i == -1) {
            i = 65535;
        }
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + i2);
        buffer.writeInt(i2);
        buffer.writeByte(MsgType.MSG_BIN_HEARTBEAT.getValue());
        buffer.writeInt((int) (System.currentTimeMillis() / 1000));
        buffer.writeByte(b);
        buffer.writeInt(2);
        buffer.writeShort(i);
        if (null != bArr) {
            buffer.writeShort(bArr.length);
            buffer.writeBytes(bArr);
        } else {
            buffer.writeShort(0);
        }
        buffer.writeShort(MsgFieldConsts.BIN_MSG_MAGIC);
        return buffer;
    }

    private ByteBuf buildHeartBeatMsgRspPackage() {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
        buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
        return buffer;
    }

    private void flushV0MsgPackage(BaseSource baseSource, Channel channel, ByteBuf byteBuf, String str) throws Exception {
        if (channel != null && channel.isWritable()) {
            channel.writeAndFlush(byteBuf);
            return;
        }
        byteBuf.release();
        baseSource.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNWRITABLE);
        if (logCounter.shouldPrint()) {
            logger.warn("Send msg but channel full, attr={}, channel={}", str, channel);
        }
        throw new ChannelUnWritableException("Send response but channel full");
    }
}
