package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchResponseHeader;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.class */
/**
 * HTTP processor for batch message-send requests.
 *
 * <p>For each request it validates the protocol header and body, asks the batch
 * rate limiter for as many permits as the batch declares, converts every usable
 * batch entry into a {@link Message}, and hands the messages to the producer of
 * the requested producer group. Sends that fail are pushed to the HTTP retryer.
 * The success response is written once all sends have been submitted; it does
 * not depend on the outcome of the individual send callbacks.
 */
public class BatchSendMessageProcessor implements HttpRequestProcessor {
    /** Value handed to {@code SendMessageContext#delay} when a send fails (presumably milliseconds). */
    private static final long RETRY_DELAY = 10000L;

    /** How long to wait for rate-limiter permits, in milliseconds. */
    private static final long LIMITER_WAIT_MILLIS = 100L;

    /** Sentinel returned by {@link #parseBatchSize(String)} for a missing or malformed size. */
    private static final int INVALID_BATCH_SIZE = -1;

    private final EventMeshHTTPServer eventMeshHTTPServer;
    public Logger cmdLogger = LoggerFactory.getLogger("cmd");
    public Logger batchMessageLogger = LoggerFactory.getLogger("batchMessage");

    /**
     * @param eventMeshHTTPServer server that supplies configuration, metrics, the producer manager
     *                            and the HTTP retryer used by this processor
     */
    public BatchSendMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    /**
     * Validates a batch-send request, forwards its messages to the producer and completes the
     * async context with the resulting response.
     *
     * <p>Responses produced here:
     * <ul>
     *   <li>{@code EVENTMESH_PROTOCOL_HEADER_ERR} - pid is blank or not numeric, or sys is blank;</li>
     *   <li>{@code EVENTMESH_PROTOCOL_BODY_ERR} - contents are empty, batchId or producerGroup is
     *       blank, the declared size is malformed or differs from the number of contents, or no
     *       entry could be converted into a message;</li>
     *   <li>{@code EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR} - the rate limiter granted no permits in time;</li>
     *   <li>{@code EVENTMESH_BATCH_PRODUCER_STOPED_ERR} - the producer is not started;</li>
     *   <li>{@code SUCCESS} - the messages were submitted to the producer.</li>
     * </ul>
     *
     * @param channelHandlerContext channel the request arrived on; used only for logging the remote address
     * @param asyncContext          carries the request and receives the response via {@code onComplete}
     * @throws Exception if a collaborator (producer lookup, send, rate limiter) throws
     */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(ChannelHandlerContext channelHandlerContext, AsyncContext<HttpCommand> asyncContext) throws Exception {
        this.cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", new Object[]{RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), IPUtil.getLocalAddress()});

        // NOTE(review): the explicit casts are no-ops if the accessors already return the
        // batch-specific types; they are required if HttpCommand exposes general header/body types.
        final SendMessageBatchRequestHeader header = (SendMessageBatchRequestHeader) asyncContext.getRequest().getHeader();
        final SendMessageBatchRequestBody body = (SendMessageBatchRequestBody) asyncContext.getRequest().getBody();
        final SendMessageBatchResponseHeader responseHeader = SendMessageBatchResponseHeader.buildHeader(
                Integer.valueOf(asyncContext.getRequest().getRequestCode()),
                this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
                IPUtil.getLocalAddress(),
                this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
                this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);

        if (StringUtils.isBlank(header.getPid()) || !StringUtils.isNumeric(header.getPid()) || StringUtils.isBlank(header.getSys())) {
            asyncContext.onComplete(buildResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR));
            return;
        }

