package org.apache.eventmesh.runtime.core.protocol.http.consumer;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.TopicNameHelper;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler;
import org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.class */
public class EventMeshConsumer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshConsumer.class);
    private final EventMeshHTTPServer eventMeshHTTPServer;
    private final AtomicBoolean started4Persistent = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean started4Broadcast = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean inited4Persistent = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE.booleanValue());
    public final Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
    private ConsumerGroupConf consumerGroupConf;
    private final MQConsumerWrapper persistentMqConsumer;
    private final MQConsumerWrapper broadcastMqConsumer;
    private MessageHandler httpMessageHandler;

    public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
        this.consumerGroupConf = consumerGroupConf;
        this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType());
        this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType());
    }

    public synchronized void init() throws Exception {
        this.httpMessageHandler = new HTTPMessageHandler(this);
        Properties properties = new Properties();
        properties.put(EventMeshConstants.IS_BROADCAST, "false");
        properties.put(EventMeshConstants.CONSUMER_GROUP, this.consumerGroupConf.getConsumerGroup());
        properties.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
        properties.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID(this.consumerGroupConf.getConsumerGroup(), this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
        this.persistentMqConsumer.init(properties);
        this.persistentMqConsumer.registerEventListener((cloudEvent, asyncConsumeContext) -> {
            Span prepareServerSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(cloudEvent.getSpecVersion())).toString(), cloudEvent), "downstream-eventmesh-server-span", false);
            try {
                Optional ofNullable = Optional.ofNullable(EventMeshExtensionFactory.getExtension(TopicNameHelper.class, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType()));
                String subject = cloudEvent.getSubject();
                String obj = Objects.requireNonNull(cloudEvent.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
                String obj2 = Objects.requireNonNull(cloudEvent.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
                cloudEvent = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp()).build();
                if (this.messageLogger.isDebugEnabled()) {
                    this.messageLogger.debug("message|mq2eventMesh|topic={}|event={}", subject, cloudEvent);
                } else {
                    this.messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", new Object[]{subject, obj, obj2});
                }
                if (ofNullable.isPresent() && ((TopicNameHelper) ofNullable.get()).isRetryTopic(subject)) {
                    subject = String.valueOf(cloudEvent.getExtension(EventMeshConstants.MANAGE_TOPIC));
                }
                ConsumerGroupTopicConf consumerGroupTopicConf = (ConsumerGroupTopicConf) MapUtils.getObject(this.consumerGroupConf.getConsumerGroupTopicConf(), subject, (Object) null);
                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) asyncConsumeContext;
                if (consumerGroupTopicConf == null) {
                    try {
                        sendMessageBack(cloudEvent, obj2, obj);
                        log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", new Object[]{this.consumerGroupConf.getConsumerGroup(), subject, obj, obj2});
                    } catch (Exception e) {
                        log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", new Object[]{this.consumerGroupConf.getConsumerGroup(), subject, obj, obj2, e});
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                    TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
                    return;
                }
                if (this.httpMessageHandler.handle(new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), this.consumerGroupConf.getConsumerGroup(), this, subject, cloudEvent, this.consumerGroupConf.getConsumerGroupTopicConf().get(subject).getSubscriptionItem(), eventMeshAsyncConsumeContext.getAbstractContext(), this.consumerGroupConf, this.eventMeshHTTPServer, obj, obj2, consumerGroupTopicConf))) {
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                } else {
                    try {
                        sendMessageBack(cloudEvent, obj2, obj);
                    } catch (Exception e2) {
                        log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", new Object[]{subject, obj, obj2, e2});
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                }
                TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
                return;
            } catch (Throwable th) {
                TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
                throw th;
            }
            TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
            throw th;
        });
        Properties properties2 = new Properties();
        properties2.put(EventMeshConstants.IS_BROADCAST, "true");
        properties2.put(EventMeshConstants.CONSUMER_GROUP, this.consumerGroupConf.getConsumerGroup());
        properties2.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
        properties2.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID(this.consumerGroupConf.getConsumerGroup(), this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
        this.broadcastMqConsumer.init(properties2);
        this.broadcastMqConsumer.registerEventListener((cloudEvent2, asyncConsumeContext2) -> {
            Span prepareServerSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(cloudEvent2.getSpecVersion())).toString(), cloudEvent2), "downstream-eventmesh-server-span", false);
            try {
                CloudEvent build = CloudEventBuilder.from(cloudEvent2).withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp()).build();
                String subject = build.getSubject();
                String eventExtension = getEventExtension(build, ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey());
                String eventExtension2 = getEventExtension(build, ProtocolKey.ClientInstanceKey.UNIQUEID.getKey());
                if (this.messageLogger.isDebugEnabled()) {
                    this.messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", subject, build);
                } else {
                    this.messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", new Object[]{subject, eventExtension, eventExtension2});
                }
                ConsumerGroupTopicConf consumerGroupTopicConf = (ConsumerGroupTopicConf) MapUtils.getObject(this.consumerGroupConf.getConsumerGroupTopicConf(), subject, (Object) null);
                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) asyncConsumeContext2;
                if (consumerGroupTopicConf == null) {
                    log.error("no topicConfig found, consumerGroup:{} topic:{}", this.consumerGroupConf.getConsumerGroup(), subject);
                    try {
                        sendMessageBack(build, eventExtension2, eventExtension);
                        eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                        TraceUtils.finishSpan(prepareServerSpan, build);
                        return;
                    } catch (Exception e) {
                    }
                }
                if (this.httpMessageHandler.handle(new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), this.consumerGroupConf.getConsumerGroup(), this, subject, build, this.consumerGroupConf.getConsumerGroupTopicConf().get(subject).getSubscriptionItem(), eventMeshAsyncConsumeContext.getAbstractContext(), this.consumerGroupConf, this.eventMeshHTTPServer, eventExtension, eventExtension2, consumerGroupTopicConf))) {
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                } else {
                    try {
                        sendMessageBack(build, eventExtension2, eventExtension);
                    } catch (Exception e2) {
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                }
                TraceUtils.finishSpan(prepareServerSpan, build);
            } catch (Throwable th) {
                TraceUtils.finishSpan(prepareServerSpan, cloudEvent2);
                throw th;
            }
        });
        this.inited4Persistent.compareAndSet(false, true);
        this.inited4Broadcast.compareAndSet(false, true);
        log.info("EventMeshConsumer [{}] inited.............", this.consumerGroupConf.getConsumerGroup());
    }

    private String getEventExtension(CloudEvent cloudEvent, String str) {
        Object extension = cloudEvent.getExtension(str);
        return Objects.isNull(extension) ? "" : extension.toString();
    }

    public synchronized void start() throws Exception {
        this.persistentMqConsumer.start();
        this.started4Persistent.compareAndSet(false, true);
        this.broadcastMqConsumer.start();
        this.started4Broadcast.compareAndSet(false, true);
    }

    public void subscribe(String str, SubscriptionItem subscriptionItem) throws Exception {
        if (SubscriptionMode.BROADCASTING != subscriptionItem.getMode()) {
            this.persistentMqConsumer.subscribe(str);
        } else {
            this.broadcastMqConsumer.subscribe(str);
        }
    }

    public void unsubscribe(String str, SubscriptionMode subscriptionMode) throws Exception {
        if (SubscriptionMode.BROADCASTING == subscriptionMode) {
            this.broadcastMqConsumer.unsubscribe(str);
        } else {
            this.persistentMqConsumer.unsubscribe(str);
        }
    }

    public synchronized void shutdown() throws Exception {
        this.persistentMqConsumer.shutdown();
        this.started4Persistent.compareAndSet(true, false);
        this.broadcastMqConsumer.shutdown();
        this.started4Broadcast.compareAndSet(true, false);
    }

    public void updateOffset(String str, SubscriptionMode subscriptionMode, List<CloudEvent> list, AbstractContext abstractContext) {
        if (SubscriptionMode.BROADCASTING == subscriptionMode) {
            this.broadcastMqConsumer.updateOffset(list, abstractContext);
        } else {
            this.persistentMqConsumer.updateOffset(list, abstractContext);
        }
    }

    public ConsumerGroupConf getConsumerGroupConf() {
        return this.consumerGroupConf;
    }

    public void setConsumerGroupConf(ConsumerGroupConf consumerGroupConf) {
        this.consumerGroupConf = consumerGroupConf;
    }

    public EventMeshHTTPServer getEventMeshHTTPServer() {
        return this.eventMeshHTTPServer;
    }

    public void sendMessageBack(CloudEvent cloudEvent, final String str, final String str2) throws Exception {
        EventMeshProducer eventMeshProducer = this.eventMeshHTTPServer.getProducerManager().getEventMeshProducer(this.consumerGroupConf.getConsumerGroup());
        if (eventMeshProducer == null) {
            log.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", new Object[]{this.consumerGroupConf.getConsumerGroup(), str2, str});
        } else {
            eventMeshProducer.send(new SendMessageContext(str2, cloudEvent, eventMeshProducer, this.eventMeshHTTPServer), new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.http.consumer.EventMeshConsumer.1
                public void onSuccess(SendResult sendResult) {
                }

                public void onException(OnExceptionContext onExceptionContext) {
                    EventMeshConsumer.log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", new Object[]{EventMeshConsumer.this.consumerGroupConf.getConsumerGroup(), str2, str});
                }
            });
        }
    }
}
