package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.trace.Span;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.trace.TraceUtils;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.class */
/**
 * HTTP request processor for asynchronous message sending.
 *
 * <p>For each request it converts the incoming {@link HttpCommand} to a {@link CloudEvent},
 * validates the event attributes and required extensions, optionally runs the ACL check,
 * applies the server's message rate limiter, and finally hands the event to the
 * {@link EventMeshProducer} registered for the event's producer group. Every rejection path
 * completes the {@link AsyncContext} with an error response and finishes a tracing span with
 * the corresponding error message.
 */
public class SendAsyncMessageProcessor implements HttpRequestProcessor {
    /** Span name used for all server-side spans created by this processor. */
    private static final String SERVER_SPAN_NAME = "upstream-eventmesh-server-span";
    /** Span name used for the client-side span wrapped around the producer send call. */
    private static final String CLIENT_SPAN_NAME = "upstream-eventmesh-client-span";

    public Logger messageLogger = LoggerFactory.getLogger("message");
    public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    public Logger cmdLogger = LoggerFactory.getLogger("cmd");
    public Logger aclLogger = LoggerFactory.getLogger("acl");
    private final EventMeshHTTPServer eventMeshHTTPServer;

    /**
     * Creates a processor bound to the given HTTP server.
     *
     * @param eventMeshHTTPServer server supplying configuration, metrics, rate limiter,
     *                            producer manager and retryer used while processing
     */
    public SendAsyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    /**
     * Validates the request and sends the contained event through the group's producer.
     *
     * @param channelHandlerContext channel used to log the remote address and to write the response
     * @param asyncContext          holder of the request; completed exactly once per code path
     *                              with either a success or an error response
     * @throws Exception if conversion, rate limiting or producer lookup fails unexpectedly
     */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(final ChannelHandlerContext channelHandlerContext, final AsyncContext<HttpCommand> asyncContext) throws Exception {
        this.cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
            RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
            EventMeshConstants.PROTOCOL_HTTP,
            RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()),
            IPUtils.getLocalAddress());

        final SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
        final SendMessageResponseHeader responseHeader = SendMessageResponseHeader.buildHeader(
            Integer.valueOf(asyncContext.getRequest().getRequestCode()),
            this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
            IPUtils.getLocalAddress(),
            this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
            this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);

        final String protocolType = requestHeader.getProtocolType();
        final String protocolVersion = requestHeader.getProtocolVersion();
        CloudEvent cloudEvent = ProtocolPluginFactory.getProtocolAdaptor(protocolType).toCloudEvent(asyncContext.getRequest());

        // NOTE(review): the extension map is built before the null check below; confirm that
        // EventMeshUtil.getCloudEventExtensionMap tolerates a null event.
        // NOTE(review): this span is only finished in the producer callbacks; the rejection
        // paths finish a separately prepared span instead - confirm that this is intended.
        final Span serverSpan = TraceUtils.prepareServerSpan(
            EventMeshUtil.getCloudEventExtensionMap(protocolVersion, cloudEvent), SERVER_SPAN_NAME, true);

        // Mandatory CloudEvent attributes.
        if (cloudEvent == null
            || StringUtils.isBlank(cloudEvent.getId())
            || cloudEvent.getSource() == null
            || cloudEvent.getSpecVersion() == null
            || StringUtils.isBlank(cloudEvent.getType())
            || StringUtils.isBlank(cloudEvent.getSubject())) {
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR);
            return;
        }

        // Header-level extensions. A missing extension is read as null so that it is reported
        // as a protocol header error instead of surfacing as a NullPointerException.
        final String idc = extensionAsString(cloudEvent, "idc");
        final String pid = extensionAsString(cloudEvent, "pid");
        final String sys = extensionAsString(cloudEvent, "sys");
        if (StringUtils.isBlank(idc) || StringUtils.isBlank(pid) || !StringUtils.isNumeric(pid) || StringUtils.isBlank(sys)) {
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR);
            return;
        }

        // Body-level extensions and payload; same null-tolerant handling as above.
        final String bizSeqNo = extensionAsString(cloudEvent, "bizseqno");
        final String uniqueId = extensionAsString(cloudEvent, "uniqueid");
        final String producerGroup = extensionAsString(cloudEvent, "producergroup");
        final String topic = cloudEvent.getSubject();
        if (StringUtils.isBlank(bizSeqNo)
            || StringUtils.isBlank(uniqueId)
            || StringUtils.isBlank(producerGroup)
            || StringUtils.isBlank(topic)
            || cloudEvent.getData() == null) {
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR);
            return;
        }

        if (this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
            try {
                // Missing username/passwd extensions raise here and are reported as an ACL error.
                Acl.doAclCheckInHttpSend(
                    RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()),
                    cloudEvent.getExtension("username").toString(),
                    cloudEvent.getExtension("passwd").toString(),
                    sys,
                    topic,
                    Integer.parseInt(asyncContext.getRequest().getRequestCode()));
            } catch (Exception e) {
                asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(responseHeader,
                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage())));
                this.aclLogger.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
                finishErrorSpan(protocolVersion, cloudEvent, EventMeshRetCode.EVENTMESH_ACL_ERR);
                return;
            }
        }

        // Wait at most 100 ms for a rate-limiter permit before discarding the request.
        if (!this.eventMeshHTTPServer.getMsgRateLimiter().tryAcquire(100L, TimeUnit.MILLISECONDS)) {
            this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordHTTPDiscard();
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent, EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR);
            return;
        }

        final EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
        if (!eventMeshProducer.getStarted().get()) {
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent, EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR);
            return;
        }

        // Fall back to the default TTL when the extension is absent, blank or not numeric.
        final String requestedTtl = extensionAsString(cloudEvent, "ttl");
        if (StringUtils.isBlank(requestedTtl) || !StringUtils.isNumeric(requestedTtl)) {
            cloudEvent = CloudEventBuilder.from(cloudEvent)
                .withExtension("ttl", String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS))
                .build();
        }

        // The limit is compared against the character count of the UTF-8 decoded payload.
        final int maxEventSize = this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize;
        if (new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8).length() > maxEventSize) {
            this.httpLogger.error("Event size exceeds the limit: {}", Integer.valueOf(maxEventSize));
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR, "Event size exceeds the limit: " + maxEventSize);
            return;
        }

        try {
            cloudEvent = CloudEventBuilder.from(cloudEvent)
                .withExtension("msgtype", "persistent")
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, Long.valueOf(asyncContext.getRequest().reqTime))
                .withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerIp)
                .build();
            if (this.messageLogger.isDebugEnabled()) {
                this.messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizSeqNo, topic);
            }

            final SendMessageContext sendMessageContext = new SendMessageContext(bizSeqNo, cloudEvent, eventMeshProducer, this.eventMeshHTTPServer);
            this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsg();
            final long sendStartTime = System.currentTimeMillis();

            // Writes the final response to the channel once the producer callback completes the context.
            final CompleteHandler<HttpCommand> completeHandler = new CompleteHandler<HttpCommand>() {
                @Override // org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler
                public void onResponse(HttpCommand httpCommand) {
                    try {
                        if (SendAsyncMessageProcessor.this.httpLogger.isDebugEnabled()) {
                            SendAsyncMessageProcessor.this.httpLogger.debug("{}", httpCommand);
                        }
                        SendAsyncMessageProcessor.this.eventMeshHTTPServer.sendResponse(channelHandlerContext, httpCommand.httpResponse());
                        SendAsyncMessageProcessor.this.eventMeshHTTPServer.metrics.getSummaryMetrics()
                            .recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
                    } catch (Exception e) {
                        // Best effort: a failed response write must not propagate into the
                        // producer callback, but it should not disappear silently either.
                        SendAsyncMessageProcessor.this.httpLogger.warn("onResponse error", e);
                    }
                }
            };

            try {
                cloudEvent = CloudEventBuilder.from(sendMessageContext.getEvent())
                    .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                    .build();
                sendMessageContext.setEvent(cloudEvent);
                final Span clientSpan = TraceUtils.prepareClientSpan(
                    EventMeshUtil.getCloudEventExtensionMap(protocolVersion, cloudEvent), CLIENT_SPAN_NAME, false);
                try {
                    eventMeshProducer.send(sendMessageContext, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(responseHeader,
                                SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
                                    EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString())), completeHandler);
                            long sendEndTime = System.currentTimeMillis();
                            SendAsyncMessageProcessor.this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(sendEndTime - sendStartTime);
                            SendAsyncMessageProcessor.this.messageLogger.info(
                                "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                                Long.valueOf(sendEndTime - sendStartTime), topic, bizSeqNo, uniqueId);
                            TraceUtils.finishSpan(serverSpan, sendMessageContext.getEvent());
                        }

                        @Override
                        public void onException(OnExceptionContext onExceptionContext) {
                            asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(responseHeader,
                                SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
                                    EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                                        + EventMeshUtil.stackTrace(onExceptionContext.getException(), 2))), completeHandler);
                            // Hand the failed context to the server's retryer.
                            SendAsyncMessageProcessor.this.eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000L));
                            long sendEndTime = System.currentTimeMillis();
                            SendAsyncMessageProcessor.this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgFailed();
                            SendAsyncMessageProcessor.this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(sendEndTime - sendStartTime);
                            // The trailing exception is picked up by SLF4J as the throwable to log.
                            SendAsyncMessageProcessor.this.messageLogger.error(
                                "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                                Long.valueOf(sendEndTime - sendStartTime), topic, bizSeqNo, uniqueId, onExceptionContext.getException());
                            TraceUtils.finishSpanWithException(serverSpan,
                                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, sendMessageContext.getEvent()),
                                EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg(),
                                (Throwable) onExceptionContext.getException());
                        }
                    });
                } finally {
                    // The client span only covers the hand-off to the producer, so it is
                    // finished whether or not send() throws.
                    TraceUtils.finishSpan(clientSpan, cloudEvent);
                }
            } catch (Exception e) {
                completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent,
                    EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
                    EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2));
                this.eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000L));
                long sendEndTime = System.currentTimeMillis();
                this.messageLogger.error(
                    "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                    Long.valueOf(sendEndTime - sendStartTime), topic, bizSeqNo, uniqueId, e);
                this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgFailed();
                this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsgCost(sendEndTime - sendStartTime);
            }
        } catch (Exception e) {
            this.messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizSeqNo, topic, e);
            completeWithError(asyncContext, responseHeader, protocolVersion, cloudEvent,
                EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR,
                EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2));
        }
    }

    /**
     * Always accepts requests.
     *
     * @return {@code false}
     */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public boolean rejectRequest() {
        return false;
    }

    /** Returns the named extension rendered as a string, or {@code null} when the event does not carry it. */
    private static String extensionAsString(CloudEvent event, String name) {
        Object value = event.getExtension(name);
        return value == null ? null : value.toString();
    }

    /** Completes the request with {@code retCode} and its default error message, then finishes an error span. */
    private void completeWithError(AsyncContext<HttpCommand> asyncContext, SendMessageResponseHeader responseHeader,
                                   String protocolVersion, CloudEvent event, EventMeshRetCode retCode) {
        completeWithError(asyncContext, responseHeader, protocolVersion, event, retCode, retCode.getErrMsg());
    }

    /** Completes the request with {@code retCode} and a custom response message, then finishes an error span. */
    private void completeWithError(AsyncContext<HttpCommand> asyncContext, SendMessageResponseHeader responseHeader,
                                   String protocolVersion, CloudEvent event, EventMeshRetCode retCode, String responseMessage) {
        asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(responseHeader,
            SendMessageResponseBody.buildBody(retCode.getRetCode(), responseMessage)));
        finishErrorSpan(protocolVersion, event, retCode);
    }

    /** Prepares a server span for the event and finishes it immediately with the error message of {@code retCode}. */
    private void finishErrorSpan(String protocolVersion, CloudEvent event, EventMeshRetCode retCode) {
        Span errorSpan = TraceUtils.prepareServerSpan(
            EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), SERVER_SPAN_NAME, false);
        TraceUtils.finishSpanWithException(errorSpan,
            EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), retCode.getErrMsg(), (Throwable) null);
    }
}
