package org.apache.eventmesh.runtime.core.protocol.grpc.consumer;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.class */
public class EventMeshConsumer {
    private final String consumerGroup;
    private final EventMeshGrpcServer eventMeshGrpcServer;
    private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
    private final MQConsumerWrapper persistentMqConsumer;
    private final MQConsumerWrapper broadcastMqConsumer;
    private final MessageHandler messageHandler;
    private ServiceState serviceState;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Map<String, ConsumerGroupTopicConfig> consumerGroupTopicConfig = new ConcurrentHashMap();

    public EventMeshConsumer(EventMeshGrpcServer eventMeshGrpcServer, String str) {
        this.eventMeshGrpcServer = eventMeshGrpcServer;
        this.eventMeshGrpcConfiguration = eventMeshGrpcServer.getEventMeshGrpcConfiguration();
        this.consumerGroup = str;
        this.messageHandler = new MessageHandler(str, eventMeshGrpcServer.getPushMsgExecutor());
        this.persistentMqConsumer = new MQConsumerWrapper(this.eventMeshGrpcConfiguration.eventMeshConnectorPluginType);
        this.broadcastMqConsumer = new MQConsumerWrapper(this.eventMeshGrpcConfiguration.eventMeshConnectorPluginType);
    }

    public synchronized boolean registerClient(ConsumerGroupClient consumerGroupClient) {
        boolean z = false;
        GrpcType grpcType = consumerGroupClient.getGrpcType();
        String topic = consumerGroupClient.getTopic();
        Subscription.SubscriptionItem.SubscriptionMode subscriptionMode = consumerGroupClient.getSubscriptionMode();
        ConsumerGroupTopicConfig consumerGroupTopicConfig = this.consumerGroupTopicConfig.get(topic);
        if (consumerGroupTopicConfig == null) {
            consumerGroupTopicConfig = ConsumerGroupTopicConfig.buildTopicConfig(this.consumerGroup, topic, subscriptionMode, grpcType);
            this.consumerGroupTopicConfig.put(topic, consumerGroupTopicConfig);
            z = true;
        }
        consumerGroupTopicConfig.registerClient(consumerGroupClient);
        return z;
    }

    public synchronized boolean deregisterClient(ConsumerGroupClient consumerGroupClient) {
        boolean z = false;
        String topic = consumerGroupClient.getTopic();
        ConsumerGroupTopicConfig consumerGroupTopicConfig = this.consumerGroupTopicConfig.get(topic);
        if (consumerGroupTopicConfig != null) {
            consumerGroupTopicConfig.deregisterClient(consumerGroupClient);
            if (consumerGroupTopicConfig.getSize() == 0) {
                this.consumerGroupTopicConfig.remove(topic);
                z = true;
            }
        }
        return z;
    }

    public synchronized void init() throws Exception {
        if (this.consumerGroupTopicConfig.size() == 0) {
            return;
        }
        Properties properties = new Properties();
        properties.put("isBroadcast", "false");
        properties.put("consumerGroup", this.consumerGroup);
        properties.put("eventMeshIDC", this.eventMeshGrpcConfiguration.eventMeshIDC);
        properties.put("instanceName", EventMeshUtil.buildMeshClientID(this.consumerGroup, this.eventMeshGrpcConfiguration.eventMeshCluster));
        this.persistentMqConsumer.init(properties);
        this.persistentMqConsumer.registerEventListener(createEventListener(Subscription.SubscriptionItem.SubscriptionMode.CLUSTERING));
        Properties properties2 = new Properties();
        properties2.put("isBroadcast", "true");
        properties2.put("consumerGroup", this.consumerGroup);
        properties2.put("eventMeshIDC", this.eventMeshGrpcConfiguration.eventMeshIDC);
        properties2.put("instanceName", EventMeshUtil.buildMeshClientID(this.consumerGroup, this.eventMeshGrpcConfiguration.eventMeshCluster));
        this.broadcastMqConsumer.init(properties2);
        this.broadcastMqConsumer.registerEventListener(createEventListener(Subscription.SubscriptionItem.SubscriptionMode.BROADCASTING));
        this.serviceState = ServiceState.INITED;
        this.logger.info("EventMeshConsumer [{}] initialized.............", this.consumerGroup);
    }

    public synchronized void start() throws Exception {
        if (this.consumerGroupTopicConfig.size() == 0) {
            return;
        }
        for (Map.Entry<String, ConsumerGroupTopicConfig> entry : this.consumerGroupTopicConfig.entrySet()) {
            subscribe(entry.getKey(), entry.getValue().getSubscriptionMode());
        }
        this.persistentMqConsumer.start();
        this.broadcastMqConsumer.start();
        this.serviceState = ServiceState.RUNNING;
        this.logger.info("EventMeshConsumer [{}] started..........", this.consumerGroup);
    }

