package org.apache.eventmesh.runtime.boot;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.metrics.api.MetricsPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.LocalUnSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.RemoteSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.RemoteUnSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncRemoteEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.WebHookProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPClientPool;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.registry.Registry;
import org.apache.eventmesh.webhook.receive.WebHookController;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.class */
public class EventMeshHTTPServer extends AbstractHTTPServer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshHTTPServer.class);
    private final EventMeshServer eventMeshServer;
    private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
    private final Registry registry;
    private final Acl acl;
    public final EventBus eventBus;
    private ConsumerManager consumerManager;
    private SubscriptionManager subscriptionManager;
    private ProducerManager producerManager;
    private HttpRetryer httpRetryer;
    private ThreadPoolExecutor batchMsgExecutor;
    private ThreadPoolExecutor sendMsgExecutor;
    private ThreadPoolExecutor remoteMsgExecutor;
    private ThreadPoolExecutor replyMsgExecutor;
    private ThreadPoolExecutor pushMsgExecutor;
    private ThreadPoolExecutor clientManageExecutor;
    private ThreadPoolExecutor adminExecutor;
    private ThreadPoolExecutor webhookExecutor;
    private transient RateLimiter msgRateLimiter;
    private transient RateLimiter batchRateLimiter;
    private final transient HTTPClientPool httpClientPool;

    public EventMeshHTTPServer(EventMeshServer eventMeshServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) {
        super(eventMeshHTTPConfiguration.getHttpServerPort(), eventMeshHTTPConfiguration.isEventMeshServerUseTls(), eventMeshHTTPConfiguration);
        this.eventBus = new EventBus();
        this.httpClientPool = new HTTPClientPool(10);
        this.eventMeshServer = eventMeshServer;
        this.eventMeshHttpConfiguration = eventMeshHTTPConfiguration;
        this.registry = eventMeshServer.getRegistry();
        this.acl = eventMeshServer.getAcl();
    }

    private void initThreadPool() {
        this.batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(), new LinkedBlockingQueue(this.eventMeshHttpConfiguration.getEventMeshServerBatchBlockQSize()), "eventMesh-batchMsg", true);
        this.sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(), new LinkedBlockingQueue(this.eventMeshHttpConfiguration.getEventMeshServerSendMsgBlockQSize()), "eventMesh-sendMsg", true);
        this.remoteMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(), new LinkedBlockingQueue(this.eventMeshHttpConfiguration.getEventMeshServerRemoteMsgBlockQSize()), "eventMesh-remoteMsg", true);
        this.pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(), new LinkedBlockingQueue(this.eventMeshHttpConfiguration.getEventMeshServerPushMsgBlockQSize()), "eventMesh-pushMsg", true);
        this.clientManageExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(), new LinkedBlockingQueue(this.eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()), "eventMesh-clientManage", true);
        this.adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(), new LinkedBlockingQueue(50), "eventMesh-admin", true);
        this.replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(), new LinkedBlockingQueue(100), "eventMesh-replyMsg", true);
    }

    public void shutdownThreadPool() {
        if (this.batchMsgExecutor != null) {
            this.batchMsgExecutor.shutdown();
        }
        if (this.adminExecutor != null) {
            this.adminExecutor.shutdown();
        }
        if (this.clientManageExecutor != null) {
            this.clientManageExecutor.shutdown();
        }
        if (this.sendMsgExecutor != null) {
            this.sendMsgExecutor.shutdown();
        }
        if (this.remoteMsgExecutor != null) {
            this.remoteMsgExecutor.shutdown();
        }
        if (this.pushMsgExecutor != null) {
            this.pushMsgExecutor.shutdown();
        }
        if (this.replyMsgExecutor != null) {
            this.replyMsgExecutor.shutdown();
        }
    }

    private void init() throws Exception {
        if (log.isInfoEnabled()) {
            log.info("==================EventMeshHTTPServer Initialing==================");
        }
        super.init("eventMesh-http");
        initThreadPool();
        this.msgRateLimiter = RateLimiter.create(this.eventMeshHttpConfiguration.getEventMeshHttpMsgReqNumPerSecond());
        this.batchRateLimiter = RateLimiter.create(this.eventMeshHttpConfiguration.getEventMeshBatchMsgRequestNumPerSecond());
        ArrayList newArrayList = Lists.newArrayList();
        Optional.ofNullable(this.eventMeshHttpConfiguration.getEventMeshMetricsPluginType()).ifPresent(list -> {
            list.forEach(str -> {
                newArrayList.add(MetricsPluginFactory.getMetricsRegistry(str));
            });
        });
        this.httpRetryer = new HttpRetryer(this);
        this.httpRetryer.init();
        setMetrics(new HTTPMetricsServer(this, newArrayList));
        this.subscriptionManager = new SubscriptionManager();
        this.consumerManager = new ConsumerManager(this);
        this.consumerManager.init();
        this.producerManager = new ProducerManager(this);
        this.producerManager.init();
        setHandlerService(new HandlerService());
        getHandlerService().setMetrics(getMetrics());
        if (StringUtils.isNotEmpty(this.eventMeshHttpConfiguration.getEventMeshTracePluginType()) && this.eventMeshHttpConfiguration.isEventMeshServerTraceEnable()) {
            setUseTrace(Boolean.valueOf(this.eventMeshHttpConfiguration.isEventMeshServerTraceEnable()));
        }
        getHandlerService().setHttpTrace(new HTTPTrace(this.eventMeshHttpConfiguration.isEventMeshServerTraceEnable()));
        registerHTTPRequestProcessor();
        initWebhook();
        if (log.isInfoEnabled()) {
            log.info("==================EventMeshHTTPServer initialized==================");
        }
    }

    @Override // org.apache.eventmesh.runtime.boot.AbstractHTTPServer, org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void start() throws Exception {
        init();
        super.start();
        getMetrics().start();
        this.consumerManager.start();
        this.producerManager.start();
        this.httpRetryer.start();
        if (this.eventMeshHttpConfiguration.isEventMeshServerRegistryEnable()) {
            register();
        }
        if (log.isInfoEnabled()) {
            log.info("==================EventMeshHTTPServer started==================");
        }
    }

    @Override // org.apache.eventmesh.runtime.boot.AbstractHTTPServer, org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void shutdown() throws Exception {
        super.shutdown();
        getMetrics().shutdown();
        this.consumerManager.shutdown();
        shutdownThreadPool();
        this.httpClientPool.shutdown();
        this.producerManager.shutdown();
        this.httpRetryer.shutdown();
        if (this.eventMeshHttpConfiguration.isEventMeshServerRegistryEnable()) {
            unRegister();
        }
        if (log.isInfoEnabled()) {
            log.info("==================EventMeshHTTPServer shutdown==================");
        }
    }

    public boolean register() {
        boolean z = false;
        try {
            String str = IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR + this.eventMeshHttpConfiguration.getHttpServerPort();
            EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
            eventMeshRegisterInfo.setEventMeshClusterName(this.eventMeshHttpConfiguration.getEventMeshCluster());
            eventMeshRegisterInfo.setEventMeshName(this.eventMeshHttpConfiguration.getEventMeshName() + "-HTTP");
            eventMeshRegisterInfo.setEndPoint(str);
            eventMeshRegisterInfo.setProtocolType("HTTP");
            z = this.registry.register(eventMeshRegisterInfo);
        } catch (Exception e) {
            log.error("eventMesh register to registry failed", e);
        }
        return z;
    }

    private void unRegister() throws Exception {
        String str = IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR + this.eventMeshHttpConfiguration.getHttpServerPort();
        EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
        eventMeshUnRegisterInfo.setEventMeshClusterName(this.eventMeshHttpConfiguration.getEventMeshCluster());
        eventMeshUnRegisterInfo.setEventMeshName(this.eventMeshHttpConfiguration.getEventMeshName());
        eventMeshUnRegisterInfo.setEndPoint(str);
        eventMeshUnRegisterInfo.setProtocolType("HTTP");
        if (!this.registry.unRegister(eventMeshUnRegisterInfo)) {
            throw new EventMeshException("eventMesh fail to unRegister");
        }
    }

    public void registerHTTPRequestProcessor() {
        registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), new BatchSendMessageProcessor(this), this.batchMsgExecutor);
        registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), new BatchSendMessageV2Processor(this), this.batchMsgExecutor);
        registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), new SendSyncMessageProcessor(this), this.sendMsgExecutor);
        registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), new SendAsyncMessageProcessor(this), this.sendMsgExecutor);
        getHandlerService().register(new SendAsyncEventProcessor(this), this.sendMsgExecutor);
        getHandlerService().register(new SendAsyncRemoteEventProcessor(this), this.remoteMsgExecutor);
        registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), new AdminMetricsProcessor(this), this.adminExecutor);
        registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), new HeartBeatProcessor(this), this.clientManageExecutor);
        registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), new SubscribeProcessor(this), this.clientManageExecutor);
        getHandlerService().register(new LocalSubscribeEventProcessor(this), this.clientManageExecutor);
        getHandlerService().register(new RemoteSubscribeEventProcessor(this), this.clientManageExecutor);
        registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), new UnSubscribeProcessor(this), this.clientManageExecutor);
        getHandlerService().register(new LocalUnSubscribeEventProcessor(this), this.clientManageExecutor);
        getHandlerService().register(new RemoteUnSubscribeEventProcessor(this), this.clientManageExecutor);
        registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), new ReplyMessageProcessor(this), this.replyMsgExecutor);
    }

    private void initWebhook() throws Exception {
        this.webhookExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(), this.eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(), new LinkedBlockingQueue(100), "eventMesh-webhook", true);
        WebHookProcessor webHookProcessor = new WebHookProcessor();
        WebHookController webHookController = new WebHookController();
        webHookController.init();
        webHookProcessor.setWebHookController(webHookController);
        getHandlerService().register(webHookProcessor, this.webhookExecutor);
    }

    public SubscriptionManager getSubscriptionManager() {
        return this.subscriptionManager;
    }

    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    public EventMeshHTTPConfiguration getEventMeshHttpConfiguration() {
        return this.eventMeshHttpConfiguration;
    }

    public EventBus getEventBus() {
        return this.eventBus;
    }

    public HttpRetryer getHttpRetryer() {
        return this.httpRetryer;
    }

    public Acl getAcl() {
        return this.acl;
    }

    public EventMeshServer getEventMeshServer() {
        return this.eventMeshServer;
    }

    public ThreadPoolExecutor getBatchMsgExecutor() {
        return this.batchMsgExecutor;
    }

    public ThreadPoolExecutor getSendMsgExecutor() {
        return this.sendMsgExecutor;
    }

    public ThreadPoolExecutor getReplyMsgExecutor() {
        return this.replyMsgExecutor;
    }

    public ThreadPoolExecutor getPushMsgExecutor() {
        return this.pushMsgExecutor;
    }

    public ThreadPoolExecutor getClientManageExecutor() {
        return this.clientManageExecutor;
    }

    public ThreadPoolExecutor getAdminExecutor() {
        return this.adminExecutor;
    }

    public RateLimiter getMsgRateLimiter() {
        return this.msgRateLimiter;
    }

    public RateLimiter getBatchRateLimiter() {
        return this.batchRateLimiter;
    }

    public Registry getRegistry() {
        return this.registry;
    }

    public HTTPClientPool getHttpClientPool() {
        return this.httpClientPool;
    }
}
