package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.ReplyMessageResponseHeader;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.class */
public class ReplyMessageProcessor implements HttpRequestProcessor {
    public final Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
    public final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD);
    public final Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    private final EventMeshHTTPServer eventMeshHTTPServer;

    public ReplyMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(ChannelHandlerContext channelHandlerContext, final AsyncContext<HttpCommand> asyncContext) throws Exception {
        String localAddress = IPUtils.getLocalAddress();
        final HttpCommand request = asyncContext.getRequest();
        this.cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", new Object[]{RequestCode.get(Integer.valueOf(request.getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), localAddress});
        CloudEvent cloudEvent = ProtocolPluginFactory.getProtocolAdaptor(request.getHeader().getProtocolType()).toCloudEvent(request);
        EventMeshHTTPConfiguration eventMeshHttpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
        final ReplyMessageResponseHeader buildHeader = ReplyMessageResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()), eventMeshHttpConfiguration.getEventMeshCluster(), localAddress, eventMeshHttpConfiguration.getEventMeshEnv(), eventMeshHttpConfiguration.getEventMeshIDC());
        if (!ObjectUtils.allNotNull(new Object[]{cloudEvent, cloudEvent.getSource(), cloudEvent.getSpecVersion()}) || StringUtils.isAnyBlank(new CharSequence[]{cloudEvent.getId(), cloudEvent.getType(), cloudEvent.getSubject()})) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, ReplyMessageResponseBody.class);
            return;
        }
        String extension = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.IDC.getKey());
        String extension2 = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.PID.getKey());
        if (StringUtils.isAnyBlank(new CharSequence[]{extension, extension2, getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.SYS.getKey())}) || !StringUtils.isNumeric(extension2)) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, ReplyMessageResponseBody.class);
            return;
        }
        final String extension3 = getExtension(cloudEvent, "bizseqno");
        final String extension4 = getExtension(cloudEvent, "uniqueid");
        String extension5 = getExtension(cloudEvent, "producergroup");
        if (StringUtils.isAnyBlank(new CharSequence[]{extension3, extension4, extension5}) || cloudEvent.getData() == null) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, ReplyMessageResponseBody.class);
            return;
        }
        final HttpSummaryMetrics summaryMetrics = this.eventMeshHTTPServer.getMetrics().getSummaryMetrics();
        if (!this.eventMeshHTTPServer.getMsgRateLimiter().tryAcquire(100L, TimeUnit.MILLISECONDS)) {
            summaryMetrics.recordHTTPDiscard();
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, null, ReplyMessageResponseBody.class);
            return;
        }
        if ((cloudEvent.getData() == null ? "" : new String(cloudEvent.getData().toBytes(), Constants.DEFAULT_CHARSET)).length() > eventMeshHttpConfiguration.getEventMeshEventSize()) {
            this.httpLogger.error("Event size exceeds the limit: {}", Integer.valueOf(eventMeshHttpConfiguration.getEventMeshEventSize()));
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, "Event size exceeds the limit: " + eventMeshHttpConfiguration.getEventMeshEventSize(), ReplyMessageResponseBody.class);
            return;
        }
        EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(extension5);
        if (!eventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, null, ReplyMessageResponseBody.class);
            return;
        }
        final long currentTimeMillis = System.currentTimeMillis();
        final String subject = cloudEvent.getSubject();
        final String extension6 = getExtension(cloudEvent, EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
        if (StringUtils.isEmpty(extension6)) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR, null, ReplyMessageResponseBody.class);
            return;
        }
        String str = extension6 + "-" + EventMeshConstants.RR_REPLY_TOPIC;
        try {
            CloudEvent build = CloudEventBuilder.from(cloudEvent).withSubject(str).withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT).withExtension("timeout", String.valueOf(3000)).withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).build();
            LogUtils.debug(this.messageLogger, "msg2MQMsg suc, bizSeqNo={}, topic={}", extension3, str);
            SendMessageContext sendMessageContext = new SendMessageContext(extension3, build, eventMeshProducer, this.eventMeshHTTPServer);
            summaryMetrics.recordReplyMsg();
            final CompleteHandler completeHandler = httpCommand -> {
                try {
                    LogUtils.debug(this.httpLogger, "{}", httpCommand);
                    this.eventMeshHTTPServer.sendResponse(channelHandlerContext, httpCommand.httpResponse());
                    summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
                } catch (Exception e) {
                }
            };
            try {
                sendMessageContext.setEvent(CloudEventBuilder.from(sendMessageContext.getEvent()).withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis())).build());
                eventMeshProducer.reply(sendMessageContext, new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor.1
                    public void onSuccess(SendResult sendResult) {
                        asyncContext.onComplete(request.createHttpCommandResponse(buildHeader, ReplyMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg())), completeHandler);
                        long currentTimeMillis2 = System.currentTimeMillis();
                        summaryMetrics.recordReplyMsgCost(currentTimeMillis2 - currentTimeMillis);
                        ReplyMessageProcessor.this.messageLogger.info("message|eventMesh2mq|RSP|SYNC|reply2MQCost={}|topic={}|origTopic={}|bizSeqNo={}|uniqueId={}", new Object[]{Long.valueOf(currentTimeMillis2 - currentTimeMillis), extension6 + "-" + EventMeshConstants.RR_REPLY_TOPIC, subject, extension3, extension4});
                    }

                    public void onException(OnExceptionContext onExceptionContext) {
                        asyncContext.onComplete(request.createHttpCommandResponse(buildHeader, ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(onExceptionContext.getException(), 2))), completeHandler);
                        long currentTimeMillis2 = System.currentTimeMillis();
                        summaryMetrics.recordReplyMsgFailed();
                        summaryMetrics.recordReplyMsgCost(currentTimeMillis2 - currentTimeMillis);
                        ReplyMessageProcessor.this.messageLogger.error("message|eventMesh2mq|RSP|SYNC|reply2MQCost={}|topic={}|origTopic={}|bizSeqNo={}|uniqueId={}", new Object[]{Long.valueOf(currentTimeMillis2 - currentTimeMillis), extension6 + "-" + EventMeshConstants.RR_REPLY_TOPIC, subject, extension3, extension4, onExceptionContext.getException()});
                    }
                });
            } catch (Exception e) {
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR, EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2), ReplyMessageResponseBody.class);
                long currentTimeMillis2 = System.currentTimeMillis();
                this.messageLogger.error("message|eventMesh2mq|RSP|SYNC|reply2MQCost={}|topic={}|origTopic={}|bizSeqNo={}|uniqueId={}", new Object[]{Long.valueOf(currentTimeMillis2 - currentTimeMillis), str, subject, extension3, extension4, e});
                summaryMetrics.recordReplyMsgFailed();
                summaryMetrics.recordReplyMsgCost(currentTimeMillis2 - currentTimeMillis);
            }
        } catch (Exception e2) {
            this.messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", new Object[]{extension3, str, e2});
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e2, 2), ReplyMessageResponseBody.class);
        }
    }
}
