package org.apache.eventmesh.runtime.core.protocol.grpc.processor;

import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEventBatch;
import org.apache.eventmesh.common.protocol.grpc.common.BatchEventMeshCloudEventWrapper;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.class */
/**
 * Processor that publishes every event contained in a gRPC {@link CloudEventBatch}
 * through the EventMesh producer of the batch's producer group.
 */
public class BatchPublishCloudEventProcessor extends AbstractPublishBatchCloudEventProcessor {
    private static final Logger log = LoggerFactory.getLogger(BatchPublishCloudEventProcessor.class);

    /**
     * Creates the processor, passing the server and the ACL obtained from it to the parent class.
     *
     * @param eventMeshGrpcServer the gRPC server this processor works on behalf of
     */
    public BatchPublishCloudEventProcessor(EventMeshGrpcServer eventMeshGrpcServer) {
        super(eventMeshGrpcServer, eventMeshGrpcServer.getAcl());
    }

    /**
     * Converts the batch into individual CloudEvents and hands each one to the producer.
     *
     * <p>Subject, producer group and protocol type are read from the first event of the
     * batch only. The outcome of each send is only logged by its callback; the success
     * response is emitted after the loop and does not depend on those outcomes.
     *
     * @param cloudEventBatch the incoming batch; its first event (index 0) is read unconditionally
     * @param eventEmitter    the emitter used to send the response back to the client
     * @throws Exception if conversion or sending fails
     */
    @Override
    public void handleCloudEvent(CloudEventBatch cloudEventBatch, EventEmitter<CloudEvent> eventEmitter) throws Exception {
        // The first event supplies the attributes shared by the whole batch.
        CloudEvent firstEvent = cloudEventBatch.getEvents(0);
        String topic = EventMeshCloudEventUtils.getSubject(firstEvent);
        String producerGroup = EventMeshCloudEventUtils.getProducerGroup(firstEvent);

        for (io.cloudevents.CloudEvent event : ProtocolPluginFactory
                .getProtocolAdaptor(EventMeshCloudEventUtils.getProtocolType(firstEvent))
                .toBatchCloudEvent(new BatchEventMeshCloudEventWrapper(cloudEventBatch))) {
            String seqNum = event.getId();

            // A missing "uniqueid" extension is logged as an empty string.
            String uniqueId;
            if (event.getExtension("uniqueid") == null) {
                uniqueId = "";
            } else {
                uniqueId = event.getExtension("uniqueid").toString();
            }

            EventMeshProducer producer = this.eventMeshGrpcServer.getProducerManager().getEventMeshProducer(producerGroup);
            SendMessageContext context = new SendMessageContext(seqNum, event, producer, this.eventMeshGrpcServer);
            this.eventMeshGrpcServer.getMetricsMonitor().recordSendMsgToQueue();

            long startTime = System.currentTimeMillis();
            producer.send(context, newSendCallback(startTime, topic, seqNum, uniqueId));
        }

        ServiceUtils.sendResponseCompleted(StatusCode.SUCCESS, "batch publish success", eventEmitter);
    }

    /**
     * Builds the callback that logs the result of sending a single event.
     *
     * @param startTime the {@link System#currentTimeMillis()} value taken just before the send
     * @param topic     the subject of the batch, logged as {@code topic}
     * @param seqNum    the event id, logged as {@code bizSeqNo}
     * @param uniqueId  the event's unique id (possibly empty), logged as {@code uniqueId}
     * @return a callback that logs at info level on success and at error level on failure
     */
    private static SendCallback newSendCallback(final long startTime, final String topic,
                                                final String seqNum, final String uniqueId) {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                long cost = System.currentTimeMillis() - startTime;
                log.info("message|eventMesh2mq|REQ|BatchSend|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                    cost, topic, seqNum, uniqueId);
            }

            @Override
            public void onException(OnExceptionContext onExceptionContext) {
                long cost = System.currentTimeMillis() - startTime;
                // The exception is a trailing argument with no matching placeholder.
                log.error("message|eventMesh2mq|REQ|BatchSend|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                    cost, topic, seqNum, uniqueId, onExceptionContext.getException());
            }
        };
    }
}
