package org.apache.eventmesh.runtime.core.protocol.http.processor;

import com.google.common.base.Stopwatch;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchResponseHeader;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.class */
public class BatchSendMessageProcessor implements HttpRequestProcessor {
    private final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD);
    private final Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
    private final Logger batchMessageLogger = LoggerFactory.getLogger("batchMessage");
    private final EventMeshHTTPServer eventMeshHTTPServer;
    private final Acl acl;

    public BatchSendMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
        this.acl = eventMeshHTTPServer.getAcl();
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(ChannelHandlerContext channelHandlerContext, AsyncContext<HttpCommand> asyncContext) throws Exception {
        String localAddress = IPUtils.getLocalAddress();
        HttpCommand request = asyncContext.getRequest();
        this.cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", new Object[]{RequestCode.get(Integer.valueOf(request.getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), localAddress});
        SendMessageBatchRequestHeader header = request.getHeader();
        EventMeshHTTPConfiguration eventMeshHttpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
        SendMessageBatchResponseHeader buildHeader = SendMessageBatchResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()), eventMeshHttpConfiguration.getEventMeshCluster(), localAddress, eventMeshHttpConfiguration.getEventMeshEnv(), eventMeshHttpConfiguration.getEventMeshIDC());
        List<CloudEvent> batchCloudEvent = ProtocolPluginFactory.getProtocolAdaptor(header.getProtocolType()).toBatchCloudEvent(request);
        if (CollectionUtils.isEmpty(batchCloudEvent)) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }
        String str = "";
        String str2 = "";
        int size = batchCloudEvent.size();
        if (size > eventMeshHttpConfiguration.getEventMeshEventBatchSize()) {
            this.batchMessageLogger.error("Event batch size exceeds the limit: {}", Integer.valueOf(eventMeshHttpConfiguration.getEventMeshEventBatchSize()));
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, "Event batch size exceeds the limit: " + eventMeshHttpConfiguration.getEventMeshEventBatchSize(), SendMessageBatchResponseBody.class);
            return;
        }
        for (CloudEvent cloudEvent : batchCloudEvent) {
            if (!ObjectUtils.allNotNull(new Object[]{cloudEvent.getSource(), cloudEvent.getSpecVersion()}) || StringUtils.isAnyBlank(new CharSequence[]{cloudEvent.getId(), cloudEvent.getType(), cloudEvent.getSubject()})) {
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchResponseBody.class);
                return;
            }
            if ((cloudEvent.getData() == null ? "" : new String(cloudEvent.getData().toBytes(), Constants.DEFAULT_CHARSET)).length() > eventMeshHttpConfiguration.getEventMeshEventSize()) {
                this.batchMessageLogger.error("Event size exceeds the limit: {}", Integer.valueOf(eventMeshHttpConfiguration.getEventMeshEventSize()));
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, "Event size exceeds the limit: " + eventMeshHttpConfiguration.getEventMeshEventSize(), SendMessageBatchResponseBody.class);
                return;
            }
            String extension = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.IDC.getKey());
            String extension2 = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.PID.getKey());
            if (StringUtils.isAnyBlank(new CharSequence[]{extension, extension2, getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.SYS.getKey())}) || !StringUtils.isNumeric(extension2)) {
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchResponseBody.class);
                return;
            }
            str = getExtension(cloudEvent, "batchId");
            str2 = getExtension(cloudEvent, EventMeshConstants.PRODUCER_GROUP);
            size = Integer.parseInt(getExtension(cloudEvent, "size"));
            if (cloudEvent.getData() == null || StringUtils.isAnyBlank(new CharSequence[]{str, str2}) || size != batchCloudEvent.size()) {
                completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
                return;
            }
        }
        HttpSummaryMetrics summaryMetrics = this.eventMeshHTTPServer.getMetrics().getSummaryMetrics();
        if (!this.eventMeshHTTPServer.getBatchRateLimiter().tryAcquire(size, 100L, TimeUnit.MILLISECONDS)) {
            summaryMetrics.recordSendBatchMsgDiscard(size);
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }
        EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(str2);
        eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
        if (!eventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }
        Stopwatch createStarted = Stopwatch.createStarted();
        String parseChannelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel());
        int parseInt = Integer.parseInt(request.getRequestCode());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (CloudEvent cloudEvent2 : batchCloudEvent) {
            if (!StringUtils.isBlank(cloudEvent2.getSubject()) && cloudEvent2.getData() != null) {
                String extension3 = getExtension(cloudEvent2, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
                String extension4 = getExtension(cloudEvent2, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
                String extension5 = getExtension(cloudEvent2, ProtocolKey.ClientInstanceKey.SYS.getKey());
                if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
                    try {
                        this.acl.doAclCheckInHttpSend(parseChannelRemoteAddr, extension3, extension4, extension5, cloudEvent2.getSubject(), parseInt);
                    } catch (Exception e) {
                        completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageBatchResponseBody.class);
                        this.aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageProcessor send failed", e);
                        return;
                    }
                }
                try {
                    String extension6 = getExtension(cloudEvent2, "ttl");
                    if (StringUtils.isBlank(extension6) || !StringUtils.isNumeric(extension6)) {
                        cloudEvent2 = CloudEventBuilder.from(cloudEvent2).withExtension("ttl", String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS)).withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT).build();
                    }
                    if (concurrentHashMap.containsKey(cloudEvent2.getSubject())) {
                        ((List) concurrentHashMap.get(cloudEvent2.getSubject())).add(cloudEvent2);
                    } else {
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(cloudEvent2);
                        concurrentHashMap.put(cloudEvent2.getSubject(), arrayList);
                    }
                    LogUtils.debug(this.batchMessageLogger, "msg2MQMsg suc, event:{}", cloudEvent2.getData());
                } catch (Exception e2) {
                    this.batchMessageLogger.error("msg2MQMsg err, event:{}", cloudEvent2.getData(), e2);
                }
            }
        }
        if (CollectionUtils.isEmpty(batchCloudEvent)) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }
        summaryMetrics.recordSendBatchMsg(size);
        if (eventMeshHttpConfiguration.isEventMeshServerBatchMsgBatchEnabled()) {
            for (List list : concurrentHashMap.values()) {
                final SendMessageContext sendMessageContext = new SendMessageContext(str, null, eventMeshProducer, this.eventMeshHTTPServer);
                eventMeshProducer.send(sendMessageContext, new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor.1
                    public void onSuccess(SendResult sendResult) {
                    }

                    public void onException(OnExceptionContext onExceptionContext) {
                        BatchSendMessageProcessor.this.batchMessageLogger.warn("", onExceptionContext.getException());
                        BatchSendMessageProcessor.this.eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10L, TimeUnit.SECONDS);
                    }
                });
            }
        } else {
            Iterator it = batchCloudEvent.iterator();
            while (it.hasNext()) {
                final SendMessageContext sendMessageContext2 = new SendMessageContext(str, (CloudEvent) it.next(), eventMeshProducer, this.eventMeshHTTPServer);
                eventMeshProducer.send(sendMessageContext2, new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor.2
                    public void onSuccess(SendResult sendResult) {
                    }

                    public void onException(OnExceptionContext onExceptionContext) {
                        BatchSendMessageProcessor.this.batchMessageLogger.warn("", onExceptionContext.getException());
                        BatchSendMessageProcessor.this.eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext2, 10L, TimeUnit.SECONDS);
                    }
                });
            }
        }
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        summaryMetrics.recordBatchSendMsgCost(elapsed);
        LogUtils.debug(this.batchMessageLogger, "batchMessage|eventMesh2mq|REQ|ASYNC|batchId={}|send2MQCost={}ms|msgNum={}|topics={}", new Object[]{str, Long.valueOf(elapsed), Integer.valueOf(size), concurrentHashMap.keySet()});
        completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.SUCCESS, null, SendMessageBatchResponseBody.class);
    }
}
