package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2ResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2ResponseHeader;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.class */
public class BatchSendMessageV2Processor implements HttpRequestProcessor {
    private final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD);
    private final Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
    private final Logger batchMessageLogger = LoggerFactory.getLogger("batchMessage");
    private final EventMeshHTTPServer eventMeshHTTPServer;
    private final Acl acl;

    public BatchSendMessageV2Processor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
        this.acl = eventMeshHTTPServer.getAcl();
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(ChannelHandlerContext channelHandlerContext, AsyncContext<HttpCommand> asyncContext) throws Exception {
        HttpCommand request = asyncContext.getRequest();
        Integer valueOf = Integer.valueOf(request.getRequestCode());
        this.cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", new Object[]{RequestCode.get(valueOf), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel())});
        CloudEvent cloudEvent = ProtocolPluginFactory.getProtocolAdaptor(request.getHeader().getProtocolType()).toCloudEvent(request);
        EventMeshHTTPConfiguration eventMeshHttpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
        SendMessageBatchV2ResponseHeader buildHeader = SendMessageBatchV2ResponseHeader.buildHeader(valueOf, eventMeshHttpConfiguration.getEventMeshCluster(), eventMeshHttpConfiguration.getEventMeshEnv(), eventMeshHttpConfiguration.getEventMeshIDC());
        if (!ObjectUtils.allNotNull(new Object[]{cloudEvent.getSource(), cloudEvent.getSpecVersion()}) || StringUtils.isAnyBlank(new CharSequence[]{cloudEvent.getId(), cloudEvent.getType(), cloudEvent.getSubject()})) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }
        String extension = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.IDC.getKey());
        String extension2 = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.PID.getKey());
        if (StringUtils.isAnyBlank(new CharSequence[]{extension, extension2, getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.SYS.getKey())}) || !StringUtils.isNumeric(extension2)) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }
        final String extension3 = getExtension(cloudEvent, "bizseqno");
        String extension4 = getExtension(cloudEvent, "producergroup");
        final String subject = cloudEvent.getSubject();
        if (StringUtils.isAnyBlank(new CharSequence[]{extension3, subject, extension4}) || cloudEvent.getData() == null) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }
        if (new String(((CloudEventData) Objects.requireNonNull(cloudEvent.getData())).toBytes(), Constants.DEFAULT_CHARSET).length() > eventMeshHttpConfiguration.getEventMeshEventSize()) {
            this.batchMessageLogger.error("Event size exceeds the limit: {}", Integer.valueOf(eventMeshHttpConfiguration.getEventMeshEventSize()));
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, "Event size exceeds the limit: " + eventMeshHttpConfiguration.getEventMeshEventSize(), SendMessageBatchV2ResponseBody.class);
            return;
        }
        if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
            try {
                this.acl.doAclCheckInHttpSend(RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.USERNAME.getKey()), getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.PASSWD.getKey()), getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.SYS.getKey()), subject, valueOf);
            } catch (Exception e) {
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageBatchV2ResponseBody.class);
                this.aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
                return;
            }
        }
        final HttpSummaryMetrics summaryMetrics = this.eventMeshHTTPServer.getMetrics().getSummaryMetrics();
        if (!this.eventMeshHTTPServer.getBatchRateLimiter().tryAcquire(100L, TimeUnit.MILLISECONDS)) {
            summaryMetrics.recordSendBatchMsgDiscard(1L);
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }
        EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(extension4);
        eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
        if (!eventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }
        final long currentTimeMillis = System.currentTimeMillis();
        String valueOf2 = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
        String extension5 = getExtension(cloudEvent, "ttl");
        if (StringUtils.isBlank(extension5) && !StringUtils.isNumeric(extension5)) {
            cloudEvent = CloudEventBuilder.from(cloudEvent).withExtension("ttl", valueOf2).build();
        }
        try {
            cloudEvent = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT).withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis())).build();
            LogUtils.debug(this.batchMessageLogger, "msg2MQMsg suc, topic:{}, msg:{}", subject, cloudEvent.getData());
            summaryMetrics.recordSendBatchMsg(1L);
            final SendMessageContext sendMessageContext = new SendMessageContext(extension3, cloudEvent, eventMeshProducer, this.eventMeshHTTPServer);
            try {
                eventMeshProducer.send(sendMessageContext, new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor.1
                    public void onSuccess(SendResult sendResult) {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        summaryMetrics.recordBatchSendMsgCost(currentTimeMillis2 - currentTimeMillis);
                        BatchSendMessageV2Processor.this.batchMessageLogger.debug("batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}", new Object[]{extension3, Long.valueOf(currentTimeMillis2 - currentTimeMillis), subject});
                    }

                    public void onException(OnExceptionContext onExceptionContext) {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        BatchSendMessageV2Processor.this.eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10L, TimeUnit.SECONDS);
                        summaryMetrics.recordBatchSendMsgCost(currentTimeMillis2 - currentTimeMillis);
                        BatchSendMessageV2Processor.this.batchMessageLogger.error("batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}", new Object[]{extension3, Long.valueOf(currentTimeMillis2 - currentTimeMillis), subject, onExceptionContext.getException()});
                    }
                });
            } catch (Exception e2) {
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR, EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e2, 2), SendMessageBatchV2ResponseBody.class);
                long currentTimeMillis2 = System.currentTimeMillis();
                this.eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10L, TimeUnit.SECONDS);
                summaryMetrics.recordBatchSendMsgCost(currentTimeMillis2 - currentTimeMillis);
                this.batchMessageLogger.error("batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}", new Object[]{extension3, Long.valueOf(currentTimeMillis2 - currentTimeMillis), subject, e2});
            }
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.SUCCESS, null, SendMessageBatchV2ResponseBody.class);
        } catch (Exception e3) {
            this.batchMessageLogger.error("msg2MQMsg err, topic:{}, msg:{}", new Object[]{subject, cloudEvent.getData(), e3});
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e3, 2), SendMessageBatchV2ResponseBody.class);
        }
    }
}
