package org.apache.eventmesh.client.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.client.http.consumer.HandleResult;
import org.apache.eventmesh.client.http.consumer.context.LiteConsumeContext;
import org.apache.eventmesh.client.http.consumer.listener.LiteMessageListener;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.LiteMessage;
import org.apache.eventmesh.common.ThreadUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.PushMessageRequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/client/http/RemotingServer.class */
public class RemotingServer {
    // Logger for this server; every constructor assigns it from LoggerFactory.getLogger(getClass()).
    public Logger logger;
    // Running flag: set to true by the thread launched in start(), reset to false by HTTPHandler.shutdown().
    public AtomicBoolean started;
    // Initialisation flag: set to true by init(), reset to false by HTTPHandler.shutdown().
    public AtomicBoolean inited;
    // Event loop group created by initBossGroup() with a single thread; null until init() runs.
    private EventLoopGroup bossGroup;
    // Event loop group created by initWokerGroup() with two threads; null until init() runs.
    private EventLoopGroup workerGroup;
    // Port the server binds to; defaults to RandomUtils.nextInt(1000, 20000) unless supplied via constructor or setPort().
    private int port;
    // Factory handed to HttpPostRequestDecoder when decoding POST bodies; constructed with `false`
    // (presumably "do not spill to disk" - confirm against the Netty DefaultHttpDataFactory docs).
    private DefaultHttpDataFactory defaultHttpDataFactory;
    // Executor on which listener callbacks are run; stays null unless supplied via constructor or setConsumeExecutor().
    private ThreadPoolExecutor consumeExecutor;
    // Listener that receives pushed messages; stays null until registerMessageListener() is called.
    private LiteMessageListener messageListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/eventmesh/client/http/RemotingServer$HTTPHandler.class */
    public class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {
        /** Creates a handler bound to the enclosing {@link RemotingServer}; it keeps no state of its own. */
        HTTPHandler() {
            super();
        }

