package org.apache.inlong.dataproxy.source.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Event;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.SourceContext;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.class */
public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
    public static final Logger LOG = LoggerFactory.getLogger(InlongTcpChannelHandler.class);
    public static final int LENGTH_PARAM_OFFSET = 0;
    public static final int LENGTH_PARAM_LENGTH = 4;
    public static final int VERSION_PARAM_OFFSET = 4;
    public static final int VERSION_PARAM_LENGTH = 2;
    public static final int BODY_PARAM_OFFSET = 6;
    public static final int VERSION_1 = 1;
    private SourceContext sourceContext;

    public InlongTcpChannelHandler(SourceContext sourceContext) {
        this.sourceContext = sourceContext;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        LOG.debug("message received");
        if (obj == null) {
            LOG.error("get null msg, just skip");
            addMetric(false, 0L, null);
            return;
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            int readableBytes = byteBuf.readableBytes();
            if (readableBytes == 0) {
                LOG.warn("skip empty msg.");
                byteBuf.clear();
                addMetric(false, 0L, null);
                byteBuf.release();
                return;
            }
            if (readableBytes > 6 + this.sourceContext.getMaxMsgLength()) {
                addMetric(false, 0L, null);
                throw new Exception("err msg, MSG_MAX_LENGTH_BYTES < readableLength, and readableLength=" + readableBytes + ", and MSG_MAX_LENGTH_BYTES=" + this.sourceContext.getMaxMsgLength());
            }
            byteBuf.markReaderIndex();
            int readInt = byteBuf.readInt();
            if (readableBytes < readInt + 4) {
                byteBuf.resetReaderIndex();
                addMetric(false, 0L, null);
                throw new Exception("err msg, channel buffer is not satisfied, and  readableLength=" + readableBytes + ", and totalPackLength=" + readInt);
            }
            short readShort = byteBuf.readShort();
            switch (readShort) {
                case 1:
                    decodeVersion1(channelHandlerContext, byteBuf, readInt - 2);
                    return;
                default:
                    addMetric(false, 0L, null);
                    throw new Exception("err version, unknown version:" + ((int) readShort));
            }
        } finally {
            byteBuf.release();
        }
    }

    private void decodeVersion1(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, int i) throws Exception {
        byte[] bArr = new byte[i];
        byteBuf.readBytes(bArr);
        ProxySdk.MessagePack parseFrom = ProxySdk.MessagePack.parseFrom(bArr);
        if (this.sourceContext.isRejectService()) {
            addMetric(false, 0L, null);
            responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, parseFrom);
            return;
        }
        List<ProxyEvent> decodeSdkPack = EventUtils.decodeSdkPack(parseFrom);
        if (decodeSdkPack.size() == 0) {
            responsePackage(channelHandlerContext, ProxySdk.ResultCode.SUCCUSS, parseFrom);
        }
        if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
            processAndWaitingSave(channelHandlerContext, parseFrom, decodeSdkPack);
        } else {
            processAndResponse(channelHandlerContext, parseFrom, decodeSdkPack);
        }
    }

    private void processAndWaitingSave(ChannelHandlerContext channelHandlerContext, ProxySdk.MessagePack messagePack, List<ProxyEvent> list) throws Exception {
        ProxySdk.MessagePackHeader header = messagePack.getHeader();
        InlongTcpSourceCallback inlongTcpSourceCallback = new InlongTcpSourceCallback(channelHandlerContext, header);
        try {
            this.sourceContext.getSource().getChannelProcessor().processEvent(new ProxyPackEvent(header.getInlongGroupId(), header.getInlongStreamId(), list, inlongTcpSourceCallback));
            list.forEach(proxyEvent -> {
                addMetric(true, proxyEvent.getBody().length, proxyEvent);
            });
            if (!inlongTcpSourceCallback.getLatch().await(CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(), TimeUnit.MILLISECONDS) && !inlongTcpSourceCallback.getHasResponsed().getAndSet(true)) {
                responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, messagePack);
            }
        } catch (Throwable th) {
            LOG.error("Process Controller Event error can't write event to channel.", th);
            list.forEach(proxyEvent2 -> {
                addMetric(false, proxyEvent2.getBody().length, proxyEvent2);
            });
            if (inlongTcpSourceCallback.getHasResponsed().getAndSet(true)) {
                return;
            }
            responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, messagePack);
        }
    }

    private void processAndResponse(ChannelHandlerContext channelHandlerContext, ProxySdk.MessagePack messagePack, List<ProxyEvent> list) throws Exception {
        for (ProxyEvent proxyEvent : list) {
            String topic = this.sourceContext.getIdHolder().getTopic(proxyEvent.getUid());
            if (topic != null) {
                proxyEvent.setTopic(topic);
            }
            try {
                this.sourceContext.getSource().getChannelProcessor().processEvent(proxyEvent);
                addMetric(true, proxyEvent.getBody().length, proxyEvent);
            } catch (Throwable th) {
                LOG.error("Process Controller Event error can't write event to channel.", th);
                addMetric(false, proxyEvent.getBody().length, proxyEvent);
                responsePackage(channelHandlerContext, ProxySdk.ResultCode.ERR_REJECT, messagePack);
                return;
            }
        }
        responsePackage(channelHandlerContext, ProxySdk.ResultCode.SUCCUSS, messagePack);
    }

    private void addMetric(boolean z, long j, Event event) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", this.sourceContext.getProxyClusterId());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_ID, this.sourceContext.getSourceId());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, this.sourceContext.getSourceDataId());
        DataProxyMetricItem.fillInlongId(event, hashMap);
        DataProxyMetricItem.fillAuditFormatTime(event, hashMap);
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) this.sourceContext.getMetricItemSet().findMetricItem(hashMap);
        if (!z) {
            dataProxyMetricItem.readFailCount.incrementAndGet();
            dataProxyMetricItem.readFailSize.addAndGet(j);
        } else {
            dataProxyMetricItem.readSuccessCount.incrementAndGet();
            dataProxyMetricItem.readSuccessSize.addAndGet(j);
            AuditUtils.add(5, event);
        }
    }

    private void responsePackage(ChannelHandlerContext channelHandlerContext, ProxySdk.ResultCode resultCode, ProxySdk.MessagePack messagePack) throws Exception {
        ProxySdk.ResponseInfo.Builder newBuilder = ProxySdk.ResponseInfo.newBuilder();
        newBuilder.setResult(resultCode);
        newBuilder.setPackId(messagePack.getHeader().getPackId());
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(newBuilder.build().toByteArray());
        Channel channel = channelHandlerContext.channel();
        if (channel.isWritable()) {
            channel.write(wrappedBuffer);
        } else {
            LOG.warn("the send buffer2 is full, so disconnect it!please check remote client; Connection info:" + channel);
            wrappedBuffer.release();
            throw new Exception("the send buffer2 is full,so disconnect it!please check remote client, Connection info:" + channel);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LOG.error("exception caught cause = {}", th);
        channelHandlerContext.fireExceptionCaught(th);
        if (channelHandlerContext.channel() != null) {
            try {
                channelHandlerContext.channel().disconnect();
                channelHandlerContext.channel().close();
            } catch (Exception e) {
                LOG.error("Close connection error!", e);
            }
            this.sourceContext.getAllChannels().remove(channelHandlerContext.channel());
        }
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        LOG.debug("Connection to {} disconnected.", channelHandlerContext.channel());
        channelHandlerContext.fireChannelInactive();
        try {
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
        } catch (Exception e) {
            LOG.error("channelInactive has exception e = {}", e);
        }
        this.sourceContext.getAllChannels().remove(channelHandlerContext.channel());
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        String channelRemoteIP;
        if (this.sourceContext.getAllChannels().size() - 1 >= this.sourceContext.getMaxConnections()) {
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            LOG.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}", new Object[]{this.sourceContext.getSource().getName(), channelHandlerContext.channel(), Integer.valueOf(this.sourceContext.getAllChannels().size() - 1), Integer.valueOf(this.sourceContext.getMaxConnections())});
        } else if (ConfigManager.getInstance().needChkIllegalIP() && (channelRemoteIP = AddressUtils.getChannelRemoteIP(channelHandlerContext.channel())) != null && ConfigManager.getInstance().isIllegalIP(channelRemoteIP)) {
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            LOG.error(channelRemoteIP + " is Illegal IP, so refuse it !");
        } else {
            this.sourceContext.getAllChannels().add(channelHandlerContext.channel());
            channelHandlerContext.fireChannelActive();
            LOG.info("{} added new channel = {}, current connections = {}, maxConnections = {}", new Object[]{this.sourceContext.getSource().getName(), channelHandlerContext.channel(), Integer.valueOf(this.sourceContext.getAllChannels().size() - 1), Integer.valueOf(this.sourceContext.getMaxConnections())});
        }
    }
}
