package org.apache.inlong.dataproxy.source.httpMsg;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.class */
public class HttpMessageHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    /** Class-wide SLF4J logger; the nested {@link SendResultListener} writes to it as well. */
    private static final Logger logger = LoggerFactory.getLogger(HttpMessageHandler.class);
    // Gate consulted through shouldPrint() before the regular log statements in this class.
    // NOTE(review): the three constructor arguments presumably tune how often printing is allowed - confirm against LogCounter.
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    // Separate gate consulted only by exceptionCaught(...), so exception logging is throttled independently.
    private static final LogCounter exceptLogCounter = new LogCounter(10, 50000, 20000);
    // Owning source: provides service state, limits, the channel processor and the statistics sinks used below.
    private final BaseSource source;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler$SendResultListener.class */
    public static class SendResultListener implements ChannelFutureListener {
        private final boolean isClose;

        public SendResultListener(boolean z) {
            this.isClose = z;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (!channelFuture.isSuccess()) {
                Throwable cause = channelFuture.cause();
                String channelRemoteIP = AddressUtils.getChannelRemoteIP(channelFuture.channel());
                if (HttpMessageHandler.logCounter.shouldPrint()) {
                    HttpMessageHandler.logger.error("Http return response to client {} failed, exception:{}, errmsg:{}", new Object[]{channelRemoteIP, cause, cause.getLocalizedMessage()});
                }
                channelFuture.channel().close();
            }
            if (this.isClose) {
                channelFuture.channel().close();
            }
        }
    }

    /**
     * Creates a handler bound to the given source.
     *
     * @param baseSource the source this handler queries for state and reports statistics and events to
     */
    public HttpMessageHandler(BaseSource baseSource) {
        source = baseSource;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates one aggregated HTTP request and dispatches it.
     *
     * <p>Checks run in this order, each failure answering with an error response and stopping:
     * decoder result, service rejection state, MQ cluster readiness, HTTP method (GET/POST only) and
     * request path. Heartbeat requests are acknowledged immediately; favicon requests are dropped
     * without any response. For message requests the query-string parameters are collected first and
     * then, for a POST with a positive Content-Length and a form-urlencoded Content-Type, the body
     * parameters are merged on top before {@code processMessage} is invoked.
     *
     * @param channelHandlerContext the channel context the request arrived on
     * @param fullHttpRequest       the aggregated request
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        if (HttpUtil.is100ContinueExpected(fullHttpRequest)) {
            channelHandlerContext.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
        }
        final long msgRcvTime = System.currentTimeMillis();
        final String clientIp = AddressUtils.getChannelRemoteIP(channelHandlerContext.channel());
        if (!fullHttpRequest.decoderResult().isSuccess()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_DECODE_FAIL);
            sendErrorMsg(channelHandlerContext, DataProxyErrCode.HTTP_DECODE_REQ_FAILURE);
            return;
        }
        if (this.source.isRejectService()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
            sendErrorMsg(channelHandlerContext, DataProxyErrCode.SERVICE_CLOSED);
            return;
        }
        if (!ConfigManager.getInstance().isMqClusterReady()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_SINK_UNREADY);
            sendErrorMsg(channelHandlerContext, DataProxyErrCode.SINK_SERVICE_UNREADY);
            return;
        }
        // Compare by value rather than by reference so the check does not rely on instance caching.
        final HttpMethod method = fullHttpRequest.method();
        final boolean isPost = HttpMethod.POST.equals(method);
        if (!isPost && !HttpMethod.GET.equals(method)) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_METHOD_INVALID);
            sendErrorMsg(channelHandlerContext, DataProxyErrCode.HTTP_UNSUPPORTED_METHOD, "Only support [" + HttpMethod.GET.name() + ", " + HttpMethod.POST.name() + "] methods");
            return;
        }
        final QueryStringDecoder uriDecoder = new QueryStringDecoder(fullHttpRequest.uri(), CharsetUtil.UTF_8);
        final String path = uriDecoder.path();
        final boolean isHeartbeat = HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(path);
        if (!isHeartbeat && !HttpAttrConst.KEY_SRV_URL_REPORT_MSG.equals(path)) {
            if (HttpAttrConst.KEY_URL_FAVICON_ICON.equals(path)) {
                // Favicon probes are ignored on purpose: no statistics and no response.
                return;
            }
            this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_PATH_INVALID);
            sendErrorMsg(channelHandlerContext, DataProxyErrCode.HTTP_UNSUPPORTED_SERVICE_URI, "Only support [/dataproxy/heartbeat, /dataproxy/message] paths!");
            return;
        }
        final boolean closeAfterReply = isCloseConnection(fullHttpRequest);
        if (isHeartbeat) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_SUCCESS);
            sendSuccessResponse(channelHandlerContext, closeAfterReply, null);
            return;
        }
        // URL parameters first; body parameters (if any) overwrite same-named URL parameters.
        final Map<String, String> reqAttrs = new HashMap<>();
        getAttrsFromDecoder(uriDecoder, reqAttrs);
        if (isPost) {
            final String cntLength = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_LENGTH);
            if (StringUtils.isNotBlank(cntLength) && NumberUtils.toInt(cntLength, 0) > 0) {
                final String cntType = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_TYPE);
                if (StringUtils.isNotBlank(cntType)) {
                    if (!isFormUrlEncoded(cntType)) {
                        this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_CONTYPE_INVALID);
                        sendErrorMsg(channelHandlerContext, DataProxyErrCode.HTTP_UNSUPPORTED_CONTENT_TYPE, "Only support [" + HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED + "] content type!");
                        return;
                    }
                    getAttrsFromDecoder(new QueryStringDecoder(fullHttpRequest.content().toString(CharsetUtil.UTF_8), false), reqAttrs);
                }
            }
        }
        processMessage(channelHandlerContext, reqAttrs, msgRcvTime, clientIp, closeAfterReply);
    }

    /**
     * Tells whether a Content-Type header value denotes form-urlencoded content.
     *
     * <p>Only the media type is compared, case-insensitively; anything after the first {@code ';'}
     * (for example {@code charset=UTF-8}) is ignored so that clients sending parameters are accepted.
     *
     * @param contentType a non-null Content-Type header value
     * @return {@code true} when the media type is {@code application/x-www-form-urlencoded}
     */
    private static boolean isFormUrlEncoded(String contentType) {
        final int paramStart = contentType.indexOf(';');
        final String mediaType = (paramStart < 0) ? contentType : contentType.substring(0, paramStart);
        return mediaType.trim().equalsIgnoreCase(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
    }

    /**
     * Flushes whatever was written to the context during the finished read.
     *
     * @param channelHandlerContext the channel context whose read batch has completed
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    /**
     * Admits or refuses a newly connected channel.
     *
     * <p>The link-in statistic is always counted. The channel is refused (disconnect + close) when
     * illegal-IP checking is enabled and the remote IP is flagged as illegal, or when the source already
     * holds as many channels as its configured maximum. Otherwise the channel is registered with the
     * source and the event is passed on to the next handler.
     *
     * @param channelHandlerContext context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);

        // Guard 1: refuse peers whose address is on the illegal-IP list.
        if (ConfigManager.getInstance().needChkIllegalIP()) {
            final String clientIp = AddressUtils.getChannelRemoteIP(channelHandlerContext.channel());
            if (clientIp != null && ConfigManager.getInstance().isIllegalIP(clientIp)) {
                source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_ILLEGAL);
                channelHandlerContext.channel().disconnect();
                channelHandlerContext.channel().close();
                if (logCounter.shouldPrint()) {
                    logger.error(clientIp + " is Illegal IP, so refuse it !");
                }
                return;
            }
        }

        // Guard 2: refuse the connection once the source has reached its connection limit.
        if (source.getAllChannels().size() >= source.getMaxConnections()) {
            source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_OVERMAX);
            channelHandlerContext.channel().disconnect();
            channelHandlerContext.channel().close();
            if (logCounter.shouldPrint()) {
                logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}",
                        source.getCachedSrcName(),
                        channelHandlerContext.channel(),
                        source.getAllChannels().size(),
                        source.getMaxConnections());
            }
            return;
        }

        // Accepted: track the channel and continue the pipeline.
        source.getAllChannels().add(channelHandlerContext.channel());
        channelHandlerContext.fireChannelActive();
    }

    /**
     * Counts the link-out statistic and forwards the inactive event to the next handler.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKOUT);
        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Classifies a pipeline exception for statistics, logs it (throttled) and reacts to it.
     *
     * <p>I/O exceptions close the context without a reply; every other exception is answered with an
     * {@code UNKNOWN_ERROR} response carrying the exception message.
     *
     * @param channelHandlerContext context of the channel the exception was raised on
     * @param th                    the exception that was caught
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        final boolean ioFailure = th instanceof IOException;

        // One statistic per exception category; the first matching category wins.
        if (th instanceof ReadTimeoutException) {
            source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_READ_TIMEOUT);
        } else if (th instanceof TooLongFrameException) {
            source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_FRAME_OVERMAX);
        } else if (th instanceof CorruptedFrameException) {
            source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_FRAME_CORRPUTED);
        } else if (ioFailure) {
            source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_IO_EXCEPTION);
        } else {
            source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_UNKNOWN_EXCEPTION);
        }

        if (exceptLogCounter.shouldPrint()) {
            // The trailing throwable is not bound to a placeholder, so SLF4J logs it with its stack trace.
            logger.warn("{} received an exception from channel {}",
                    source.getCachedSrcName(), channelHandlerContext.channel(), th);
        }

        if (ioFailure) {
            channelHandlerContext.close();
            return;
        }
        sendErrorMsg(channelHandlerContext, DataProxyErrCode.UNKNOWN_ERROR, "Process message failure: " + th.getMessage());
    }

    /**
     * Closes the context when the user event is an {@link IdleStateEvent} (or a subclass of it);
     * any other event is ignored.
     *
     * @param channelHandlerContext context the event was triggered on
     * @param obj                   the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        final Class<?> eventType = obj.getClass();
        if (!IdleStateEvent.class.isAssignableFrom(eventType)) {
            return;
        }
        channelHandlerContext.close();
    }

    /**
     * Validates the attributes of a message-report request, wraps the body into an InLongMsg event and
     * hands it to the source's channel processor.
     *
     * <p>Validation failures (missing group id, missing stream id, no topic configured, missing/blank
     * body, body longer than the source's maximum) are answered with the matching error code and honour
     * {@code closeAfterReply}. A failure while handing the event to the channel processor is answered
     * through {@code sendErrorMsg}, which always requests the connection to be closed.
     *
     * @param ctx             channel context used to send the reply
     * @param reqAttrs        request parameters collected from the URL and, if present, the form body
     * @param msgRcvTime      time the request was received, in epoch milliseconds
     * @param clientIp        remote address of the client as resolved by the caller
     * @param closeAfterReply whether the client asked for the connection to be closed after the reply
     */
    private void processMessage(ChannelHandlerContext ctx, Map<String, String> reqAttrs, long msgRcvTime, String clientIp, boolean closeAfterReply) throws Exception {
        final StringBuilder strBuff = new StringBuilder(512);
        final String callback = reqAttrs.get(HttpAttrConst.KEY_CALLBACK);

        // --- required routing attributes ---
        final String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
        if (StringUtils.isBlank(groupId)) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
            sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
                    strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID).append(" must exist and not blank!").toString(),
                    closeAfterReply, callback);
            return;
        }
        final String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
        if (StringUtils.isBlank(streamId)) {
            this.source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_STREAMID_MISSING, groupId);
            sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
                    strBuff.append("Field ").append(HttpAttrConst.KEY_STREAM_ID).append(" must exist and not blank!").toString(),
                    closeAfterReply, callback);
            return;
        }
        final String topicName = ConfigManager.getInstance().getTopicName(groupId, streamId);
        if (StringUtils.isEmpty(topicName)) {
            this.source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, groupId);
            // Rendered as "<groupKey>(<groupId>),<streamKey>(<streamId>)"; the previous text carried a
            // stray comma inside the second pair of parentheses.
            sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
                    strBuff.append("Topic not configured for ").append(HttpAttrConst.KEY_GROUP_ID).append("(").append(groupId)
                            .append("),").append(HttpAttrConst.KEY_STREAM_ID).append("(").append(streamId).append(")").toString(),
                    closeAfterReply, callback);
            return;
        }

        // --- optional data time: falls back to the receive time when absent or not a number ---
        long dataTime = msgRcvTime;
        final String strDataTime = reqAttrs.get(HttpAttrConst.KEY_DATA_TIME);
        if (StringUtils.isNotEmpty(strDataTime)) {
            try {
                dataTime = Long.parseLong(strDataTime);
            } catch (NumberFormatException ignored) {
                // Best effort by design: an unparsable data time must not reject the message.
            }
        }

        // --- body checks ---
        final String body = reqAttrs.get("body");
        if (StringUtils.isBlank(body)) {
            if (body == null) {
                this.source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_MISSING, groupId);
                sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
                        strBuff.append("Field ").append("body").append(" is not exist!").toString(),
                        closeAfterReply, callback);
            } else {
                this.source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_BLANK, groupId);
                sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
                        strBuff.append("Field ").append("body").append(" is Blank!").toString(),
                        closeAfterReply, callback);
            }
            return;
        }
        if (body.length() > this.source.getMaxMsgLength()) {
            this.source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_OVERMAX, groupId);
            sendResponse(ctx, DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
                    strBuff.append("Error msg, the ").append("body").append(" length(").append(body.length())
                            .append(") is bigger than allowed length(").append(this.source.getMaxMsgLength()).append(")").toString(),
                    closeAfterReply, callback);
            return;
        }

        // --- build the InLongMsg payload ---
        final int intMsgCnt = NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
        final String strMsgCount = String.valueOf(intMsgCnt);
        final long auditVersion = AuditUtils.getAuditVersion(reqAttrs);
        final InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.source.isCompressed());
        strBuff.append("groupId=").append(groupId)
                .append("&streamId=").append(streamId)
                .append("&dt=").append(dataTime)
                .append("&clientIp=").append(clientIp)
                .append("&cnt=").append(strMsgCount)
                .append("&rt=").append(msgRcvTime)
                .append(AttrConstants.SEPARATOR).append("rtms")
                .append(AttrConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
        if (auditVersion != -1) {
            strBuff.append(AttrConstants.SEPARATOR).append("auditVersion")
                    .append(AttrConstants.KEY_VALUE_SEPARATOR).append(auditVersion);
        }
        inLongMsg.addMsg(strBuff.toString(), body.getBytes(CharsetUtil.UTF_8));
        final byte[] inlongMsgData = inLongMsg.buildArray();
        final long pkgTime = inLongMsg.getCreatetime();
        inLongMsg.reset();
        // The builder is reused by the statistics calls and the failure message below.
        strBuff.delete(0, strBuff.length());

        // --- build the event headers ---
        final Map<String, String> eventHeaders = new HashMap<>();
        eventHeaders.put(HttpAttrConst.KEY_GROUP_ID, groupId);
        eventHeaders.put(HttpAttrConst.KEY_STREAM_ID, streamId);
        eventHeaders.put("topic", topicName);
        eventHeaders.put(HttpAttrConst.KEY_DATA_TIME, String.valueOf(dataTime));
        eventHeaders.put(ConfigConstants.REMOTE_IP_KEY, clientIp);
        eventHeaders.put(ConfigConstants.DATAPROXY_IP_KEY, this.source.getSrcHost());
        eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
        eventHeaders.put(ConfigConstants.MSG_ENCODE_VER, MessageWrapType.INLONG_MSG_V0.getStrId());
        eventHeaders.put(ConfigConstants.VERSION_TYPE, MessageWrapType.INLONG_MSG_V0.getStrId());
        eventHeaders.put("rt", String.valueOf(msgRcvTime));
        eventHeaders.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(pkgTime));
        eventHeaders.put("auditVersion", String.valueOf(auditVersion));
        final Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);

        // --- deliver and report ---
        try {
            this.source.getCachedChProcessor().processEvent(event);
            this.source.fileMetricAddSuccStats(strBuff, groupId, streamId, topicName, clientIp, "b2b", dataTime, pkgTime, intMsgCnt, 1, event.getBody().length);
            this.source.addMetric(true, event.getBody().length, event);
            sendSuccessResponse(ctx, closeAfterReply, callback);
        } catch (Throwable ex) {
            // Boundary catch: any delivery failure must still be counted and answered.
            this.source.fileMetricAddFailStats(strBuff, groupId, streamId, topicName, clientIp, "b2b", dataTime, pkgTime, 1);
            this.source.addMetric(false, event.getBody().length, event);
            sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
                    strBuff.append("Put HTTP event to channel failure: ").append(ex.getMessage()).toString(), callback);
            if (logCounter.shouldPrint()) {
                logger.error("Error writing HTTP event to channel failure.", ex);
            }
        }
    }

    /**
     * Copies the first value of every decoded parameter into {@code map}.
     *
     * <p>Parameters with a null key, a null value list or an empty value list are skipped. A parameter
     * already present in {@code map} is overwritten.
     *
     * @param queryStringDecoder decoder holding the parsed parameters
     * @param map                destination for the name to first-value pairs
     */
    private void getAttrsFromDecoder(QueryStringDecoder queryStringDecoder, Map<String, String> map) {
        for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
            if (entry == null || entry.getKey() == null) {
                continue;
            }
            final List<String> values = entry.getValue();
            if (values == null || values.isEmpty()) {
                continue;
            }
            map.put(entry.getKey(), values.get(0));
        }
    }

    /**
     * Tells whether the request carries a {@code Connection} header whose trimmed value equals
     * {@code close}, ignoring case.
     *
     * @param request the request to inspect
     * @return {@code true} only when the header is present and equals {@code close}
     */
    private boolean isCloseConnection(FullHttpRequest request) {
        final String connHeader = request.headers().get(HttpHeaderNames.CONNECTION);
        return connHeader != null
                && connHeader.trim().equalsIgnoreCase(HttpHeaderValues.CLOSE.toString());
    }

    /**
     * Sends an error reply that uses the error code's own message and no callback.
     * The connection is always requested to be closed after the reply.
     *
     * @param ctx     channel context used to send the reply
     * @param errCode error code to report
     */
    private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode errCode) {
        sendErrorMsg(ctx, errCode, errCode.getErrMsg(), null);
    }

    /**
     * Sends an error reply with a custom message and no callback.
     * The connection is always requested to be closed after the reply.
     *
     * @param ctx     channel context used to send the reply
     * @param errCode error code to report
     * @param errMsg  message text to report
     */
    private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode errCode, String errMsg) {
        sendErrorMsg(ctx, errCode, errMsg, null);
    }

    /**
     * Sends an error reply with a custom message, optionally wrapped in a callback.
     * The connection is always requested to be closed after the reply.
     *
     * @param ctx      channel context used to send the reply
     * @param errCode  error code to report
     * @param errMsg   message text to report
     * @param callback callback name to wrap the reply in, or {@code null} for none
     */
    private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode errCode, String errMsg, String callback) {
        final int code = errCode.getErrCode();
        sendResponse(ctx, code, errMsg, true, callback);
    }

    /**
     * Sends the {@code SUCCESS} code and message as the reply.
     *
     * @param ctx             channel context used to send the reply
     * @param closeAfterReply whether the channel should be closed once the reply is written
     * @param callback        callback name to wrap the reply in, or {@code null} for none
     */
    private void sendSuccessResponse(ChannelHandlerContext ctx, boolean closeAfterReply, String callback) {
        final DataProxyErrCode ok = DataProxyErrCode.SUCCESS;
        sendResponse(ctx, ok.getErrCode(), ok.getErrMsg(), closeAfterReply, callback);
    }

    /**
     * Writes a {@code 200 OK} reply whose body is {@code {"code":"<errCode>","msg":"<errMsg>"}},
     * optionally wrapped as {@code <callback>(...)} when a non-blank callback is given.
     *
     * <p>Nothing is sent when the context or its channel is null. When the channel is not writable the
     * reply is dropped, the unwritable statistic is counted and a throttled warning is logged.
     *
     * @param ctx             channel context used to send the reply; may be null
     * @param errCode         numeric result code placed in the {@code code} field
     * @param errMsg          text placed in the {@code msg} field; JSON-escaped before being written
     * @param closeAfterReply whether the channel should be closed once the reply is written
     * @param callback        callback name to wrap the reply in, or null/blank for none
     */
    private void sendResponse(ChannelHandlerContext ctx, int errCode, String errMsg, boolean closeAfterReply, String callback) {
        if (ctx == null || ctx.channel() == null) {
            return;
        }
        if (!ctx.channel().isWritable()) {
            this.source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_UNWRITABLE);
            if (logCounter.shouldPrint()) {
                logger.warn("Send msg but channel full, channel={}", ctx.channel());
            }
            return;
        }
        // NOTE(review): the callback name comes from the request and is echoed verbatim;
        // consider restricting it to identifier characters - confirm with API consumers first.
        final boolean wrapInCallback = StringUtils.isNotBlank(callback);
        final StringBuilder payload = new StringBuilder(512);
        if (wrapInCallback) {
            payload.append(callback).append("(");
        }
        payload.append("{\"code\":\"").append(errCode).append("\",\"msg\":\"");
        // Messages may embed exception text; escape them so the reply stays valid JSON.
        appendJsonEscaped(payload, errMsg);
        payload.append("\"}");
        if (wrapInCallback) {
            payload.append(")");
        }
        // The response takes ownership of the buffer, so no intermediate copy or manual release is needed.
        final ByteBuf content = Unpooled.copiedBuffer(payload.toString(), CharsetUtil.UTF_8);
        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpAttrConst.RET_CNT_TYPE);
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(content.readableBytes()));
        ctx.writeAndFlush(response).addListener(new SendResultListener(closeAfterReply));
    }

    /**
     * Appends {@code text} to {@code out} with JSON string escaping applied: quotes, backslashes and
     * control characters below U+0020 are escaped. A null {@code text} is appended as {@code null},
     * matching what plain string concatenation produced before.
     *
     * @param out  destination builder
     * @param text raw text to escape; may be null
     */
    private static void appendJsonEscaped(StringBuilder out, String text) {
        final String value = String.valueOf(text);
        for (int i = 0; i < value.length(); i++) {
            final char ch = value.charAt(i);
            switch (ch) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (ch < 0x20) {
                        out.append(String.format("\\u%04x", (int) ch));
                    } else {
                        out.append(ch);
                    }
                    break;
            }
        }
    }
}
