package org.apache.eventmesh.runtime.core.protocol.http.processor;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.LiteMessage;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.OMSUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.class */
/**
 * HTTP request processor for synchronous (request/reply) message sends.
 *
 * <p>For each request it validates the protocol header and body, converts the body into an
 * OpenMessaging {@link Message}, and hands it to the producer of the requested producer group via
 * {@code EventMeshProducer.request(...)}. The reply (or any failure) is turned into an
 * {@link HttpCommand} response and passed to {@link AsyncContext#onComplete}.
 */
public class SendSyncMessageProcessor implements HttpRequestProcessor {
    public Logger messageLogger = LoggerFactory.getLogger("message");
    public Logger cmdLogger = LoggerFactory.getLogger("cmd");
    public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    private final EventMeshHTTPServer eventMeshHTTPServer;

    /**
     * Creates a processor bound to the given HTTP server.
     *
     * @param eventMeshHTTPServer server used for configuration, producer lookup, metrics and
     *                            for writing responses back to the client
     */
    public SendSyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    /**
     * Validates the incoming command, sends the message through the group's producer and
     * completes {@code asyncContext} with either the reply or an error response.
     *
     * <p>Validation failures, a stopped producer, and exceptions thrown synchronously while
     * building or sending the message complete the context without a {@link CompleteHandler}.
     * Outcomes reported through the producer callbacks complete it with a handler that writes
     * the response to {@code channelHandlerContext}.
     *
     * @param channelHandlerContext channel the request arrived on
     * @param asyncContext          carries the request and receives the response
     * @throws Exception if a step outside the guarded sections fails (for example producer lookup)
     */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(final ChannelHandlerContext channelHandlerContext, final AsyncContext<HttpCommand> asyncContext) throws Exception {
        cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
                RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
                EventMeshConstants.PROTOCOL_HTTP,
                RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()),
                IPUtil.getLocalAddress());

        // Explicit casts keep these assignments valid even if the accessors are declared with a
        // broader header/body type.
        final SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
        final SendMessageRequestBody body = (SendMessageRequestBody) asyncContext.getRequest().getBody();
        final SendMessageResponseHeader responseHeader = SendMessageResponseHeader.buildHeader(
                Integer.valueOf(asyncContext.getRequest().getRequestCode()),
                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
                IPUtil.getLocalAddress(),
                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);

        // Header validation: idc, pid (must be numeric) and sys are mandatory.
        if (StringUtils.isBlank(requestHeader.getIdc())
                || StringUtils.isBlank(requestHeader.getPid())
                || !StringUtils.isNumeric(requestHeader.getPid())
                || StringUtils.isBlank(requestHeader.getSys())) {
            asyncContext.onComplete(errorResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR));
            return;
        }

        // Body validation: every field needed to build and route the message is mandatory.
        if (StringUtils.isBlank(body.getBizSeqNo())
                || StringUtils.isBlank(body.getUniqueId())
                || StringUtils.isBlank(body.getProducerGroup())
                || StringUtils.isBlank(body.getTopic())
                || StringUtils.isBlank(body.getContent())
                || StringUtils.isBlank(body.getTtl())) {
            asyncContext.onComplete(errorResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR));
            return;
        }

        EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(body.getProducerGroup());
        if (!eventMeshProducer.getStarted().get()) {
            asyncContext.onComplete(errorResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR));
            return;
        }

        final String ttl = resolveTtl(body.getTtl());
        final Message message = new Message();
        try {
            populateMessage(message, body, ttl);
            if (messageLogger.isDebugEnabled()) {
                messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", body.getBizSeqNo(), body.getTopic());
            }
            message.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));

            SendMessageContext sendMessageContext = new SendMessageContext(body.getBizSeqNo(), message, eventMeshProducer, eventMeshHTTPServer);
            eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsg();
            final long startTime = System.currentTimeMillis();

            // Writes the final response to the client channel and records the request cost.
            final CompleteHandler<HttpCommand> completeHandler = new CompleteHandler<HttpCommand>() {
                @Override // org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler
                public void onResponse(HttpCommand httpCommand) {
                    try {
                        if (httpLogger.isDebugEnabled()) {
                            httpLogger.debug("{}", httpCommand);
                        }
                        eventMeshHTTPServer.sendResponse(channelHandlerContext, httpCommand.httpResponse());
                        eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
                    } catch (Exception e) {
                        // Best effort: there is nobody left to report to, but the failure must
                        // not disappear silently.
                        httpLogger.error("send sync message response failed, bizSeqNo={}, uniqueId={}", body.getBizSeqNo(), body.getUniqueId(), e);
                    }
                }
            };

            // NOTE(review): this LiteMessage is never used afterwards. It is kept because the
            // side effects (if any) of its constructor/setProp are not visible from this file —
            // confirm and remove.
            new LiteMessage(body.getBizSeqNo(), body.getUniqueId(), body.getTopic(), body.getContent()).setProp(body.getExtFields());

            try {
                eventMeshProducer.request(sendMessageContext, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // The request reached the MQ; the HTTP response is produced by the RRCallback.
                        long now = System.currentTimeMillis();
                        eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(now - startTime);
                        messageLogger.info("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                                Long.valueOf(now - startTime), body.getTopic(), body.getBizSeqNo(), body.getUniqueId());
                    }

                    @Override
                    public void onException(OnExceptionContext onExceptionContext) {
                        asyncContext.onComplete(
                                errorResponseWithTrace(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR, onExceptionContext.getException()),
                                completeHandler);
                        long now = System.currentTimeMillis();
                        eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
                        eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(now - startTime);
                        messageLogger.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                                Long.valueOf(now - startTime), body.getTopic(), body.getBizSeqNo(), body.getUniqueId(), onExceptionContext.getException());
                    }
                }, new RRCallback() {
                    @Override
                    public void onSuccess(Message reply) {
                        // Copy timing information into the user properties returned to the client.
                        reply.getUserProperties().put("BORN_TIMESTAMP", reply.getSystemProperties("BORN_TIMESTAMP"));
                        reply.getUserProperties().put("STORE_TIME", reply.getSystemProperties("STORE_TIMESTAMP"));
                        reply.getUserProperties().put(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                        messageLogger.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                                Long.valueOf(System.currentTimeMillis() - startTime), body.getTopic(), body.getBizSeqNo(), body.getUniqueId());
                        try {
                            String replyContent = new String(reply.getBody(), StandardCharsets.UTF_8);
                            reply.getUserProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                            SendMessageResponseBody.ReplyMessage replyMessage = new SendMessageResponseBody.ReplyMessage(
                                    reply.getSystemProperties("DESTINATION"),
                                    replyContent,
                                    OMSUtil.combineProp(reply.getSystemProperties(), reply.getUserProperties()));
                            asyncContext.onComplete(
                                    asyncContext.getRequest().createHttpCommandResponse(
                                            responseHeader,
                                            SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(), JSON.toJSONString(replyMessage))),
                                    completeHandler);
                        } catch (Exception e) {
                            asyncContext.onComplete(
                                    errorResponseWithTrace(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR, e),
                                    completeHandler);
                            messageLogger.warn("message|mq2eventMesh|RSP", e);
                        }
                    }

                    @Override
                    public void onException(Throwable th) {
                        asyncContext.onComplete(
                                errorResponseWithTrace(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR, th),
                                completeHandler);
                        messageLogger.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                                Long.valueOf(System.currentTimeMillis() - startTime), body.getTopic(), body.getBizSeqNo(), body.getUniqueId(), th);
                    }
                }, Integer.parseInt(ttl));
            } catch (Exception e) {
                asyncContext.onComplete(errorResponseWithTrace(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR, e));
                long now = System.currentTimeMillis();
                eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
                eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(now - startTime);
                messageLogger.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                        Long.valueOf(now - startTime), body.getTopic(), body.getBizSeqNo(), body.getUniqueId(), e);
            }
        } catch (Exception e) {
            messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", body.getBizSeqNo(), body.getTopic(), e);
            asyncContext.onComplete(errorResponseWithTrace(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, e));
        }
    }

    /**
     * Always accepts requests.
     *
     * @return {@code false}
     */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Chooses the TTL (as a decimal string) used for the message timeout.
     *
     * <p>The requested value is used only when it is non-blank, numeric and fits in an
     * {@code int}; otherwise {@link EventMeshConstants#DEFAULT_MSG_TTL_MILLS} is used. The range
     * check matters because the value is later parsed as an {@code int} inside the send
     * {@code try} block: an oversized TTL used to surface there as a send failure even though
     * nothing had been sent.
     *
     * @param requestedTtl TTL taken from the request body, may be {@code null}
     * @return the TTL to apply, as a string
     */
    private static String resolveTtl(String requestedTtl) {
        String defaultTtl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
        if (StringUtils.isBlank(requestedTtl) || !StringUtils.isNumeric(requestedTtl)) {
            return defaultTtl;
        }
        try {
            Integer.parseInt(requestedTtl);
            return requestedTtl;
        } catch (NumberFormatException ignored) {
            // Digits only, but outside the int range: treat like any other unusable TTL.
            return defaultTtl;
        }
    }

    /**
     * Copies payload, routing and tracing information from the request body into {@code message}.
     *
     * @param message message to fill
     * @param body    validated request body
     * @param ttl     timeout value stored in the {@code TIMEOUT} system property
     */
    private static void populateMessage(Message message, SendMessageRequestBody body, String ttl) {
        message.setBody(body.getContent().getBytes(StandardCharsets.UTF_8));
        message.setTopic(body.getTopic());
        message.putSystemProperties("DESTINATION", body.getTopic());
        if (!StringUtils.isBlank(body.getTag())) {
            message.putUserProperties("Tag", body.getTag());
        }
        message.putSystemProperties("TIMEOUT", ttl);
        message.putSystemProperties("SEARCH_KEYS", body.getBizSeqNo());
        message.putUserProperties("msgType", "persistent");
        message.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
        message.putUserProperties("RMB_UNIQ_ID", body.getUniqueId());
    }

    /**
     * Builds a response for {@code retCode} carrying its code and plain error message.
     */
    private static HttpCommand errorResponse(AsyncContext<HttpCommand> asyncContext, SendMessageResponseHeader responseHeader, EventMeshRetCode retCode) {
        return asyncContext.getRequest().createHttpCommandResponse(
                responseHeader,
                SendMessageResponseBody.buildBody(retCode.getRetCode(), retCode.getErrMsg()));
    }

    /**
     * Builds a response for {@code retCode} whose message is the error message followed by
     * {@code EventMeshUtil.stackTrace(cause, 2)}.
     */
    private static HttpCommand errorResponseWithTrace(AsyncContext<HttpCommand> asyncContext, SendMessageResponseHeader responseHeader, EventMeshRetCode retCode, Throwable cause) {
        return asyncContext.getRequest().createHttpCommandResponse(
                responseHeader,
                SendMessageResponseBody.buildBody(retCode.getRetCode(), retCode.getErrMsg() + EventMeshUtil.stackTrace(cause, 2)));
    }
}
