package org.apache.eventmesh.runtime.core.protocol.grpc.consumer;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.class */
public class EventMeshConsumer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshConsumer.class);
    private final transient String consumerGroup;
    private final transient EventMeshGrpcServer eventMeshGrpcServer;
    private final transient EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
    private final transient MQConsumerWrapper persistentMqConsumer;
    private final transient MQConsumerWrapper broadcastMqConsumer;
    private final transient MessageHandler messageHandler;
    private transient ServiceState serviceState;
    private final transient Map<String, ConsumerGroupTopicConfig> consumerGroupTopicConfig = new ConcurrentHashMap();

    public EventMeshConsumer(EventMeshGrpcServer eventMeshGrpcServer, String str) {
        this.eventMeshGrpcServer = eventMeshGrpcServer;
        this.eventMeshGrpcConfiguration = eventMeshGrpcServer.getEventMeshGrpcConfiguration();
        this.consumerGroup = str;
        this.messageHandler = new MessageHandler(str, eventMeshGrpcServer.getPushMsgExecutor());
        this.persistentMqConsumer = new MQConsumerWrapper(this.eventMeshGrpcConfiguration.getEventMeshStoragePluginType());
        this.broadcastMqConsumer = new MQConsumerWrapper(this.eventMeshGrpcConfiguration.getEventMeshStoragePluginType());
    }

    public synchronized boolean registerClient(ConsumerGroupClient consumerGroupClient) {
        boolean z = false;
        ConsumerGroupTopicConfig consumerGroupTopicConfig = this.consumerGroupTopicConfig.get(consumerGroupClient.getTopic());
        if (consumerGroupTopicConfig == null) {
            consumerGroupTopicConfig = ConsumerGroupTopicConfig.buildTopicConfig(this.consumerGroup, consumerGroupClient.getTopic(), consumerGroupClient.getSubscriptionMode(), consumerGroupClient.getGrpcType());
            this.consumerGroupTopicConfig.put(consumerGroupClient.getTopic(), consumerGroupTopicConfig);
            z = true;
        }
        consumerGroupTopicConfig.registerClient(consumerGroupClient);
        return z;
    }

    public synchronized boolean deregisterClient(ConsumerGroupClient consumerGroupClient) {
        boolean z = false;
        ConsumerGroupTopicConfig consumerGroupTopicConfig = this.consumerGroupTopicConfig.get(consumerGroupClient.getTopic());
        if (consumerGroupTopicConfig != null) {
            consumerGroupTopicConfig.deregisterClient(consumerGroupClient);
            if (consumerGroupTopicConfig.getSize() == 0) {
                this.consumerGroupTopicConfig.remove(consumerGroupClient.getTopic());
                z = true;
            }
        }
        return z;
    }

    public synchronized void init() throws Exception {
        if (MapUtils.isEmpty(this.consumerGroupTopicConfig)) {
            return;
        }
        Properties properties = new Properties();
        properties.put(EventMeshConstants.IS_BROADCAST, "false");
        properties.put(EventMeshConstants.CONSUMER_GROUP, this.consumerGroup);
        properties.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshGrpcConfiguration.getEventMeshIDC());
        properties.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID(this.consumerGroup, this.eventMeshGrpcConfiguration.getEventMeshCluster()));
        this.persistentMqConsumer.init(properties);
        this.persistentMqConsumer.registerEventListener(createEventListener(SubscriptionMode.CLUSTERING));
        Properties properties2 = new Properties();
        properties2.put(EventMeshConstants.IS_BROADCAST, "true");
        properties2.put(EventMeshConstants.CONSUMER_GROUP, this.consumerGroup);
        properties2.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshGrpcConfiguration.getEventMeshIDC());
        properties2.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID(this.consumerGroup, this.eventMeshGrpcConfiguration.getEventMeshCluster()));
        this.broadcastMqConsumer.init(properties2);
        this.broadcastMqConsumer.registerEventListener(createEventListener(SubscriptionMode.BROADCASTING));
        this.serviceState = ServiceState.INITED;
        if (log.isInfoEnabled()) {
            log.info("EventMeshConsumer [{}] initialized.............", this.consumerGroup);
        }
    }

    public synchronized void start() throws Exception {
        if (MapUtils.isEmpty(this.consumerGroupTopicConfig)) {
            return;
        }
        this.consumerGroupTopicConfig.forEach((str, consumerGroupTopicConfig) -> {
            try {
                subscribe(str, consumerGroupTopicConfig.getSubscriptionMode());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.persistentMqConsumer.start();
        this.broadcastMqConsumer.start();
        this.serviceState = ServiceState.RUNNING;
        if (log.isInfoEnabled()) {
            log.info("EventMeshConsumer [{}] started..........", this.consumerGroup);
        }
    }

    public synchronized void shutdown() throws Exception {
        this.persistentMqConsumer.shutdown();
        this.broadcastMqConsumer.shutdown();
        this.serviceState = ServiceState.STOPPED;
        if (log.isInfoEnabled()) {
            log.info("EventMeshConsumer [{}] shutdown.........", this.consumerGroup);
        }
    }

    public ServiceState getStatus() {
        return this.serviceState;
    }

    public void subscribe(String str, SubscriptionMode subscriptionMode) throws Exception {
        if (SubscriptionMode.CLUSTERING == subscriptionMode) {
            this.persistentMqConsumer.subscribe(str);
        } else {
            if (SubscriptionMode.BROADCASTING != subscriptionMode) {
                throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
            }
            this.broadcastMqConsumer.subscribe(str);
        }
    }

    public void unsubscribe(SubscriptionItem subscriptionItem) throws Exception {
        SubscriptionMode mode = subscriptionItem.getMode();
        String topic = subscriptionItem.getTopic();
        if (SubscriptionMode.CLUSTERING == mode) {
            this.persistentMqConsumer.unsubscribe(topic);
        } else {
            if (SubscriptionMode.BROADCASTING != mode) {
                throw new Exception("Unsubscribe Failed. Incorrect Subscription Mode");
            }
            this.broadcastMqConsumer.unsubscribe(topic);
        }
    }

    public void updateOffset(SubscriptionMode subscriptionMode, List<CloudEvent> list, AbstractContext abstractContext) throws Exception {
        if (SubscriptionMode.CLUSTERING == subscriptionMode) {
            this.persistentMqConsumer.updateOffset(list, abstractContext);
        } else {
            if (SubscriptionMode.BROADCASTING != subscriptionMode) {
                throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
            }
            this.broadcastMqConsumer.updateOffset(list, abstractContext);
        }
    }

    private EventListener createEventListener(SubscriptionMode subscriptionMode) {
        return (cloudEvent, asyncConsumeContext) -> {
            CloudEvent build = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).build();
            String subject = build.getSubject();
            String str = (String) Optional.ofNullable((String) build.getExtension("searchkeys")).orElseGet(() -> {
                return "";
            });
            String str2 = (String) Optional.ofNullable((String) build.getExtension("rmbuniqid")).orElseGet(() -> {
                return "";
            });
            if (log.isDebugEnabled()) {
                log.debug("message|mq2eventMesh|topic={}|msg={}", subject, build);
            } else {
                if (log.isInfoEnabled()) {
                    log.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", new Object[]{subject, str, str2});
                }
                this.eventMeshGrpcServer.getMetricsMonitor().recordReceiveMsgFromQueue();
            }
            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) asyncConsumeContext;
            ConsumerGroupTopicConfig consumerGroupTopicConfig = this.consumerGroupTopicConfig.get(subject);
            if (consumerGroupTopicConfig != null) {
                if (this.messageHandler.handle(new HandleMsgContext(this.consumerGroup, build, subscriptionMode, consumerGroupTopicConfig.getGrpcType(), eventMeshAsyncConsumeContext.getAbstractContext(), this.eventMeshGrpcServer, this, consumerGroupTopicConfig))) {
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                    return;
                } else {
                    try {
                        ThreadUtils.sleep(5L, TimeUnit.SECONDS);
                        sendMessageBack(this.consumerGroup, build, str2, str);
                    } catch (Exception e) {
                    }
                }
            } else if (log.isDebugEnabled()) {
                log.debug("no active consumer for topic={}|msg={}", subject, build);
            }
            eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
        };
    }

    public void sendMessageBack(final String str, CloudEvent cloudEvent, final String str2, final String str3) throws Exception {
        EventMeshProducer eventMeshProducer = this.eventMeshGrpcServer.getProducerManager().getEventMeshProducer(str);
        if (eventMeshProducer != null) {
            eventMeshProducer.send(new SendMessageContext(str3, cloudEvent, eventMeshProducer, this.eventMeshGrpcServer), new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer.1
                public void onSuccess(SendResult sendResult) {
                }

                public void onException(OnExceptionContext onExceptionContext) {
                    if (EventMeshConsumer.log.isWarnEnabled()) {
                        EventMeshConsumer.log.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", new Object[]{str, str3, str2});
                    }
                }
            });
        } else if (log.isWarnEnabled()) {
            log.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", new Object[]{str, str3, str2});
        }
    }
}
