package org.apache.eventmesh.runtime.core.protocol.http.processor;

import com.fasterxml.jackson.core.type.TypeReference;
import io.cloudevents.CloudEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.HTTPTrace;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.util.HttpResponseUtils;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.class */
public class HandlerService {
    // Metrics sink; request counts, discards and time costs are recorded through getSummaryMetrics().
    // Not initialised here: it stays null until setMetrics(...) is called.
    private HTTPMetricsServer metrics;
    // Source of the per-request TraceOperation created in handler(...). Null until setHttpTrace(...) is called.
    private HTTPTrace httpTrace;
    // Logger named after this class; used for registration, startup and error messages.
    private Logger httpServerLogger = LoggerFactory.getLogger(getClass());
    // Logger named by EventMeshConstants.PROTOCOL_HTTP; used for request/response debug output and send failures.
    private Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    // Registered path -> processor wrapper. Lookups match by URI prefix (String.startsWith), not exact path.
    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap();
    // Factory passed to HttpPostRequestDecoder when decoding non-JSON POST bodies.
    // NOTE(review): the 'false' argument is presumably Netty's useDisk flag (keep data in memory) - confirm.
    public DefaultHttpDataFactory defaultHttpDataFactory = new DefaultHttpDataFactory(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService$HandlerSpecific.class */
    public class HandlerSpecific implements Runnable {
        private HTTPTrace.TraceOperation traceOperation;
        private ChannelHandlerContext ctx;
        private HttpRequest request;
        private HttpResponse response;
        private AsyncContext<HttpEventWrapper> asyncContext;
        private Throwable exception;
        long requestTime = System.currentTimeMillis();
        private Map<String, Object> traceMap;
        private CloudEvent ce;

        HandlerSpecific() {
        }

        @Override // java.lang.Runnable
        public void run() {
            String str = "/";
            Iterator it = HandlerService.this.httpProcessorMap.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                String str2 = (String) it.next();
                if (this.request.uri().startsWith(str2)) {
                    str = str2;
                    break;
                }
            }
            ProcessorWrapper processorWrapper = (ProcessorWrapper) HandlerService.this.httpProcessorMap.get(str);
            try {
                preHandler();
                if (!(processorWrapper.httpProcessor instanceof AsyncHttpProcessor)) {
                    this.response = processorWrapper.httpProcessor.handler(this.request);
                    postHandler();
                } else {
                    this.asyncContext.setRequest(HandlerService.this.parseHttpRequest(this.request));
                    processorWrapper.async.handler(this, this.request);
                }
            } catch (Throwable th) {
                this.exception = th;
                this.response = HttpResponseUtils.createInternalServerError();
                error();
            }
        }

        private void postHandler() {
            HandlerService.this.metrics.getSummaryMetrics().recordHTTPRequest();
            if (HandlerService.this.httpLogger.isDebugEnabled()) {
                HandlerService.this.httpLogger.debug("{}", this.request);
            }
            if (Objects.isNull(this.response)) {
                this.response = HttpResponseUtils.createSuccess();
            }
            this.traceOperation.endTrace(this.ce);
            HandlerService.this.sendResponse(this.ctx, this.request, this.response);
        }

        private void preHandler() {
            HandlerService.this.metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - this.requestTime);
            if (HandlerService.this.httpLogger.isDebugEnabled()) {
                HandlerService.this.httpLogger.debug("{}", this.response);
            }
        }

