package org.apache.eventmesh.runtime.boot;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.DiskAttribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.runtime.common.Pair;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/AbrstractHTTPServer.class */
public abstract class AbrstractHTTPServer extends AbstractRemotingServer {
    public HTTPMetricsServer metrics;
    private boolean useTLS;
    public Logger httpServerLogger = LoggerFactory.getLogger(getClass());
    public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    public DefaultHttpDataFactory defaultHttpDataFactory = new DefaultHttpDataFactory(false);
    private AtomicBoolean started = new AtomicBoolean(false);
    public ThreadPoolExecutor asyncContextCompleteHandler = ThreadPoolFactory.createThreadPoolExecutor(10, 10, "eventMesh-http-asyncContext-");
    protected HashMap<Integer, Pair<HttpRequestProcessor, ThreadPoolExecutor>> processorTable = new HashMap<>(64);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/eventmesh/runtime/boot/AbrstractHTTPServer$HTTPHandler.class */
    public class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {
        HTTPHandler() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
            HttpPostRequestDecoder httpPostRequestDecoder = null;
            try {
                try {
                    if (!httpRequest.decoderResult().isSuccess()) {
                        AbrstractHTTPServer.this.sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
                        try {
                            httpPostRequestDecoder.destroy();
                            return;
                        } catch (Exception e) {
                            return;
                        }
                    }
                    HttpCommand httpCommand = new HttpCommand();
                    httpRequest.headers().set("Ip", RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
                    String deleteWhitespace = StringUtils.deleteWhitespace(httpRequest.headers().get("Version"));
                    if (StringUtils.isBlank(deleteWhitespace)) {
                        deleteWhitespace = ProtocolVersion.V1.getVersion();
                        httpRequest.headers().set("Version", ProtocolVersion.V1.getVersion());
                    }
                    AbrstractHTTPServer.this.metrics.summaryMetrics.recordHTTPRequest();
                    long currentTimeMillis = System.currentTimeMillis();
                    HashMap hashMap = new HashMap();
                    if (httpRequest.method() == HttpMethod.GET) {
                        new QueryStringDecoder(httpRequest.uri()).parameters().entrySet().forEach(entry -> {
                            hashMap.put(entry.getKey(), ((List) entry.getValue()).get(0));
                        });
                    } else if (httpRequest.method() != HttpMethod.POST) {
                        AbrstractHTTPServer.this.sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
                        try {
                            httpPostRequestDecoder.destroy();
                            return;
                        } catch (Exception e2) {
                            return;
                        }
                    } else {
                        httpPostRequestDecoder = new HttpPostRequestDecoder(AbrstractHTTPServer.this.defaultHttpDataFactory, httpRequest);
                        for (Attribute attribute : httpPostRequestDecoder.getBodyHttpDatas()) {
                            if (attribute.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                                Attribute attribute2 = attribute;
                                hashMap.put(attribute2.getName(), attribute2.getValue());
                            }
                        }
                    }
                    AbrstractHTTPServer.this.metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - currentTimeMillis);
                    String deleteWhitespace2 = httpRequest.method() == HttpMethod.POST ? StringUtils.deleteWhitespace(httpRequest.headers().get("Code")) : MapUtils.getString(hashMap, StringUtils.lowerCase("Code"), "");
                    httpCommand.setHttpMethod(httpRequest.method().name());
                    httpCommand.setHttpVersion(httpRequest.protocolVersion().protocolName());
                    httpCommand.setRequestCode(deleteWhitespace2);
                    if (!ProtocolVersion.contains(deleteWhitespace)) {
                        AbrstractHTTPServer.this.sendResponse(channelHandlerContext, httpCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()).httpResponse());
                        try {
                            httpPostRequestDecoder.destroy();
                            return;
                        } catch (Exception e3) {
                            return;
                        }
                    }
                    if (StringUtils.isBlank(deleteWhitespace2) || !StringUtils.isNumeric(deleteWhitespace2) || !RequestCode.contains(Integer.valueOf(deleteWhitespace2)) || !AbrstractHTTPServer.this.processorTable.containsKey(Integer.valueOf(deleteWhitespace2))) {
                        AbrstractHTTPServer.this.sendResponse(channelHandlerContext, httpCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getRetCode(), EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg()).httpResponse());
                        try {
                            httpPostRequestDecoder.destroy();
                            return;
                        } catch (Exception e4) {
                            return;
                        }
                    }
                    if (!AbrstractHTTPServer.this.started.get()) {
                        AbrstractHTTPServer.this.sendResponse(channelHandlerContext, httpCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_STOP.getRetCode(), EventMeshRetCode.EVENTMESH_STOP.getErrMsg()).httpResponse());
                        try {
                            httpPostRequestDecoder.destroy();
                            return;
                        } catch (Exception e5) {
                            return;
                        }
                    }
                    try {
                        httpCommand.setHeader(Header.buildHeader(deleteWhitespace2, AbrstractHTTPServer.this.parseHTTPHeader(httpRequest)));
                        httpCommand.setBody(Body.buildBody(deleteWhitespace2, hashMap));
                        if (AbrstractHTTPServer.this.httpLogger.isDebugEnabled()) {
                            AbrstractHTTPServer.this.httpLogger.debug("{}", httpCommand);
                        }
                        processEventMeshRequest(channelHandlerContext, new AsyncContext<>(httpCommand, null, AbrstractHTTPServer.this.asyncContextCompleteHandler));
                        httpPostRequestDecoder = httpPostRequestDecoder;
                    } catch (Exception e6) {
                        AbrstractHTTPServer.this.sendResponse(channelHandlerContext, httpCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg() + EventMeshUtil.stackTrace(e6, 3)).httpResponse());
                        try {
                            httpPostRequestDecoder.destroy();
                        } catch (Exception e7) {
                        }
                    }
                } finally {
                    try {
                        httpPostRequestDecoder.destroy();
                    } catch (Exception e8) {
                    }
                }
            } catch (Exception e9) {
                AbrstractHTTPServer.this.httpServerLogger.error("AbrstractHTTPServer.HTTPHandler.channelRead0 err", e9);
                try {
                    httpPostRequestDecoder.destroy();
                } catch (Exception e10) {
                }
            }
        }

        public void processEventMeshRequest(ChannelHandlerContext channelHandlerContext, AsyncContext<HttpCommand> asyncContext) {
            Pair<HttpRequestProcessor, ThreadPoolExecutor> pair = AbrstractHTTPServer.this.processorTable.get(Integer.valueOf(asyncContext.getRequest().getRequestCode()));
            try {
                pair.getObject2().submit(() -> {
                    try {
                        if (((HttpRequestProcessor) pair.getObject1()).rejectRequest()) {
                            HttpCommand createHttpCommandResponse = ((HttpCommand) asyncContext.getRequest()).createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getRetCode(), EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg());
                            asyncContext.onComplete(createHttpCommandResponse);
                            if (asyncContext.isComplete()) {
                                if (AbrstractHTTPServer.this.httpLogger.isDebugEnabled()) {
                                    AbrstractHTTPServer.this.httpLogger.debug("{}", asyncContext.getResponse());
                                }
                                AbrstractHTTPServer.this.sendResponse(channelHandlerContext, createHttpCommandResponse.httpResponse());
                                return;
                            }
                            return;
                        }
                        ((HttpRequestProcessor) pair.getObject1()).processRequest(channelHandlerContext, asyncContext);
                        if (asyncContext == null || !asyncContext.isComplete()) {
                            return;
                        }
                        AbrstractHTTPServer.this.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - ((HttpCommand) asyncContext.getRequest()).getReqTime());
                        if (AbrstractHTTPServer.this.httpLogger.isDebugEnabled()) {
                            AbrstractHTTPServer.this.httpLogger.debug("{}", asyncContext.getResponse());
                        }
                        AbrstractHTTPServer.this.sendResponse(channelHandlerContext, ((HttpCommand) asyncContext.getResponse()).httpResponse());
                    } catch (Exception e) {
                        AbrstractHTTPServer.this.httpServerLogger.error("process error", e);
                    }
                });
            } catch (RejectedExecutionException e) {
                HttpCommand createHttpCommandResponse = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.OVERLOAD.getRetCode(), EventMeshRetCode.OVERLOAD.getErrMsg());
                asyncContext.onComplete(createHttpCommandResponse);
                AbrstractHTTPServer.this.metrics.summaryMetrics.recordHTTPDiscard();
                AbrstractHTTPServer.this.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - createHttpCommandResponse.getReqTime());
                try {
                    AbrstractHTTPServer.this.sendResponse(channelHandlerContext, asyncContext.getResponse().httpResponse());
                } catch (Exception e2) {
                }
            }
        }

        public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelReadComplete(channelHandlerContext);
            channelHandlerContext.flush();
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            if (null != th) {
                th.printStackTrace();
            }
            if (null != channelHandlerContext) {
                channelHandlerContext.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/eventmesh/runtime/boot/AbrstractHTTPServer$HttpConnectionHandler.class */
    public class HttpConnectionHandler extends ChannelDuplexHandler {
        public AtomicInteger connections = new AtomicInteger(0);

        HttpConnectionHandler() {
        }

        public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelRegistered(channelHandlerContext);
        }

        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelUnregistered(channelHandlerContext);
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            String parseChannelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel());
            if (this.connections.incrementAndGet() <= 20000) {
                super.channelActive(channelHandlerContext);
            } else {
                AbrstractHTTPServer.this.httpServerLogger.warn("client|http|channelActive|remoteAddress={}|msg={}", parseChannelRemoteAddr, "too many client(20000) connect this eventMesh server");
                channelHandlerContext.close();
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.connections.decrementAndGet();
            RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel());
            super.channelInactive(channelHandlerContext);
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            if ((obj instanceof IdleStateEvent) && ((IdleStateEvent) obj).state().equals(IdleState.ALL_IDLE)) {
                AbrstractHTTPServer.this.httpServerLogger.info("client|http|userEventTriggered|remoteAddress={}|msg={}", RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), obj.getClass().getName());
                channelHandlerContext.close();
            }
            channelHandlerContext.fireUserEventTriggered(obj);
        }
    }

    /* loaded from: input_file:org/apache/eventmesh/runtime/boot/AbrstractHTTPServer$HttpsServerInitializer.class */
    class HttpsServerInitializer extends ChannelInitializer<SocketChannel> {
        private SSLContext sslContext;

        public HttpsServerInitializer(SSLContext sSLContext) {
            this.sslContext = sSLContext;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            if (this.sslContext != null && AbrstractHTTPServer.this.useTLS) {
                SSLEngine createSSLEngine = this.sslContext.createSSLEngine();
                createSSLEngine.setUseClientMode(false);
                pipeline.addFirst("ssl", new SslHandler(createSSLEngine));
            }
            pipeline.addLast(new ChannelHandler[]{new HttpRequestDecoder(), new HttpResponseEncoder(), new HttpConnectionHandler(), new HttpObjectAggregator(Integer.MAX_VALUE), new HTTPHandler()});
        }
    }

    public AbrstractHTTPServer(int i, boolean z) {
        this.port = i;
        this.useTLS = z;
    }

    public Map<String, Object> parseHTTPHeader(HttpRequest httpRequest) {
        HashMap hashMap = new HashMap();
        for (String str : httpRequest.headers().names()) {
            if (!StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), str) && !StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), str) && !StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), str)) {
                hashMap.put(str, httpRequest.headers().get(str));
            }
        }
        return hashMap;
    }

    public void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
        DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
        defaultFullHttpResponse.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + "; charset=UTF-8");
        defaultFullHttpResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(defaultFullHttpResponse.content().readableBytes()));
        defaultFullHttpResponse.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        channelHandlerContext.writeAndFlush(defaultFullHttpResponse).addListener(ChannelFutureListener.CLOSE);
    }

    public void sendResponse(ChannelHandlerContext channelHandlerContext, DefaultFullHttpResponse defaultFullHttpResponse) {
        channelHandlerContext.writeAndFlush(defaultFullHttpResponse).addListener(new ChannelFutureListener() { // from class: org.apache.eventmesh.runtime.boot.AbrstractHTTPServer.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                AbrstractHTTPServer.this.httpLogger.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(channelFuture.channel()));
                channelFuture.channel().close();
            }
        });
    }

    @Override // org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void start() throws Exception {
        super.start();
        new Thread(() -> {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class).childHandler(new HttpsServerInitializer(SSLContextFactory.getSslContext())).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            try {
                this.httpServerLogger.info("HTTPServer[port={}] started......", Integer.valueOf(this.port));
                serverBootstrap.bind(this.port).sync().channel().closeFuture().sync();
            } catch (Exception e) {
                this.httpServerLogger.error("HTTPServer start Err!", e);
                try {
                    shutdown();
                } catch (Exception e2) {
                    this.httpServerLogger.error("HTTPServer shutdown Err!", e);
                }
            }
        }, "eventMesh-http-server").start();
        this.started.compareAndSet(false, true);
    }

    @Override // org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void init(String str) throws Exception {
        super.init(str);
    }

    @Override // org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void shutdown() throws Exception {
        super.shutdown();
        this.started.compareAndSet(true, false);
    }

    public void registerProcessor(Integer num, HttpRequestProcessor httpRequestProcessor, ThreadPoolExecutor threadPoolExecutor) {
        Preconditions.checkState(ObjectUtils.allNotNull(new Object[]{num}), "requestCode can't be null");
        Preconditions.checkState(ObjectUtils.allNotNull(new Object[]{httpRequestProcessor}), "processor can't be null");
        Preconditions.checkState(ObjectUtils.allNotNull(new Object[]{threadPoolExecutor}), "executor can't be null");
        this.processorTable.put(num, new Pair<>(httpRequestProcessor, threadPoolExecutor));
    }

    static {
        DiskAttribute.deleteOnExitTemporaryFile = false;
    }
}
