package org.apache.eventmesh.runtime.core.protocol.http.processor;

import com.fasterxml.jackson.core.type.TypeReference;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestURI;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous HTTP processor registered for {@link RequestURI#PUBLISH_BRIDGE}.
 *
 * <p>For each request it enriches the request headers with this node's identity, unwraps the
 * {@code "content"} entry of the JSON request body, converts the request into a {@link CloudEvent}
 * through the protocol adaptor selected by the {@code protocoltype} header, validates the event,
 * optionally runs the ACL check, applies the message rate limiter and finally hands the event to
 * the producer of the computed producer group. The HTTP response is written from the producer's
 * send callback.
 */
@EventMeshTrace(isEnable = true)
public class SendAsyncRemoteEventProcessor implements AsyncHttpProcessor {
    public Logger messageLogger = LoggerFactory.getLogger("message");
    public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    public Logger aclLogger = LoggerFactory.getLogger("acl");
    private final EventMeshHTTPServer eventMeshHTTPServer;

    /**
     * Creates the processor.
     *
     * @param eventMeshHTTPServer server whose configuration, rate limiter, producer manager,
     *                            retryer and metrics are used while handling requests
     */
    public SendAsyncRemoteEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    /**
     * Handles one publish-bridge request.
     *
     * <p>Validation failures (bad body, missing or blank headers, ACL rejection, rate limit,
     * stopped producer, oversized event) are answered through
     * {@code handlerSpecific.sendErrorResponse(..)} and end the call early. Failures while sending
     * additionally push the message to the HTTP retryer with a 10000 ms delay.
     *
     * @param handlerSpecific per-request handle giving access to the async context, the channel,
     *                        the trace operation and the response writers
     * @param httpRequest     the raw HTTP request; not read by this method
     * @throws Exception if any collaborator outside the guarded send section throws
     */
    @Override
    public void handler(final HandlerService.HandlerSpecific handlerSpecific, HttpRequest httpRequest) throws Exception {
        AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
        ChannelHandlerContext ctx = handlerSpecific.getCtx();
        HttpEventWrapper request = asyncContext.getRequest();
        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        String localAddress = IPUtils.getLocalAddress();
        this.httpLogger.info("uri={}|{}|client2eventMesh|from={}|to={}",
            request.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

        // Stamp the user headers with the caller address and this node's identity.
        Map<String, Object> headerMap = request.getHeaderMap();
        String nodeProducerGroup = this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv
            + "-" + this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC
            + "-" + this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster
            + "-" + this.eventMeshHTTPServer.getEventMeshHttpConfiguration().sysID;
        headerMap.put(EventMeshConstants.MANAGE_IP, remoteAddr);
        headerMap.put("env", this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv);
        headerMap.put("idc", this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
        headerMap.put("sys", this.eventMeshHTTPServer.getEventMeshHttpConfiguration().sysID);
        headerMap.put("producergroup", nodeProducerGroup);
        request.buildSysHeaderForClient();
        headerMap.putIfAbsent("source", remoteAddr);
        request.buildSysHeaderForCE();

        // Response maps are created up front so that every rejection path below can use them.
        final Map<String, Object> responseHeaderMap = new HashMap<>();
        responseHeaderMap.put("uri", request.getRequestURI());
        responseHeaderMap.put("eventmeshcluster", this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster);
        responseHeaderMap.put("eventmeship", localAddress);
        responseHeaderMap.put("eventmeshenv", this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv);
        responseHeaderMap.put("eventmeshidc", this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
        final Map<String, Object> responseBodyMap = new HashMap<>();

        // The remote body is a JSON envelope; only its "content" entry becomes the event payload.
        // Decode explicitly as UTF-8 (the payload is re-encoded as UTF-8 below) instead of relying
        // on the platform default charset.
        byte[] rawBody = request.getBody();
        Map<String, Object> bodyMap = null;
        if (rawBody != null) {
            bodyMap = JsonUtils.deserialize(new String(rawBody, StandardCharsets.UTF_8),
                new TypeReference<Map<String, Object>>() {
                });
        }
        Object content = bodyMap == null ? null : bodyMap.get("content");
        if (content == null) {
            // Missing body or "content" entry: answer with a body error rather than failing with
            // a NullPointerException. No event exists yet, so there is no trace map to attach.
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, responseBodyMap, null);
            return;
        }
        request.setBody(content.toString().getBytes(StandardCharsets.UTF_8));

        final String bizSeqNo = headerMap.getOrDefault("bizseqno", RandomStringUtils.generateNum(30)).toString();
        final String uniqueId = headerMap.getOrDefault("uniqueid", RandomStringUtils.generateNum(30)).toString();
        String ttl = headerMap.getOrDefault("ttl", 4000).toString();
        Map<String, Object> sysHeaderMap = request.getSysHeaderMap();
        sysHeaderMap.putIfAbsent("bizseqno", bizSeqNo);
        sysHeaderMap.putIfAbsent("uniqueid", uniqueId);
        sysHeaderMap.putIfAbsent("ttl", ttl);

        // Drop every user header that is already present as a system header.
        headerMap.keySet().removeIf(sysHeaderMap::containsKey);

        String protocolType = headerMap.getOrDefault("protocoltype", EventMeshConstants.PROTOCOL_HTTP).toString();
        CloudEvent event = ProtocolPluginFactory.getProtocolAdaptor(protocolType).toCloudEvent(request);
        if (event == null
            || StringUtils.isBlank(event.getId())
            || event.getSource() == null
            || event.getSpecVersion() == null
            || StringUtils.isBlank(event.getType())
            || StringUtils.isBlank(event.getSubject())) {
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap, responseBodyMap, event);
            return;
        }

        // Extensions are read null-safely so that a missing one reaches the validation below
        // instead of throwing before it.
        String idc = extensionAsString(event, "idc");
        String pid = extensionAsString(event, "pid");
        String sys = extensionAsString(event, "sys");
        if (StringUtils.isBlank(idc) || StringUtils.isBlank(pid) || !StringUtils.isNumeric(pid) || StringUtils.isBlank(sys)) {
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap, responseBodyMap, event);
            return;
        }

        String producerGroup = extensionAsString(event, "producergroup");
        final String topic = event.getSubject();
        if (StringUtils.isBlank(bizSeqNo)
            || StringUtils.isBlank(uniqueId)
            || StringUtils.isBlank(producerGroup)
            || StringUtils.isBlank(topic)
            || event.getData() == null) {
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, responseBodyMap, event);
            return;
        }

        if (this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
            try {
                // The credential lookups stay inside the try: a missing "username"/"passwd"
                // extension throws here and is therefore reported as an ACL failure.
                Acl.doAclCheckInHttpSend(remoteAddr,
                    event.getExtension("username").toString(),
                    event.getExtension("passwd").toString(),
                    sys,
                    topic,
                    request.getRequestURI());
            } catch (Exception e) {
                reject(handlerSpecific, EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap, responseBodyMap, event);
                this.aclLogger.warn("CLIENT HAS NO PERMISSION,SendAsyncRemoteEventProcessor send failed", e);
                return;
            }
        }

        // Wait at most 100 ms for a rate-limiter permit.
        if (!this.eventMeshHTTPServer.getMsgRateLimiter().tryAcquire(100L, TimeUnit.MILLISECONDS)) {
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, responseHeaderMap, responseBodyMap, event);
            return;
        }

        EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
        if (!eventMeshProducer.getStarted().get()) {
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap, responseBodyMap, event);
            return;
        }

        // NOTE(review): the limit is compared against the number of decoded UTF-8 characters,
        // not the payload's byte length - confirm this is the intended unit.
        String payloadText = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
        if (payloadText.length() > this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
            this.httpLogger.error("Event size exceeds the limit: {}",
                this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR, responseHeaderMap, responseBodyMap, event);
            return;
        }

        try {
            event = CloudEventBuilder.from(event)
                .withExtension("msgtype", "persistent")
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();
            if (this.messageLogger.isDebugEnabled()) {
                this.messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizSeqNo, topic);
            }
            final SendMessageContext sendMessageContext =
                new SendMessageContext(bizSeqNo, event, eventMeshProducer, this.eventMeshHTTPServer);
            this.eventMeshHTTPServer.metrics.getSummaryMetrics().recordSendMsg();
            final long sendStartMillis = System.currentTimeMillis();
            try {
                CloudEvent tracedEvent = CloudEventBuilder.from(sendMessageContext.getEvent())
                    .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                    .build();
                handlerSpecific.getTraceOperation().createClientTraceOperation(
                    EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), tracedEvent),
                    "upstream-eventmesh-client-span",
                    false);
                eventMeshProducer.send(sendMessageContext, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        responseBodyMap.put("retCode", EventMeshRetCode.SUCCESS.getRetCode());
                        responseBodyMap.put("retMsg", EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString());
                        SendAsyncRemoteEventProcessor.this.messageLogger.info(
                            "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                            System.currentTimeMillis() - sendStartMillis, topic, bizSeqNo, uniqueId);
                        handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
                        handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                    }

                    @Override
                    public void onException(OnExceptionContext onExceptionContext) {
                        responseBodyMap.put("retCode", EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
                        responseBodyMap.put("retMsg", EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                            + EventMeshUtil.stackTrace(onExceptionContext.getException(), 2));
                        // Schedule a retry in addition to reporting the failure to the client.
                        SendAsyncRemoteEventProcessor.this.eventMeshHTTPServer.getHttpRetryer()
                            .pushRetry(sendMessageContext.delay(10000L));
                        handlerSpecific.getTraceOperation().exceptionLatestTrace(
                            onExceptionContext.getException(),
                            EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));
                        handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                        SendAsyncRemoteEventProcessor.this.messageLogger.error(
                            "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                            System.currentTimeMillis() - sendStartMillis, topic, bizSeqNo, uniqueId,
                            onExceptionContext.getException());
                    }
                });
            } catch (Exception e) {
                // Tracing or the synchronous part of send() failed: retry later and report now.
                this.eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000L));
                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, responseHeaderMap, responseBodyMap, null);
                this.messageLogger.error(
                    "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                    System.currentTimeMillis() - sendStartMillis, topic, bizSeqNo, uniqueId, e);
            }
        } catch (Exception e) {
            // Building the outbound event or its send context failed.
            this.messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizSeqNo, topic, e);
            reject(handlerSpecific, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, responseHeaderMap, responseBodyMap, event);
        }
    }

    /**
     * Returns the request URIs this processor serves.
     *
     * @return a one-element array holding the URI of {@link RequestURI#PUBLISH_BRIDGE}
     */
    @Override
    public String[] paths() {
        return new String[]{RequestURI.PUBLISH_BRIDGE.getRequestURI()};
    }

    /**
     * Sends an error response whose trace map is derived from {@code event}.
     *
     * <p>When {@code event} is {@code null} a {@code null} trace map is passed instead of asking
     * the extension-map helper to read a null event.
     * NOTE(review): that helper's null handling is not visible in this file; {@code null} is
     * already passed as the trace map by the send-failure branch of {@code handler}.
     */
    private static void reject(HandlerService.HandlerSpecific handlerSpecific, EventMeshRetCode retCode,
                               Map<String, Object> responseHeaderMap, Map<String, Object> responseBodyMap,
                               CloudEvent event) {
        handlerSpecific.sendErrorResponse(retCode, responseHeaderMap, responseBodyMap,
            event == null ? null : EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
    }

    /** Returns the named extension rendered with {@code toString()}, or {@code null} when it is absent. */
    private static String extensionAsString(CloudEvent event, String name) {
        Object value = event.getExtension(name);
        return value == null ? null : value.toString();
    }
}
