package org.apache.eventmesh.runtime.core.protocol.grpc.consumer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.class */
public class ConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
    private final EventMeshGrpcServer eventMeshGrpcServer;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, List<ConsumerGroupClient>> clientTable = new ConcurrentHashMap();
    private final Map<String, EventMeshConsumer> consumerTable = new ConcurrentHashMap();

    public ConsumerManager(EventMeshGrpcServer eventMeshGrpcServer) {
        this.eventMeshGrpcServer = eventMeshGrpcServer;
    }

    public Map<String, List<ConsumerGroupClient>> getClientTable() {
        return this.clientTable;
    }

    public void init() throws Exception {
        LogUtils.info(log, "Grpc ConsumerManager initialized.");
    }

    public void start() throws Exception {
        startClientCheck();
        LogUtils.info(log, "Grpc ConsumerManager started.");
    }

    public void shutdown() throws Exception {
        Iterator<EventMeshConsumer> it = this.consumerTable.values().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        this.scheduledExecutorService.shutdown();
        LogUtils.info(log, "Grpc ConsumerManager shutdown.");
    }

    public EventMeshConsumer getEventMeshConsumer(String str) {
        return this.consumerTable.computeIfAbsent(str, str2 -> {
            return new EventMeshConsumer(this.eventMeshGrpcServer, str);
        });
    }

    public synchronized void registerClient(ConsumerGroupClient consumerGroupClient) {
        String consumerGroup = consumerGroupClient.getConsumerGroup();
        String topic = consumerGroupClient.getTopic();
        GrpcType grpcType = consumerGroupClient.getGrpcType();
        String url = consumerGroupClient.getUrl();
        String ip = consumerGroupClient.getIp();
        String pid = consumerGroupClient.getPid();
        SubscriptionMode subscriptionMode = consumerGroupClient.getSubscriptionMode();
        List<ConsumerGroupClient> list = this.clientTable.get(consumerGroup);
        if (list == null) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(consumerGroupClient);
            this.clientTable.putIfAbsent(consumerGroup, arrayList);
            return;
        }
        boolean z = false;
        Iterator<ConsumerGroupClient> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ConsumerGroupClient next = it.next();
            if (GrpcType.WEBHOOK != grpcType || !StringUtils.equals(next.getTopic(), topic) || !StringUtils.equals(next.getUrl(), url) || next.getSubscriptionMode() != subscriptionMode) {
                if (GrpcType.STREAM == grpcType && StringUtils.equals(next.getTopic(), topic) && StringUtils.equals(next.getIp(), ip) && StringUtils.equals(next.getPid(), pid) && next.getSubscriptionMode() == subscriptionMode) {
                    z = true;
                    next.setEventEmitter(consumerGroupClient.getEventEmitter());
                    next.setLastUpTime(consumerGroupClient.getLastUpTime());
                    break;
                }
            } else {
                z = true;
                next.setUrl(consumerGroupClient.getUrl());
                next.setLastUpTime(consumerGroupClient.getLastUpTime());
                break;
            }
        }
        if (z) {
            return;
        }
        list.add(consumerGroupClient);
    }

    public boolean updateClientTime(ConsumerGroupClient consumerGroupClient) {
        List<ConsumerGroupClient> list = this.clientTable.get(consumerGroupClient.getConsumerGroup());
        if (CollectionUtils.isEmpty(list)) {
            return false;
        }
        for (ConsumerGroupClient consumerGroupClient2 : list) {
            if (StringUtils.equals(consumerGroupClient2.getIp(), consumerGroupClient.getIp()) && StringUtils.equals(consumerGroupClient2.getPid(), consumerGroupClient.getPid()) && StringUtils.equals(consumerGroupClient2.getSys(), consumerGroupClient.getSys()) && StringUtils.equals(consumerGroupClient2.getTopic(), consumerGroupClient.getTopic())) {
                consumerGroupClient2.setLastUpTime(new Date());
                return true;
            }
        }
        return false;
    }

    public synchronized void deregisterClient(ConsumerGroupClient consumerGroupClient) {
        String consumerGroup = consumerGroupClient.getConsumerGroup();
        List<ConsumerGroupClient> list = this.clientTable.get(consumerGroup);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        Iterator<ConsumerGroupClient> it = list.iterator();
        synchronized (this.clientTable) {
            while (it.hasNext()) {
                ConsumerGroupClient next = it.next();
                if (StringUtils.equals(next.getTopic(), consumerGroupClient.getTopic()) && next.getSubscriptionMode() == consumerGroupClient.getSubscriptionMode()) {
                    closeEventStream(next);
                    it.remove();
                }
            }
        }
        if (CollectionUtils.isEmpty(list)) {
            this.clientTable.remove(consumerGroup);
        }
    }

    private void closeEventStream(ConsumerGroupClient consumerGroupClient) {
        if (consumerGroupClient.getEventEmitter() != null) {
            consumerGroupClient.getEventEmitter().onCompleted();
        }
    }

    public synchronized void restartEventMeshConsumer(String str) throws Exception {
        EventMeshConsumer eventMeshConsumer = this.consumerTable.get(str);
        if (eventMeshConsumer == null) {
            return;
        }
        if (ServiceState.RUNNING == eventMeshConsumer.getStatus()) {
            eventMeshConsumer.shutdown();
        }
        eventMeshConsumer.init();
        eventMeshConsumer.start();
        if (ServiceState.RUNNING != eventMeshConsumer.getStatus()) {
            this.consumerTable.remove(str);
        }
    }

    private void startClientCheck() {
        int eventMeshSessionExpiredInMills = this.eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshSessionExpiredInMills();
        if (eventMeshSessionExpiredInMills > 0) {
            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                LogUtils.debug(log, "grpc client info check");
                ArrayList arrayList = new ArrayList();
                Collection<List<ConsumerGroupClient>> values = this.clientTable.values();
                arrayList.getClass();
                values.forEach((v1) -> {
                    r1.addAll(v1);
                });
                LogUtils.debug(log, "total number of ConsumerGroupClients: {}", Integer.valueOf(arrayList.size()));
                if (CollectionUtils.isEmpty(arrayList)) {
                    return;
                }
                HashSet hashSet = new HashSet();
                arrayList.forEach(consumerGroupClient -> {
                    if (System.currentTimeMillis() - consumerGroupClient.getLastUpTime().getTime() > eventMeshSessionExpiredInMills) {
                        LogUtils.warn(log, "client {} lastUpdate time {} over three heartbeat cycles. Removing it", JsonUtils.toJSONString(consumerGroupClient), consumerGroupClient.getLastUpTime());
                        deregisterClient(consumerGroupClient);
                        if (getEventMeshConsumer(consumerGroupClient.getConsumerGroup()).deregisterClient(consumerGroupClient)) {
                            hashSet.add(consumerGroupClient.getConsumerGroup());
                        }
                    }
                });
                hashSet.forEach(str -> {
                    try {
                        restartEventMeshConsumer(str);
                    } catch (Exception e) {
                        LogUtils.error(log, "Error in restarting EventMeshConsumer [{}]", str, e);
                    }
                });
            }, 10000L, 10000L, TimeUnit.MILLISECONDS);
        }
    }

    public List<String> getAllConsumerTopic() {
        return (List) this.clientTable.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getTopic();
        }).distinct().collect(Collectors.toList());
    }
}
