package org.apache.eventmesh.runtime.boot;

import com.google.common.util.concurrent.RateLimiter;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ConsumerService;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.HeartbeatService;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ProducerService;
import org.apache.eventmesh.runtime.registry.Registry;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.class */
public class EventMeshGrpcServer {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
    private Server server;
    private ProducerManager producerManager;
    private ConsumerManager consumerManager;
    private GrpcRetryer grpcRetryer;
    private ThreadPoolExecutor sendMsgExecutor;
    private ThreadPoolExecutor replyMsgExecutor;
    private ThreadPoolExecutor clientMgmtExecutor;
    private ThreadPoolExecutor pushMsgExecutor;
    private List<CloseableHttpClient> httpClientPool;
    private RateLimiter msgRateLimiter;
    private Registry registry;

    public EventMeshGrpcServer(EventMeshGrpcConfiguration eventMeshGrpcConfiguration, Registry registry) {
        this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration;
        this.registry = registry;
    }

    public void init() throws Exception {
        this.logger.info("==================EventMeshGRPCServer Initializing==================");
        initThreadPool();
        initHttpClientPool();
        this.msgRateLimiter = RateLimiter.create(this.eventMeshGrpcConfiguration.eventMeshMsgReqNumPerSecond);
        this.producerManager = new ProducerManager(this);
        this.producerManager.init();
        this.consumerManager = new ConsumerManager(this);
        this.consumerManager.init();
        this.grpcRetryer = new GrpcRetryer(this);
        this.grpcRetryer.init();
        int i = this.eventMeshGrpcConfiguration.grpcServerPort;
        this.server = ServerBuilder.forPort(i).addService(new ProducerService(this, this.sendMsgExecutor)).addService(new ConsumerService(this, this.clientMgmtExecutor, this.replyMsgExecutor)).addService(new HeartbeatService(this, this.clientMgmtExecutor)).build();
        this.logger.info("GRPCServer[port={}] started", Integer.valueOf(i));
        this.logger.info("-----------------EventMeshGRPCServer initialized");
    }

    public void start() throws Exception {
        this.logger.info("---------------EventMeshGRPCServer starting-------------------");
        this.producerManager.start();
        this.consumerManager.start();
        this.grpcRetryer.start();
        this.server.start();
        if (this.eventMeshGrpcConfiguration.eventMeshServerRegistryEnable) {
            register();
        }
        this.logger.info("---------------EventMeshGRPCServer running-------------------");
    }

    public void shutdown() throws Exception {
        this.logger.info("---------------EventMeshGRPCServer stopping-------------------");
        this.producerManager.shutdown();
        this.consumerManager.shutdown();
        this.grpcRetryer.shutdown();
        shutdownThreadPools();
        shutdownHttpClientPool();
        this.server.shutdown();
        if (this.eventMeshGrpcConfiguration.eventMeshServerRegistryEnable) {
            unRegister();
        }
        this.logger.info("---------------EventMeshGRPCServer stopped-------------------");
    }

    public boolean register() {
        boolean z = false;
        try {
            String str = IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR + this.eventMeshGrpcConfiguration.grpcServerPort;
            EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
            eventMeshRegisterInfo.setEventMeshClusterName(this.eventMeshGrpcConfiguration.eventMeshCluster);
            eventMeshRegisterInfo.setEventMeshName(this.eventMeshGrpcConfiguration.eventMeshName + "-GRPC");
            eventMeshRegisterInfo.setEndPoint(str);
            eventMeshRegisterInfo.setProtocolType("GRPC");
            z = this.registry.register(eventMeshRegisterInfo);
        } catch (Exception e) {
            this.logger.warn("eventMesh register to registry failed", e);
        }
        return z;
    }

    private void unRegister() throws Exception {
        String str = IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR + this.eventMeshGrpcConfiguration.grpcServerPort;
        EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
        eventMeshUnRegisterInfo.setEventMeshClusterName(this.eventMeshGrpcConfiguration.eventMeshCluster);
        eventMeshUnRegisterInfo.setEventMeshName(this.eventMeshGrpcConfiguration.eventMeshName);
        eventMeshUnRegisterInfo.setEndPoint(str);
        eventMeshUnRegisterInfo.setProtocolType("GRPC");
        if (!this.registry.unRegister(eventMeshUnRegisterInfo)) {
            throw new EventMeshException("eventMesh fail to unRegister");
        }
    }

    public EventMeshGrpcConfiguration getEventMeshGrpcConfiguration() {
        return this.eventMeshGrpcConfiguration;
    }

    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    public GrpcRetryer getGrpcRetryer() {
        return this.grpcRetryer;
    }

    public ThreadPoolExecutor getSendMsgExecutor() {
        return this.sendMsgExecutor;
    }

    public ThreadPoolExecutor getClientMgmtExecutor() {
        return this.clientMgmtExecutor;
    }

    public ThreadPoolExecutor getPushMsgExecutor() {
        return this.pushMsgExecutor;
    }

    public RateLimiter getMsgRateLimiter() {
        return this.msgRateLimiter;
    }

    public CloseableHttpClient getHttpClient() {
        int size = this.httpClientPool.size();
        return this.httpClientPool.get(RandomUtils.nextInt(size, 2 * size) % size);
    }

    private void initThreadPool() {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.eventMeshGrpcConfiguration.eventMeshServerSendMsgBlockQueueSize);
        this.sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshGrpcConfiguration.eventMeshServerSendMsgThreadNum, this.eventMeshGrpcConfiguration.eventMeshServerSendMsgThreadNum, linkedBlockingQueue, "eventMesh-grpc-sendMsg-%d", true);
        this.clientMgmtExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshGrpcConfiguration.eventMeshServerSubscribeMsgThreadNum, this.eventMeshGrpcConfiguration.eventMeshServerSubscribeMsgThreadNum, new LinkedBlockingQueue(this.eventMeshGrpcConfiguration.eventMeshServerSubscribeMsgBlockQueueSize), "eventMesh-grpc-clientMgmt-%d", true);
        this.pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshGrpcConfiguration.eventMeshServerPushMsgThreadNum, this.eventMeshGrpcConfiguration.eventMeshServerPushMsgThreadNum, new LinkedBlockingQueue(this.eventMeshGrpcConfiguration.eventMeshServerPushMsgBlockQueueSize), "eventMesh-grpc-pushMsg-%d", true);
        new LinkedBlockingQueue(this.eventMeshGrpcConfiguration.eventMeshServerSendMsgBlockQueueSize);
        this.replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshGrpcConfiguration.eventMeshServerReplyMsgThreadNum, this.eventMeshGrpcConfiguration.eventMeshServerReplyMsgThreadNum, linkedBlockingQueue, "eventMesh-grpc-replyMsg-%d", true);
    }

    private void initHttpClientPool() {
        this.httpClientPool = new LinkedList();
        for (int i = 0; i < 8; i++) {
            this.httpClientPool.add(HttpClients.createDefault());
        }
    }

    private void shutdownThreadPools() {
        this.sendMsgExecutor.shutdown();
        this.clientMgmtExecutor.shutdown();
        this.pushMsgExecutor.shutdown();
        this.replyMsgExecutor.shutdown();
    }

    private void shutdownHttpClientPool() {
        Iterator<CloseableHttpClient> it = this.httpClientPool.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
            }
            it.remove();
        }
    }
}