        // Parse the declared size exactly once. A malformed size yields the sentinel, which can
        // never equal the (non-empty) contents size, so it is reported as a body error instead of
        // escaping as a NumberFormatException.
        final int batchSize = parseBatchSize(body.getSize());
        if (CollectionUtils.isEmpty(body.getContents())
                || StringUtils.isBlank(body.getBatchId())
                || StringUtils.isBlank(body.getProducerGroup())
                || batchSize != CollectionUtils.size(body.getContents())) {
            asyncContext.onComplete(buildResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR));
            return;
        }

        if (!this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter.tryAcquire(batchSize, LIMITER_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
            HttpCommand overLimitResponse = buildResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR);
            this.eventMeshHTTPServer.metrics.summaryMetrics.recordSendBatchMsgDiscard(batchSize);
            asyncContext.onComplete(overLimitResponse);
            return;
        }

        final EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(body.getProducerGroup());
        eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
        if (!eventMeshProducer.getStarted().get()) {
            asyncContext.onComplete(buildResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR));
            return;
        }

        final long sendStart = System.currentTimeMillis();
        final List<Message> messages = new ArrayList<>();
        final ConcurrentHashMap<String, List<Message>> messagesByTopic = new ConcurrentHashMap<>();
        for (SendMessageBatchRequestBody.BatchMessageEntity entity : body.getContents()) {
            // Entries without a topic or payload are skipped silently.
            if (StringUtils.isBlank(entity.topic) || StringUtils.isBlank(entity.msg)) {
                continue;
            }
            // A missing or non-numeric ttl is replaced by the default, on the entity itself.
            if (StringUtils.isBlank(entity.ttl) || !StringUtils.isNumeric(entity.ttl)) {
                entity.ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
            }
            try {
                Message message = new Message();
                message.setTopic(entity.topic);
                message.setBody(entity.msg.getBytes("UTF-8"));
                if (!StringUtils.isBlank(entity.tag)) {
                    message.putUserProperties(EventMeshConstants.TAG, entity.tag);
                }
                message.putUserProperties("msgType", "persistent");
                message.putSystemProperties("TIMEOUT", entity.ttl);
                messages.add(message);

                List<Message> sameTopic = messagesByTopic.get(entity.topic);
                if (sameTopic == null) {
                    sameTopic = new ArrayList<>();
                    messagesByTopic.put(entity.topic, sameTopic);
                }
                sameTopic.add(message);

                if (this.batchMessageLogger.isDebugEnabled()) {
                    this.batchMessageLogger.debug("msg2MQMsg suc, msg:{}", entity.msg);
                }
            } catch (Exception e) {
                // Best effort: one bad entry must not fail the remaining entries of the batch.
                this.batchMessageLogger.error("msg2MQMsg err, msg:{}", entity, e);
            }
        }

        if (CollectionUtils.isEmpty(messages)) {
            asyncContext.onComplete(buildResponse(asyncContext, responseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR));
            return;
        }

        // The metric counts the declared size only when it is a plain digit string; a size that
        // parses but is not purely numeric (e.g. with a leading sign) is recorded as 0.
        final long recordedSize = StringUtils.isNumeric(body.getSize()) ? batchSize : 0L;
        this.eventMeshHTTPServer.metrics.summaryMetrics.recordSendBatchMsg(recordedSize);

        if (this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgBatchEnabled) {
            // Batch mode: one send per topic, carrying all messages of that topic.
            for (List<Message> topicMessages : messagesByTopic.values()) {
                SendMessageContext sendMessageContext = new SendMessageContext(body.getBatchId(), new Message(), eventMeshProducer, this.eventMeshHTTPServer);
                sendMessageContext.setMessageList(topicMessages);
                eventMeshProducer.send(sendMessageContext, retryOnFailure(sendMessageContext));
            }
        } else {
            // Single mode: one send per message, in request order.
            for (Message message : messages) {
                SendMessageContext sendMessageContext = new SendMessageContext(body.getBatchId(), message, eventMeshProducer, this.eventMeshHTTPServer);
                eventMeshProducer.send(sendMessageContext, retryOnFailure(sendMessageContext));
            }
        }

        final long sendEnd = System.currentTimeMillis();
        this.eventMeshHTTPServer.metrics.summaryMetrics.recordBatchSendMsgCost(sendEnd - sendStart);
        this.batchMessageLogger.debug("batchMessage|eventMesh2mq|REQ|ASYNC|batchId={}|send2MQCost={}ms|msgNum={}|topics={}", new Object[]{body.getBatchId(), Long.valueOf(sendEnd - sendStart), body.getSize(), messagesByTopic.keySet()});
        asyncContext.onComplete(buildResponse(asyncContext, responseHeader, EventMeshRetCode.SUCCESS));
    }

    /**
     * @return always {@code false}; this processor never rejects a request up front
     */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public boolean rejectRequest() {
        return false;
    }

    /** Parses the declared batch size, returning {@link #INVALID_BATCH_SIZE} instead of throwing. */
    private static int parseBatchSize(String size) {
        try {
            return Integer.parseInt(size);
        } catch (NumberFormatException e) {
            // Covers null, empty, non-numeric and out-of-int-range values.
            return INVALID_BATCH_SIZE;
        }
    }

    /** Builds the response command for the request held by {@code asyncContext} with the given return code. */
    private static HttpCommand buildResponse(AsyncContext<HttpCommand> asyncContext, SendMessageBatchResponseHeader responseHeader, EventMeshRetCode retCode) {
        return asyncContext.getRequest().createHttpCommandResponse(responseHeader, SendMessageBatchResponseBody.buildBody(retCode.getRetCode(), retCode.getErrMsg()));
    }

    /** Creates a send callback that ignores success and, on failure, logs and schedules the context for retry. */
    private SendCallback retryOnFailure(final SendMessageContext sendMessageContext) {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // Nothing to do: the HTTP response does not depend on individual send results.
            }

            @Override
            public void onException(OnExceptionContext onExceptionContext) {
                BatchSendMessageProcessor.this.batchMessageLogger.warn("", onExceptionContext.getException());
                BatchSendMessageProcessor.this.eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(RETRY_DELAY));
            }
        };
    }
}