    public synchronized void shutdown() throws Exception {
        this.persistentMqConsumer.shutdown();
        this.broadcastMqConsumer.shutdown();
        this.serviceState = ServiceState.STOPED;
        this.logger.info("EventMeshConsumer [{}] shutdown.........", this.consumerGroup);
    }

    public ServiceState getStatus() {
        return this.serviceState;
    }

    public void subscribe(String str, Subscription.SubscriptionItem.SubscriptionMode subscriptionMode) throws Exception {
        if (Subscription.SubscriptionItem.SubscriptionMode.CLUSTERING.equals(subscriptionMode)) {
            this.persistentMqConsumer.subscribe(str);
        } else if (Subscription.SubscriptionItem.SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
            this.broadcastMqConsumer.subscribe(str);
        } else {
            this.logger.error("Subscribe Failed. Incorrect Subscription Mode");
            throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
        }
    }

    public void unsubscribe(Subscription.SubscriptionItem subscriptionItem) throws Exception {
        Subscription.SubscriptionItem.SubscriptionMode mode = subscriptionItem.getMode();
        String topic = subscriptionItem.getTopic();
        if (Subscription.SubscriptionItem.SubscriptionMode.CLUSTERING.equals(mode)) {
            this.persistentMqConsumer.unsubscribe(topic);
        } else if (Subscription.SubscriptionItem.SubscriptionMode.BROADCASTING.equals(mode)) {
            this.broadcastMqConsumer.unsubscribe(topic);
        } else {
            this.logger.error("Unsubscribe Failed. Incorrect Subscription Mode");
            throw new Exception("Unsubscribe Failed. Incorrect Subscription Mode");
        }
    }

    public void updateOffset(Subscription.SubscriptionItem.SubscriptionMode subscriptionMode, List<CloudEvent> list, AbstractContext abstractContext) throws Exception {
        if (Subscription.SubscriptionItem.SubscriptionMode.CLUSTERING.equals(subscriptionMode)) {
            this.persistentMqConsumer.updateOffset(list, abstractContext);
        } else if (Subscription.SubscriptionItem.SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
            this.broadcastMqConsumer.updateOffset(list, abstractContext);
        } else {
            this.logger.error("Subscribe Failed. Incorrect Subscription Mode");
            throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
        }
    }

    private EventListener createEventListener(Subscription.SubscriptionItem.SubscriptionMode subscriptionMode) {
        return (cloudEvent, asyncConsumeContext) -> {
            CloudEvent build = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).build();
            String subject = build.getSubject();
            Object extension = build.getExtension("searchkeys");
            String obj = extension == null ? "" : extension.toString();
            Object extension2 = build.getExtension("rmbuniqid");
            String obj2 = extension2 == null ? "" : extension2.toString();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("message|mq2eventMesh|topic={}|msg={}", subject, build);
            } else {
                this.logger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", new Object[]{subject, extension, extension2});
            }
            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) asyncConsumeContext;
            ConsumerGroupTopicConfig consumerGroupTopicConfig = this.consumerGroupTopicConfig.get(subject);
            if (consumerGroupTopicConfig != null) {
                if (this.messageHandler.handle(new HandleMsgContext(this.consumerGroup, build, subscriptionMode, consumerGroupTopicConfig.getGrpcType(), eventMeshAsyncConsumeContext.getAbstractContext(), this.eventMeshGrpcServer, this, consumerGroupTopicConfig))) {
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                    return;
                } else {
                    try {
                        Thread.sleep(5000L);
                        sendMessageBack(this.consumerGroup, build, obj2, obj);
                    } catch (Exception e) {
                    }
                }
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("no active consumer for topic={}|msg={}", subject, build);
            }
            eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
        };
    }

    public void sendMessageBack(final String str, CloudEvent cloudEvent, final String str2, final String str3) throws Exception {
        EventMeshProducer eventMeshProducer = this.eventMeshGrpcServer.getProducerManager().getEventMeshProducer(str);
        if (eventMeshProducer == null) {
            this.logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", new Object[]{str, str3, str2});
        } else {
            eventMeshProducer.send(new SendMessageContext(str3, cloudEvent, eventMeshProducer, this.eventMeshGrpcServer), new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer.1
                public void onSuccess(SendResult sendResult) {
                }

                public void onException(OnExceptionContext onExceptionContext) {
                    EventMeshConsumer.this.logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", new Object[]{str, str3, str2});
                }
            });
        }
    }
}