        private void error() {
            HandlerService.this.httpServerLogger.error(this.exception.getMessage(), this.exception);
            this.traceOperation.exceptionTrace(this.exception, this.traceMap);
            HandlerService.this.metrics.getSummaryMetrics().recordHTTPDiscard();
            HandlerService.this.metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - this.requestTime);
            HandlerService.this.sendResponse(this.ctx, this.request, this.response);
        }

        public void setResponseJsonBody(String str) {
            sendResponse(HttpResponseUtils.setResponseJsonBody(str, this.ctx));
        }

        public void setResponseTextBody(String str) {
            sendResponse(HttpResponseUtils.setResponseTextBody(str, this.ctx));
        }

        public void sendResponse(HttpResponse httpResponse) {
            this.response = httpResponse;
            postHandler();
        }

        public void sendResponse(Map<String, Object> map, Map<String, Object> map2) {
            try {
                this.asyncContext.onComplete(this.asyncContext.getRequest().createHttpResponse(map, map2));
                this.response = this.asyncContext.getResponse().httpResponse();
                postHandler();
            } catch (Exception e) {
                this.exception = e;
                this.response = HttpResponseUtils.createInternalServerError();
                error();
            }
        }

        public void sendErrorResponse(EventMeshRetCode eventMeshRetCode, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3) {
            this.traceMap = map3;
            try {
                map2.put("retCode", eventMeshRetCode.getRetCode());
                map2.put("retMsg", eventMeshRetCode.getErrMsg());
                this.asyncContext.onComplete(this.asyncContext.getRequest().createHttpResponse(map, map2));
                this.exception = new RuntimeException(eventMeshRetCode.getErrMsg());
                this.response = this.asyncContext.getResponse().httpResponse();
                error();
            } catch (Exception e) {
                this.exception = e;
                this.response = HttpResponseUtils.createInternalServerError();
                error();
            }
        }

        public void recordSendBatchMsgFailed(int i) {
            HandlerService.this.metrics.getSummaryMetrics().recordSendBatchMsgFailed(1L);
        }

        public HTTPTrace.TraceOperation getTraceOperation() {
            return this.traceOperation;
        }

        public ChannelHandlerContext getCtx() {
            return this.ctx;
        }

        public HttpRequest getRequest() {
            return this.request;
        }

        public HttpResponse getResponse() {
            return this.response;
        }

        public AsyncContext<HttpEventWrapper> getAsyncContext() {
            return this.asyncContext;
        }

        public Throwable getException() {
            return this.exception;
        }

        public long getRequestTime() {
            return this.requestTime;
        }

        public Map<String, Object> getTraceMap() {
            return this.traceMap;
        }

        public CloudEvent getCe() {
            return this.ce;
        }

        public void setTraceOperation(HTTPTrace.TraceOperation traceOperation) {
            this.traceOperation = traceOperation;
        }

        public void setCtx(ChannelHandlerContext channelHandlerContext) {
            this.ctx = channelHandlerContext;
        }

        public void setRequest(HttpRequest httpRequest) {
            this.request = httpRequest;
        }

        public void setResponse(HttpResponse httpResponse) {
            this.response = httpResponse;
        }

        public void setAsyncContext(AsyncContext<HttpEventWrapper> asyncContext) {
            this.asyncContext = asyncContext;
        }

        public void setException(Throwable th) {
            this.exception = th;
        }

        public void setRequestTime(long j) {
            this.requestTime = j;
        }

        public void setTraceMap(Map<String, Object> map) {
            this.traceMap = map;
        }

        public void setCe(CloudEvent cloudEvent) {
            this.ce = cloudEvent;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService$ProcessorWrapper.class */
    /**
     * Registration record for one path: the processor, the executor its requests run on and
     * whether tracing is enabled for it. Instances are only created by the enclosing class.
     */
    public static class ProcessorWrapper {
        /** The registered processor; always set on registration. */
        private HttpProcessor httpProcessor;
        /** Same processor viewed as {@link AsyncHttpProcessor}; left null for synchronous processors. */
        private AsyncHttpProcessor async;
        /** Executor that runs the request tasks for this processor. */
        private ThreadPoolExecutor threadPoolExecutor;
        /** Value read from the processor's {@code EventMeshTrace} annotation on registration. */
        private boolean traceEnabled;

        private ProcessorWrapper() {
        }
    }

    /** Logs that the handler service has started; performs no other initialisation. */
    public void init() {
        final String startupMessage = "HandlerService start ";
        httpServerLogger.info(startupMessage);
    }

    /**
     * Registers the processor under every path it reports through {@code paths()}.
     *
     * @param httpProcessor      processor to register
     * @param threadPoolExecutor executor that will run requests for this processor
     * @throws RuntimeException if any of the paths is already registered
     */
    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
        for (String path : httpProcessor.paths()) {
            this.register(path, httpProcessor, threadPoolExecutor);
        }
    }

    /**
     * Registers a processor under a single path.
     *
     * <p>Tracing is taken from the processor class's {@code EventMeshTrace} annotation; a processor
     * without the annotation is registered with tracing disabled instead of failing with a
     * {@link NullPointerException}. The insert is atomic, so two concurrent registrations of the same
     * path cannot both succeed.
     *
     * @param str                path (matched later as a URI prefix) to register the processor under
     * @param httpProcessor      processor to register
     * @param threadPoolExecutor executor that will run requests for this processor
     * @throws RuntimeException if the path is already registered
     */
    public void register(String str, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
        final Class<? extends HttpProcessor> processorClass = httpProcessor.getClass();
        final EventMeshTrace traceAnnotation = processorClass.getAnnotation(EventMeshTrace.class);

        final ProcessorWrapper processorWrapper = new ProcessorWrapper();
        processorWrapper.threadPoolExecutor = threadPoolExecutor;
        processorWrapper.httpProcessor = httpProcessor;
        if (httpProcessor instanceof AsyncHttpProcessor) {
            processorWrapper.async = (AsyncHttpProcessor) httpProcessor;
        }
        processorWrapper.traceEnabled = traceAnnotation != null && traceAnnotation.isEnable();

        // putIfAbsent makes the duplicate check and the insert one atomic step.
        if (this.httpProcessorMap.putIfAbsent(str, processorWrapper) != null) {
            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ", str, processorClass.getSimpleName()));
        }
        this.httpServerLogger.info("path is {}  processor name is {}", str, processorClass.getSimpleName());
    }

    /**
     * Tells whether some registered path is a prefix of the request URI.
     *
     * @param httpRequest request whose URI is checked
     * @return {@code true} if a processor is registered for the request
     */
    public boolean isProcessorWrapper(HttpRequest httpRequest) {
        final ProcessorWrapper match = getProcessorWrapper(httpRequest);
        return match != null;
    }

    /**
     * Finds the wrapper whose registered path is a prefix of the request URI.
     *
     * <p>The first matching entry in the map's iteration order is returned, so when several
     * registered paths are prefixes of the same URI the choice depends on that order.
     *
     * @param httpRequest request whose URI is matched
     * @return the matching wrapper, or {@code null} when no registered path matches
     */
    private ProcessorWrapper getProcessorWrapper(HttpRequest httpRequest) {
        final String requestUri = httpRequest.uri();
        return this.httpProcessorMap.entrySet().stream()
            .filter(registration -> requestUri.startsWith(registration.getKey()))
            .map(Map.Entry::getValue)
            .findFirst()
            .orElse(null);
    }

    /**
     * Entry point for an incoming request: resolves the processor and schedules a
     * {@link HandlerSpecific} task on the executor the processor was registered with.
     *
     * <p>Answers "not found" when no processor matches, and "internal server error" when the task
     * cannot be created or submitted.
     *
     * @param channelHandlerContext channel context the response is written to
     * @param httpRequest           incoming request
     * @param threadPoolExecutor    executor handed to the task's {@link AsyncContext}
     */
    public void handler(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, ThreadPoolExecutor threadPoolExecutor) {
        final ProcessorWrapper wrapper = getProcessorWrapper(httpRequest);
        if (wrapper == null) {
            sendResponse(channelHandlerContext, httpRequest, HttpResponseUtils.createNotFound());
            return;
        }

        final HTTPTrace.TraceOperation traceOperation =
            this.httpTrace.getTraceOperation(httpRequest, channelHandlerContext.channel(), wrapper.traceEnabled);
        try {
            final HandlerSpecific task = new HandlerSpecific();
            task.setRequest(httpRequest);
            task.setCtx(channelHandlerContext);
            task.setTraceOperation(traceOperation);
            task.setAsyncContext(new AsyncContext<>(new HttpEventWrapper(), null, threadPoolExecutor));
            wrapper.threadPoolExecutor.execute(task);
        } catch (Exception e) {
            this.httpServerLogger.error(e.getMessage(), e);
            sendResponse(channelHandlerContext, httpRequest, HttpResponseUtils.createInternalServerError());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the response and closes the channel if the write fails.
     *
     * @param channelHandlerContext channel context to write to
     * @param httpRequest           request being answered; released by the delegate before the write
     * @param httpResponse          response to write
     */
    public void sendResponse(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse) {
        final boolean closeOnFailure = true;
        this.sendResponse(channelHandlerContext, httpRequest, httpResponse, closeOnFailure);
    }

    /**
     * Releases the request, writes and flushes the response, and reacts to a failed write.
     *
     * <p>On failure a warning with the remote address is logged and, when {@code z} is set, the
     * channel is closed.
     *
     * @param channelHandlerContext channel context to write to
     * @param httpRequest           request being answered; released before the write
     * @param httpResponse          response to write
     * @param z                     whether to close the channel when the write fails
     */
    private void sendResponse(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, boolean z) {
        ReferenceCountUtil.release(httpRequest);
        // The explicit listener type gives the lambda a ChannelFuture parameter; without it the
        // parameter is only a generic Future and channel() does not resolve.
        channelHandlerContext.writeAndFlush(httpResponse).addListener((io.netty.channel.ChannelFutureListener) channelFuture -> {
            if (channelFuture.isSuccess()) {
                return;
            }
            this.httpLogger.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(channelFuture.channel()));
            if (z) {
                channelFuture.channel().close();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts a Netty request into an {@link HttpEventWrapper}.
     *
     * <p>Method, protocol name, URI and all headers are copied. The body is normalised to a JSON
     * object serialised as UTF-8 bytes, built from:
     * <ul>
     *   <li>GET: the query-string parameters (first value of each name);</li>
     *   <li>POST with a JSON content type: the top-level entries of the JSON body;</li>
     *   <li>any other POST: the attributes decoded by {@link HttpPostRequestDecoder}.</li>
     * </ul>
     * The time spent decoding is recorded in the summary metrics.
     *
     * @param httpRequest request to convert; must be a {@link FullHttpRequest}
     * @return the populated wrapper
     * @throws IOException        if reading a decoded form attribute fails
     * @throws RuntimeException   if the method is neither GET nor POST
     * @throws ClassCastException if the request is not a {@link FullHttpRequest}
     */
    public HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws IOException {
        final HttpEventWrapper httpEventWrapper = new HttpEventWrapper();
        httpEventWrapper.setHttpMethod(httpRequest.method().name());
        httpEventWrapper.setHttpVersion(httpRequest.protocolVersion().protocolName());
        httpEventWrapper.setRequestURI(httpRequest.uri());
        for (String headerName : httpRequest.headers().names()) {
            httpEventWrapper.getHeaderMap().put(headerName, httpRequest.headers().get(headerName));
        }

        final long decodeStart = System.currentTimeMillis();
        final FullHttpRequest fullHttpRequest = (FullHttpRequest) httpRequest;
        final HttpMethod method = fullHttpRequest.method();
        final Map<String, Object> bodyMap = new HashMap<>();

        if (HttpMethod.GET.equals(method)) {
            new QueryStringDecoder(fullHttpRequest.uri()).parameters()
                .forEach((name, values) -> bodyMap.put(name, values.get(0)));
        } else if (HttpMethod.POST.equals(method)) {
            final String contentType = httpRequest.headers().get("Content-Type");
            if (StringUtils.contains(contentType, ContentType.APPLICATION_JSON.getMimeType())) {
                final int length = fullHttpRequest.content().readableBytes();
                if (length > 0) {
                    final byte[] bytes = new byte[length];
                    fullHttpRequest.content().readBytes(bytes);
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                    final Map<String, Object> parsed = JsonUtils.deserialize(
                        new String(bytes, StandardCharsets.UTF_8), new TypeReference<Map<String, Object>>() {
                        });
                    // Guard against a body that does not deserialize to an object.
                    if (parsed != null) {
                        bodyMap.putAll(parsed);
                    }
                }
            } else {
                final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(this.defaultHttpDataFactory, httpRequest);
                try {
                    for (InterfaceHttpData data : decoder.getBodyHttpDatas()) {
                        if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                            final Attribute attribute = (Attribute) data;
                            bodyMap.put(attribute.getName(), attribute.getValue());
                        }
                    }
                } finally {
                    // Always destroy the decoder, even when reading an attribute throws.
                    decoder.destroy();
                }
            }
        } else {
            throw new RuntimeException("UnSupported Method " + method);
        }

        httpEventWrapper.setBody(JsonUtils.serialize(bodyMap).getBytes(StandardCharsets.UTF_8));
        this.metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - decodeStart);
        return httpEventWrapper;
    }

    /**
     * Sets the metrics server used to record request, discard and time-cost metrics.
     *
     * @param hTTPMetricsServer metrics server to use
     */
    public void setMetrics(HTTPMetricsServer hTTPMetricsServer) {
        metrics = hTTPMetricsServer;
    }

    /**
     * Sets the trace facility from which per-request trace operations are obtained.
     *
     * @param hTTPTrace trace facility to use
     */
    public void setHttpTrace(HTTPTrace hTTPTrace) {
        httpTrace = hTTPTrace;
    }
}
