package org.apache.eventmesh.runtime.core.protocol.http.consumer;

import com.google.common.eventbus.Subscribe;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupTopicConfChangeEvent;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.class */
public class ConsumerManager {
    private EventMeshHTTPServer eventMeshHTTPServer;
    private static final int DEFAULT_UPDATE_TIME = 90000;
    private ConcurrentHashMap<String, ConsumerGroupManager> consumerTable = new ConcurrentHashMap<>();
    public Logger logger = LoggerFactory.getLogger(getClass());
    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public ConsumerManager(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    public void init() throws Exception {
        this.eventMeshHTTPServer.getEventBus().register(this);
        this.logger.info("consumerManager inited......");
    }

    public void start() throws Exception {
        this.logger.info("consumerManager started......");
    }

    public void notifyConsumerManager(String str, ConsumerGroupConf consumerGroupConf) throws Exception {
        ConsumerGroupManager consumer = this.eventMeshHTTPServer.getConsumerManager().getConsumer(str);
        if (consumerGroupConf == null) {
            ConsumerGroupStateEvent consumerGroupStateEvent = new ConsumerGroupStateEvent();
            consumerGroupStateEvent.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
            consumerGroupStateEvent.consumerGroup = str;
            this.eventMeshHTTPServer.getEventBus().post(consumerGroupStateEvent);
            return;
        }
        if (consumer == null) {
            ConsumerGroupStateEvent consumerGroupStateEvent2 = new ConsumerGroupStateEvent();
            consumerGroupStateEvent2.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
            consumerGroupStateEvent2.consumerGroup = str;
            consumerGroupStateEvent2.consumerGroupConfig = (ConsumerGroupConf) EventMeshUtil.cloneObject(consumerGroupConf);
            this.eventMeshHTTPServer.getEventBus().post(consumerGroupStateEvent2);
            return;
        }
        if (consumerGroupConf.equals(consumer.getConsumerGroupConfig())) {
            return;
        }
        ConsumerGroupStateEvent consumerGroupStateEvent3 = new ConsumerGroupStateEvent();
        consumerGroupStateEvent3.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.CHANGE;
        consumerGroupStateEvent3.consumerGroup = str;
        consumerGroupStateEvent3.consumerGroupConfig = (ConsumerGroupConf) EventMeshUtil.cloneObject(consumerGroupConf);
        this.eventMeshHTTPServer.getEventBus().post(consumerGroupStateEvent3);
    }

    public void shutdown() {
        this.eventMeshHTTPServer.getEventBus().unregister(this);
        for (ConsumerGroupManager consumerGroupManager : this.consumerTable.values()) {
            try {
                consumerGroupManager.shutdown();
            } catch (Exception e) {
                this.logger.error("shutdown consumerGroupManager[{}] err", consumerGroupManager, e);
            }
        }
        this.logger.info("consumerManager shutdown......");
    }

    public boolean contains(String str) {
        return this.consumerTable.containsKey(str);
    }

    public synchronized void addConsumer(String str, ConsumerGroupConf consumerGroupConf) throws Exception {
        ConsumerGroupManager consumerGroupManager = new ConsumerGroupManager(this.eventMeshHTTPServer, consumerGroupConf);
        consumerGroupManager.init();
        consumerGroupManager.start();
        this.consumerTable.put(str, consumerGroupManager);
    }

    public synchronized void restartConsumer(String str, ConsumerGroupConf consumerGroupConf) throws Exception {
        if (this.consumerTable.containsKey(str)) {
            this.consumerTable.get(str).refresh(consumerGroupConf);
        }
    }

    public ConsumerGroupManager getConsumer(String str) throws Exception {
        return this.consumerTable.get(str);
    }

    public synchronized void delConsumer(String str) throws Exception {
        this.logger.info("start delConsumer with consumerGroup {}", str);
        if (this.consumerTable.containsKey(str)) {
            ConsumerGroupManager remove = this.consumerTable.remove(str);
            this.logger.info("start unsubscribe topic with consumer group manager {}", JsonUtils.serialize(remove));
            remove.unsubscribe(str);
            remove.shutdown();
        }
        this.logger.info("end delConsumer with consumerGroup {}", str);
    }

    @Subscribe
    public void onChange(ConsumerGroupTopicConfChangeEvent consumerGroupTopicConfChangeEvent) {
        try {
            this.logger.info("onChange event:{}", consumerGroupTopicConfChangeEvent);
            if (consumerGroupTopicConfChangeEvent.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.NEW) {
                ConsumerGroupManager consumer = getConsumer(consumerGroupTopicConfChangeEvent.consumerGroup);
                if (Objects.isNull(consumer)) {
                    return;
                }
                consumer.getConsumerGroupConfig().getConsumerGroupTopicConf().put(consumerGroupTopicConfChangeEvent.topic, consumerGroupTopicConfChangeEvent.newTopicConf);
                return;
            }
            if (consumerGroupTopicConfChangeEvent.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.CHANGE) {
                ConsumerGroupManager consumer2 = getConsumer(consumerGroupTopicConfChangeEvent.consumerGroup);
                if (Objects.isNull(consumer2)) {
                    return;
                }
                consumer2.getConsumerGroupConfig().getConsumerGroupTopicConf().replace(consumerGroupTopicConfChangeEvent.topic, consumerGroupTopicConfChangeEvent.newTopicConf);
                return;
            }
            if (consumerGroupTopicConfChangeEvent.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.DELETE) {
                ConsumerGroupManager consumer3 = getConsumer(consumerGroupTopicConfChangeEvent.consumerGroup);
                if (Objects.isNull(consumer3)) {
                    return;
                }
                consumer3.getConsumerGroupConfig().getConsumerGroupTopicConf().remove(consumerGroupTopicConfChangeEvent.topic);
            }
        } catch (Exception e) {
            this.logger.error("onChange event:{} err", consumerGroupTopicConfChangeEvent, e);
        }
    }

    @Subscribe
    public void onChange(ConsumerGroupStateEvent consumerGroupStateEvent) {
        try {
            this.logger.info("onChange event:{}", consumerGroupStateEvent);
            if (consumerGroupStateEvent.action == ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW) {
                addConsumer(consumerGroupStateEvent.consumerGroup, consumerGroupStateEvent.consumerGroupConfig);
            } else if (consumerGroupStateEvent.action == ConsumerGroupStateEvent.ConsumerGroupStateAction.CHANGE) {
                restartConsumer(consumerGroupStateEvent.consumerGroup, consumerGroupStateEvent.consumerGroupConfig);
            } else if (consumerGroupStateEvent.action == ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE) {
                delConsumer(consumerGroupStateEvent.consumerGroup);
            }
        } catch (Exception e) {
            this.logger.error("onChange event:{} err", consumerGroupStateEvent, e);
        }
    }
}
