package org.apache.eventmesh.runtime.boot;

import com.google.common.util.concurrent.RateLimiter;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.metrics.api.MetricsPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ConsumerService;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.HeartbeatService;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.PublisherService;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.runtime.metrics.grpc.EventMeshGrpcMonitor;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.class */
/**
 * gRPC remoting server of the EventMesh runtime.
 *
 * <p>Holds the gRPC {@link Server}, the thread pools used by the gRPC services, a small pool of
 * HTTP clients, the message rate limiter, the consumer manager, the retryer and the metrics
 * monitor. The expected lifecycle is {@link #init()}, then {@link #start()}, then
 * {@link #shutdown()}.
 */
public class EventMeshGrpcServer extends AbstractRemotingServer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshGrpcServer.class);
    private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
    /** Lower bound (inclusive) for the randomly chosen HTTP client pool size. */
    private static final int MIN_LIMIT = 5;
    /** Upper bound (exclusive) for the randomly chosen HTTP client pool size. */
    private static final int MAX_LIMIT = 10;
    private Server server;
    private ConsumerManager consumerManager;
    private GrpcRetryer grpcRetryer;
    private ThreadPoolExecutor sendMsgExecutor;
    private ThreadPoolExecutor replyMsgExecutor;
    private ThreadPoolExecutor clientMgmtExecutor;
    private ThreadPoolExecutor pushMsgExecutor;
    private List<CloseableHttpClient> httpClientPool;
    private RateLimiter msgRateLimiter;
    private final MetaStorage metaStorage;
    private final Acl acl;
    private final EventMeshServer eventMeshServer;
    private EventMeshGrpcMonitor eventMeshGrpcMonitor;

    /**
     * Creates the server; no resources are allocated until {@link #init()} is called.
     *
     * @param eventMeshServer            owning server, source of the meta storage and ACL instances
     * @param eventMeshGrpcConfiguration gRPC specific configuration
     */
    public EventMeshGrpcServer(EventMeshServer eventMeshServer, EventMeshGrpcConfiguration eventMeshGrpcConfiguration) {
        this.eventMeshServer = eventMeshServer;
        this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration;
        this.metaStorage = eventMeshServer.getMetaStorage();
        this.acl = eventMeshServer.getAcl();
    }

    /**
     * Builds every component of the server: thread pools, HTTP client pool, rate limiter,
     * producer/consumer managers, retryer, the gRPC {@link Server} and the metrics monitor.
     * The gRPC server is only built here; it starts listening in {@link #start()}.
     *
     * @throws Exception if any component fails to initialize
     */
    @Override
    public void init() throws Exception {
        log.info("==================EventMeshGRPCServer Initializing==================");
        initThreadPool();
        initHttpClientPool();
        this.msgRateLimiter = RateLimiter.create(this.eventMeshGrpcConfiguration.getEventMeshMsgReqNumPerSecond());
        initProducerManager();
        this.consumerManager = new ConsumerManager(this);
        this.consumerManager.init();
        this.grpcRetryer = new GrpcRetryer(this);

        int grpcServerPort = this.eventMeshGrpcConfiguration.getGrpcServerPort();
        this.server = ServerBuilder.forPort(grpcServerPort)
            .addService(new ConsumerService(this, this.sendMsgExecutor, this.replyMsgExecutor))
            .addService(new HeartbeatService(this, this.sendMsgExecutor))
            .addService(new PublisherService(this, this.sendMsgExecutor))
            .build();

        initMetricsMonitor();
        log.info("GRPCServer[port={}] started", grpcServerPort);
        log.info("-----------------EventMeshGRPCServer initialized");
    }

    /**
     * @return the gRPC configuration this server was created with
     */
    @Override
    public CommonConfiguration getConfiguration() {
        return this.eventMeshGrpcConfiguration;
    }

    /**
     * Starts the managers, the retryer and the gRPC server, registers this node in the meta
     * storage when that is enabled in the configuration, and finally starts the metrics monitor.
     *
     * @throws Exception if any component fails to start
     */
    @Override
    public void start() throws Exception {
        log.info("---------------EventMeshGRPCServer starting-------------------");
        this.producerManager.start();
        this.consumerManager.start();
        this.grpcRetryer.start();
        this.server.start();
        if (this.eventMeshGrpcConfiguration.isEventMeshServerMetaStorageEnable()) {
            // register() reports failure through its return value and a warning log only,
            // so a failed registration does not abort start-up.
            register();
        }
        this.eventMeshGrpcMonitor.start();
        log.info("---------------EventMeshGRPCServer running-------------------");
    }

    /**
     * Stops the managers and the retryer, releases the thread pools and HTTP clients, shuts the
     * gRPC server down, unregisters from the meta storage when enabled, and stops the monitor.
     *
     * @throws Exception if a component fails to stop or unregistering fails; steps after the
     *                   failing one are not executed
     */
    @Override
    public void shutdown() throws Exception {
        log.info("---------------EventMeshGRPCServer stopping-------------------");
        this.producerManager.shutdown();
        this.consumerManager.shutdown();
        this.grpcRetryer.shutdown();
        shutdownThreadPools();
        shutdownHttpClientPool();
        this.server.shutdown();
        if (this.eventMeshGrpcConfiguration.isEventMeshServerMetaStorageEnable()) {
            unRegister();
        }
        this.eventMeshGrpcMonitor.shutdown();
        log.info("---------------EventMeshGRPCServer stopped-------------------");
    }

    /**
     * Registers this gRPC endpoint ({@code <local address><separator><grpc port>}) in the meta storage.
     *
     * @return the result reported by the meta storage, or {@code false} if registration threw
     */
    public boolean register() {
        boolean registered = false;
        try {
            String endPoint = IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR
                + this.eventMeshGrpcConfiguration.getGrpcServerPort();
            EventMeshRegisterInfo registerInfo = new EventMeshRegisterInfo();
            registerInfo.setEventMeshClusterName(this.eventMeshGrpcConfiguration.getEventMeshCluster());
            registerInfo.setEventMeshName(this.eventMeshGrpcConfiguration.getEventMeshName() + "-GRPC");
            registerInfo.setEndPoint(endPoint);
            registerInfo.setProtocolType("GRPC");
            registered = this.metaStorage.register(registerInfo);
        } catch (Exception e) {
            // Deliberately best-effort: the failure is logged and reported as false.
            log.warn("eventMesh register to registry failed", e);
        }
        return registered;
    }

    /**
     * Removes this gRPC endpoint from the meta storage.
     *
     * @throws Exception if the meta storage call throws, or an {@link EventMeshException} when
     *                   the meta storage reports that unregistering did not succeed
     */
    private void unRegister() throws Exception {
        String endPoint = IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR
            + this.eventMeshGrpcConfiguration.getGrpcServerPort();
        EventMeshUnRegisterInfo unRegisterInfo = new EventMeshUnRegisterInfo();
        unRegisterInfo.setEventMeshClusterName(this.eventMeshGrpcConfiguration.getEventMeshCluster());
        // NOTE(review): register() uses getEventMeshName() + "-GRPC" while this uses the bare
        // name. Whether that mismatch matters depends on the MetaStorage implementation -
        // confirm before changing.
        unRegisterInfo.setEventMeshName(this.eventMeshGrpcConfiguration.getEventMeshName());
        unRegisterInfo.setEndPoint(endPoint);
        unRegisterInfo.setProtocolType("GRPC");
        if (!this.metaStorage.unRegister(unRegisterInfo)) {
            throw new EventMeshException("eventMesh fail to unRegister");
        }
    }

    /** @return the gRPC configuration this server was created with */
    public EventMeshGrpcConfiguration getEventMeshGrpcConfiguration() {
        return this.eventMeshGrpcConfiguration;
    }

    /** @return the consumer manager, or {@code null} before {@link #init()} */
    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    /** @return the retryer, or {@code null} before {@link #init()} */
    public GrpcRetryer getGrpcRetryer() {
        return this.grpcRetryer;
    }

    /** @return the send-message thread pool, or {@code null} before {@link #init()} */
    public ThreadPoolExecutor getSendMsgExecutor() {
        return this.sendMsgExecutor;
    }

    /** @return the client-management thread pool, or {@code null} before {@link #init()} */
    public ThreadPoolExecutor getClientMgmtExecutor() {
        return this.clientMgmtExecutor;
    }

    /** @return the push-message thread pool, or {@code null} before {@link #init()} */
    public ThreadPoolExecutor getPushMsgExecutor() {
        return this.pushMsgExecutor;
    }

    /** @return the message rate limiter, or {@code null} before {@link #init()} */
    public RateLimiter getMsgRateLimiter() {
        return this.msgRateLimiter;
    }

    /**
     * Picks one of the pooled HTTP clients at random.
     *
     * @return a client from the pool
     * @throws IllegalStateException if the pool is empty, e.g. after {@link #shutdown()}
     */
    public CloseableHttpClient getHttpClient() {
        int size = this.httpClientPool.size();
        if (size == 0) {
            throw new IllegalStateException("gRPC http client pool is empty; server not initialized or already shut down");
        }
        // Random index in [0, size).
        return this.httpClientPool.get(RandomUtils.nextInt(0, size));
    }

    /** @return the metrics monitor, or {@code null} before {@link #init()} */
    public EventMeshGrpcMonitor getMetricsMonitor() {
        return this.eventMeshGrpcMonitor;
    }

    /**
     * Creates the four thread pools. Each pool uses the same configured value for its core and
     * maximum size and gets its own bounded work queue.
     */
    private void initThreadPool() {
        int sendMsgThreads = this.eventMeshGrpcConfiguration.getEventMeshServerSendMsgThreadNum();
        this.sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(sendMsgThreads, sendMsgThreads,
            new LinkedBlockingQueue<Runnable>(this.eventMeshGrpcConfiguration.getEventMeshServerSendMsgBlockQueueSize()),
            "eventMesh-grpc-sendMsg", true);

        int clientMgmtThreads = this.eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgThreadNum();
        this.clientMgmtExecutor = ThreadPoolFactory.createThreadPoolExecutor(clientMgmtThreads, clientMgmtThreads,
            new LinkedBlockingQueue<Runnable>(this.eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgBlockQueueSize()),
            "eventMesh-grpc-clientMgmt", true);

        int pushMsgThreads = this.eventMeshGrpcConfiguration.getEventMeshServerPushMsgThreadNum();
        this.pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(pushMsgThreads, pushMsgThreads,
            new LinkedBlockingQueue<Runnable>(this.eventMeshGrpcConfiguration.getEventMeshServerPushMsgBlockQueueSize()),
            "eventMesh-grpc-pushMsg", true);

        // The reply pool must not share the send pool's queue instance: two executors polling
        // one queue would run each other's tasks. It keeps the send-queue capacity setting,
        // which is the bound it was effectively created with before.
        int replyMsgThreads = this.eventMeshGrpcConfiguration.getEventMeshServerReplyMsgThreadNum();
        this.replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(replyMsgThreads, replyMsgThreads,
            new LinkedBlockingQueue<Runnable>(this.eventMeshGrpcConfiguration.getEventMeshServerSendMsgBlockQueueSize()),
            "eventMesh-grpc-replyMsg", true);
    }

    /**
     * Fills the HTTP client pool with a random number of default clients, at least
     * {@link #MIN_LIMIT} and fewer than {@link #MAX_LIMIT}.
     */
    private void initHttpClientPool() {
        this.httpClientPool = new ArrayList<>();
        int poolSize = RandomUtils.nextInt(MIN_LIMIT, MAX_LIMIT);
        for (int i = 0; i < poolSize; i++) {
            this.httpClientPool.add(HttpClients.createDefault());
        }
    }

    /**
     * Resolves one metrics registry per configured plugin type (none when the configured list
     * is {@code null}) and creates and initializes the monitor with them.
     *
     * @throws Exception if the monitor fails to initialize
     */
    // The registry element type is not imported in this file, so the list stays raw here;
    // the suppression is limited to this method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initMetricsMonitor() throws Exception {
        ArrayList metricsRegistries = Lists.newArrayList();
        Optional.ofNullable(this.eventMeshGrpcConfiguration.getEventMeshMetricsPluginType())
            .ifPresent(pluginTypes -> pluginTypes.forEach(
                pluginType -> metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
        this.eventMeshGrpcMonitor = new EventMeshGrpcMonitor(this, metricsRegistries);
        this.eventMeshGrpcMonitor.init();
    }

    /** Initiates shutdown of all four thread pools without waiting for running tasks. */
    private void shutdownThreadPools() {
        this.sendMsgExecutor.shutdown();
        this.clientMgmtExecutor.shutdown();
        this.pushMsgExecutor.shutdown();
        this.replyMsgExecutor.shutdown();
    }

    /**
     * Closes every pooled HTTP client and empties the pool. Closing is best-effort: a failure
     * on one client is logged and does not prevent the remaining clients from being closed.
     */
    private void shutdownHttpClientPool() {
        for (CloseableHttpClient httpClient : this.httpClientPool) {
            try {
                httpClient.close();
            } catch (Exception e) {
                log.warn("failed to close http client during gRPC server shutdown", e);
            }
        }
        this.httpClientPool.clear();
    }

    /** @return the meta storage obtained from the owning server */
    public MetaStorage getMetaStorage() {
        return this.metaStorage;
    }

    /** @return the ACL instance obtained from the owning server */
    public Acl getAcl() {
        return this.acl;
    }
}