        /**
         * Copies the request headers into a map, leaving out the transport-level headers
         * {@code Content-Type}, {@code Accept-Encoding} and {@code Content-Length}
         * (matched case-insensitively).
         *
         * @param httpRequest the request whose headers are read
         * @return a new mutable map of header name to the first value of that header
         */
        private Map<String, Object> parseHTTPHeader(HttpRequest httpRequest) {
            // Resolve the excluded names once instead of on every loop iteration.
            final String contentType = HttpHeaderNames.CONTENT_TYPE.toString();
            final String acceptEncoding = HttpHeaderNames.ACCEPT_ENCODING.toString();
            final String contentLength = HttpHeaderNames.CONTENT_LENGTH.toString();

            Map<String, Object> headerParams = new HashMap<String, Object>();
            for (String name : httpRequest.headers().names()) {
                boolean transportHeader = StringUtils.equalsIgnoreCase(contentType, name)
                    || StringUtils.equalsIgnoreCase(acceptEncoding, name)
                    || StringUtils.equalsIgnoreCase(contentLength, name);
                if (!transportHeader) {
                    headerParams.put(name, httpRequest.headers().get(name));
                }
            }
            return headerParams;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Handles one aggregated HTTP request pushed to this client endpoint.
         *
         * <p>Processing steps:
         * <ol>
         *   <li>Requests that failed HTTP decoding are answered with 400.</li>
         *   <li>A missing or unknown {@code Version} header is replaced by the V1 protocol version.</li>
         *   <li>Parameters are read from the query string (GET) or the form body (POST);
         *       any other method is answered with 405.</li>
         *   <li>Request codes other than the two push codes are logged and acknowledged with
         *       {@code ClientRetCode.OK}.</li>
         *   <li>The message is handed to the registered listener on {@code consumeExecutor};
         *       that task writes the response.</li>
         * </ol>
         *
         * <p>Any exception raised while processing is logged and not propagated. The POST decoder,
         * when one was created, is destroyed exactly once before this method returns.
         *
         * @param channelHandlerContext context of the channel the request arrived on
         * @param httpRequest the aggregated request
         */
        public void channelRead0(final ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
            // Only created for POST requests; stays null on every other path.
            HttpPostRequestDecoder postDecoder = null;
            try {
                if (!httpRequest.decoderResult().isSuccess()) {
                    sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
                    return;
                }

                String protocolVersion = StringUtils.deleteWhitespace(httpRequest.headers().get("Version"));
                if (StringUtils.isBlank(protocolVersion) || !ProtocolVersion.contains(protocolVersion)) {
                    httpRequest.headers().set("Version", ProtocolVersion.V1.getVersion());
                }

                final boolean isGet = HttpMethod.GET.equals(httpRequest.method());
                final boolean isPost = HttpMethod.POST.equals(httpRequest.method());

                Map<String, Object> bodyMap = new HashMap<String, Object>();
                if (isGet) {
                    QueryStringDecoder queryDecoder = new QueryStringDecoder(httpRequest.uri());
                    for (Map.Entry<String, List<String>> parameter : queryDecoder.parameters().entrySet()) {
                        // Only the first value of a repeated query parameter is used.
                        bodyMap.put(parameter.getKey(), parameter.getValue().get(0));
                    }
                } else if (isPost) {
                    postDecoder = new HttpPostRequestDecoder(RemotingServer.this.defaultHttpDataFactory, httpRequest);
                    for (InterfaceHttpData data : postDecoder.getBodyHttpDatas()) {
                        if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                            Attribute formField = (Attribute) data;
                            bodyMap.put(formField.getName(), formField.getValue());
                        }
                    }
                } else {
                    sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
                    return;
                }

                // POST carries the request code in the "Code" header, GET in the "code" query parameter.
                String requestCode = isPost
                    ? StringUtils.deleteWhitespace(httpRequest.headers().get("Code"))
                    : MapUtils.getString(bodyMap, StringUtils.lowerCase("Code"), "");

                final HttpCommand httpCommand = new HttpCommand(httpRequest.method().name(), httpRequest.protocolVersion().protocolName(), requestCode);

                if (!isPushRequestCode(requestCode)) {
                    RemotingServer.this.logger.error("receive invalid requestCode, {}", requestCode);
                    // Unsupported codes are still acknowledged with OK, as before.
                    sendResponse(channelHandlerContext, httpCommand.createHttpCommandResponse(ClientRetCode.OK.getRetCode(), ClientRetCode.OK.getErrMsg()).httpResponse());
                    return;
                }

                httpCommand.setHeader(Header.buildHeader(requestCode, parseHTTPHeader(httpRequest)));
                httpCommand.setBody(Body.buildBody(requestCode, bodyMap));
                if (RemotingServer.this.logger.isDebugEnabled()) {
                    RemotingServer.this.logger.debug("{}", httpCommand);
                }

                PushMessageRequestHeader pushHeader = (PushMessageRequestHeader) httpCommand.header;
                PushMessageRequestBody pushBody = (PushMessageRequestBody) httpCommand.body;

                final LiteConsumeContext liteConsumeContext = new LiteConsumeContext(pushHeader.getEventMeshIp(), pushHeader.getEventMeshEnv(), pushHeader.getEventMeshIdc(), pushHeader.getEventMeshCluster());
                final LiteMessage liteMessage = new LiteMessage(pushBody.getBizSeqNo(), pushBody.getUniqueId(), pushBody.getTopic(), pushBody.getContent());
                for (Map.Entry<?, ?> extField : pushBody.getExtFields().entrySet()) {
                    liteMessage.addProp((String) extField.getKey(), (String) extField.getValue());
                }

                RemotingServer.this.consumeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            HandleResult handleResult;
                            if (RemotingServer.this.messageListener.reject()) {
                                // A rejecting listener asks the sender to retry later.
                                handleResult = HandleResult.RETRY;
                            } else {
                                handleResult = RemotingServer.this.messageListener.handle(liteMessage, liteConsumeContext);
                                if (RemotingServer.this.logger.isDebugEnabled()) {
                                    RemotingServer.this.logger.debug("bizSeqNo:{}, topic:{}, handleResult:{}", new Object[]{liteMessage.getBizSeqNo(), liteMessage.getTopic(), handleResult});
                                }
                            }
                            ClientRetCode retCode = HTTPHandler.this.handleResult2ClientRetCode(handleResult);
                            HTTPHandler.this.sendResponse(channelHandlerContext, httpCommand.createHttpCommandResponse(retCode.getRetCode(), retCode.getErrMsg()).httpResponse());
                        } catch (Exception e) {
                            // NOTE(review): no response is written on this path, so the sender only sees a timeout.
                            RemotingServer.this.logger.error("process error", e);
                        }
                    }
                });
            } catch (Exception e) {
                RemotingServer.this.logger.error("HTTPHandler.channelRead0 err", e);
            } finally {
                if (postDecoder != null) {
                    try {
                        postDecoder.destroy();
                    } catch (Exception e) {
                        // Cleanup is best-effort; report the failure instead of hiding it.
                        RemotingServer.this.logger.warn("failed to destroy HttpPostRequestDecoder", e);
                    }
                }
            }
        }

        /** Returns true when {@code requestCode} is one of the two push request codes handled by this endpoint. */
        private boolean isPushRequestCode(String requestCode) {
            if (StringUtils.isBlank(requestCode) || !StringUtils.isNumeric(requestCode)) {
                return false;
            }
            return String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode()).equals(requestCode)
                || String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode()).equals(requestCode);
        }

        /**
         * Maps a listener {@link HandleResult} onto the {@link ClientRetCode} returned to the sender.
         *
         * @param handleResult the listener outcome; may be null
         * @return the matching return code; {@code ClientRetCode.OK} for {@code HandleResult.OK}
         *         and for any value without an explicit mapping, including null
         */
        public ClientRetCode handleResult2ClientRetCode(HandleResult handleResult) {
            if (handleResult == HandleResult.FAIL) {
                return ClientRetCode.FAIL;
            }
            if (handleResult == HandleResult.NOLISTEN) {
                return ClientRetCode.NOLISTEN;
            }
            if (handleResult == HandleResult.RETRY) {
                return ClientRetCode.RETRY;
            }
            // HandleResult.OK and every unmapped value fall through to OK.
            return ClientRetCode.OK;
        }

        /**
         * Runs the superclass read-complete handling first, then flushes the channel context.
         *
         * @param channelHandlerContext context of the channel whose read batch finished
         */
        public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelReadComplete(channelHandlerContext);

            // Flush after the superclass call, matching the original ordering.
            channelHandlerContext.flush();
        }

        /**
         * Logs a pipeline exception through the server logger and closes the affected channel.
         *
         * @param channelHandlerContext context of the failing channel; ignored when null
         * @param th the exception raised in the pipeline; ignored when null
         */
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            if (null != th) {
                // Route through the logger rather than stderr so the failure lands in the application log.
                RemotingServer.this.logger.warn("exception caught in HTTPHandler, closing channel", th);
            }
            if (null != channelHandlerContext) {
                channelHandlerContext.close();
            }
        }

        /**
         * Writes an empty-bodied error response with the given status and closes the channel once
         * the write completes.
         *
         * @param channelHandlerContext context of the channel to answer on
         * @param httpResponseStatus the HTTP status to report
         */
        private void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
            DefaultFullHttpResponse errorResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
            errorResponse.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + "; charset=UTF-8");
            errorResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, errorResponse.content().readableBytes());
            // The channel is closed right after the write, so announce "close" instead of "keep-alive".
            errorResponse.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
            channelHandlerContext.writeAndFlush(errorResponse).addListener(ChannelFutureListener.CLOSE);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Writes and flushes the response; if the write fails, logs a warning with the remote
         * address and closes the channel.
         *
         * @param channelHandlerContext context of the channel to answer on
         * @param defaultFullHttpResponse the response to send
         */
        public void sendResponse(ChannelHandlerContext channelHandlerContext, DefaultFullHttpResponse defaultFullHttpResponse) {
            ChannelFutureListener closeOnFailure = new ChannelFutureListener() {
                public void operationComplete(ChannelFuture writeFuture) throws Exception {
                    if (!writeFuture.isSuccess()) {
                        RemotingServer.this.logger.warn("send response to [{}] fail, will close this channel", IPUtil.parseChannelRemoteAddr(writeFuture.channel()));
                        writeFuture.channel().close();
                    }
                }
            };
            channelHandlerContext.writeAndFlush(defaultFullHttpResponse).addListener(closeOnFailure);
        }

        /**
         * Shuts down the enclosing server's event loop groups and clears its state flags.
         *
         * <p>Order: boss group, then {@code ThreadUtil.randomSleep(30)}, then worker group, then
         * {@code started} and {@code inited} are reset to false. Groups that were never created
         * are skipped.
         */
        public void shutdown() throws Exception {
            final EventLoopGroup boss = RemotingServer.this.bossGroup;
            if (boss != null) {
                boss.shutdownGracefully();
            }

            ThreadUtil.randomSleep(30);

            // Read the worker group only after the pause, as the original did.
            final EventLoopGroup worker = RemotingServer.this.workerGroup;
            if (worker != null) {
                worker.shutdownGracefully();
            }

            RemotingServer.this.started.compareAndSet(true, false);
            RemotingServer.this.inited.compareAndSet(true, false);
        }
    }

    /**
     * Creates a server on a randomly chosen port (from {@code RandomUtils.nextInt(1000, 20000)})
     * with no consume executor; one must be supplied via {@link #setConsumeExecutor} before
     * messages can be dispatched.
     *
     * <p>NOTE(review): the range includes ports below 1024, which many systems reserve for
     * privileged processes - confirm whether the lower bound is intentional.
     */
    public RemotingServer() {
        // Delegate to the canonical constructor instead of repeating the field initialisation.
        this(RandomUtils.nextInt(1000, 20000), null);
    }

    /**
     * Creates a server on the given port with no consume executor; one must be supplied via
     * {@link #setConsumeExecutor} before messages can be dispatched.
     *
     * @param i the port to bind to
     */
    public RemotingServer(int i) {
        // Delegate to the canonical constructor; no random port is drawn only to be overwritten.
        this(i, null);
    }

    /**
     * Creates a server on a randomly chosen port (from {@code RandomUtils.nextInt(1000, 20000)})
     * that runs listener callbacks on the given executor.
     *
     * @param threadPoolExecutor executor used to run listener callbacks
     */
    public RemotingServer(ThreadPoolExecutor threadPoolExecutor) {
        // Delegate to the canonical constructor instead of repeating the field initialisation.
        this(RandomUtils.nextInt(1000, 20000), threadPoolExecutor);
    }

    /**
     * Creates a server on the given port that runs listener callbacks on the given executor.
     * The server is neither initialised nor started; call {@link #init()} and {@link #start()}.
     *
     * @param i the port to bind to
     * @param threadPoolExecutor executor used to run listener callbacks; may be null and set later
     */
    public RemotingServer(int i, ThreadPoolExecutor threadPoolExecutor) {
        this.logger = LoggerFactory.getLogger(getClass());
        this.started = new AtomicBoolean(false);
        this.inited = new AtomicBoolean(false);
        this.defaultHttpDataFactory = new DefaultHttpDataFactory(false);
        // The port is always supplied here, so the previous throw-away random draw is gone.
        this.port = i;
        this.consumeExecutor = threadPoolExecutor;
    }

    /**
     * Replaces the port this server binds to.
     *
     * @param i the new port
     */
    public void setPort(int i) {
        port = i;
    }

    /**
     * Replaces the executor on which listener callbacks are run.
     *
     * @param threadPoolExecutor the executor to use from now on
     */
    public void setConsumeExecutor(ThreadPoolExecutor threadPoolExecutor) {
        consumeExecutor = threadPoolExecutor;
    }

    /**
     * Registers the listener that receives pushed messages, replacing any previous one.
     *
     * @param liteMessageListener the listener to invoke for each pushed message
     */
    public void registerMessageListener(LiteMessageListener liteMessageListener) {
        messageListener = liteMessageListener;
    }

    /**
     * Creates the single-threaded boss event loop group and stores it in {@code bossGroup}.
     * Its threads are daemon threads named {@code endPointBoss-N}, with N counting from 1.
     *
     * @return the newly created group
     */
    private EventLoopGroup initBossGroup() {
        ThreadFactory bossThreadFactory = new ThreadFactory() {
            AtomicInteger count = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable runnable) {
                String threadName = "endPointBoss-" + this.count.incrementAndGet();
                Thread bossThread = new Thread(runnable, threadName);
                bossThread.setDaemon(true);
                return bossThread;
            }
        };
        this.bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
        return this.bossGroup;
    }

    /**
     * Creates the two-threaded worker event loop group and stores it in {@code workerGroup}.
     * Its threads are named {@code endpointWorker-N}, with N counting from 1; unlike the boss
     * threads they are not marked as daemon threads.
     *
     * @return the newly created group
     */
    private EventLoopGroup initWokerGroup() {
        ThreadFactory workerThreadFactory = new ThreadFactory() {
            AtomicInteger count = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable runnable) {
                String threadName = "endpointWorker-" + this.count.incrementAndGet();
                return new Thread(runnable, threadName);
            }
        };
        this.workerGroup = new NioEventLoopGroup(2, workerThreadFactory);
        return this.workerGroup;
    }

    /**
     * Builds the URL of this endpoint from {@code IPUtil.getLocalAddress()} and the configured port.
     *
     * @return a string of the form {@code http://<local address>:<port>}
     */
    public String getEndpointURL() {
        StringBuilder url = new StringBuilder("http://");
        url.append(IPUtil.getLocalAddress());
        url.append(':').append(this.port);
        return url.toString();
    }

    /**
     * Creates the boss and worker event loop groups, then marks the server as initialised.
     * Must be called before {@link #start()}, which uses both groups.
     */
    public void init() throws Exception {
        // Boss group first, then workers, as in the original.
        this.initBossGroup();
        this.initWokerGroup();

        this.inited.compareAndSet(false, true);
    }

    /**
     * Starts the HTTP endpoint on a dedicated thread named {@code eventMesh-client-remoting-server}
     * and returns immediately.
     *
     * <p>The thread binds the configured port, sets {@code started} to true once the bind has
     * succeeded, and then blocks until the server channel is closed. If binding fails or the
     * thread is interrupted, the error is logged, {@code started} is cleared and both event loop
     * groups are shut down.
     *
     * <p>{@link #init()} must have been called first so that both event loop groups exist.
     */
    public void start() throws Exception {
        Runnable serverTask = new Runnable() {
            @Override
            public void run() {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(RemotingServer.this.bossGroup, RemotingServer.this.workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        public void initChannel(SocketChannel socketChannel) throws Exception {
                            // Decode requests, encode responses, aggregate chunks into a full request, then handle it.
                            socketChannel.pipeline().addLast(
                                new HttpRequestDecoder(),
                                new HttpResponseEncoder(),
                                new HttpObjectAggregator(Integer.MAX_VALUE),
                                new HTTPHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
                try {
                    ChannelFuture bindFuture = serverBootstrap.bind(RemotingServer.this.port).sync();
                    // Mark the server as started as soon as the port is bound, not after it has closed.
                    RemotingServer.this.started.compareAndSet(false, true);
                    RemotingServer.this.logger.info("EventMesh Client[{}] Started......", Integer.valueOf(RemotingServer.this.port));
                    // Park this thread until the server channel is closed.
                    bindFuture.channel().closeFuture().sync();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    RemotingServer.this.logger.error("EventMesh Client[{}] failed to start or was interrupted", Integer.valueOf(RemotingServer.this.port), e);
                    RemotingServer.this.started.compareAndSet(true, false);
                    RemotingServer.this.bossGroup.shutdownGracefully();
                    RemotingServer.this.workerGroup.shutdownGracefully();
                }
            }
        };
        new Thread(serverTask, "eventMesh-client-remoting-server").start();
    }
}
