package org.apache.eventmesh.runtime.boot;

import com.google.common.eventbus.EventBus;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.class */
public class EventMeshHTTPServer extends AbrstractHTTPServer {
    private EventMeshServer eventMeshServer;
    public ServiceState serviceState;
    private EventMeshHTTPConfiguration eventMeshHttpConfiguration;
    public final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping;
    public final ConcurrentHashMap<String, List<Client>> localClientInfoMapping;
    public EventBus eventBus;
    private ConsumerManager consumerManager;
    private ProducerManager producerManager;
    private HttpRetryer httpRetryer;
    public ThreadPoolExecutor batchMsgExecutor;
    public ThreadPoolExecutor sendMsgExecutor;
    public ThreadPoolExecutor replyMsgExecutor;
    public ThreadPoolExecutor pushMsgExecutor;
    public ThreadPoolExecutor clientManageExecutor;
    public ThreadPoolExecutor adminExecutor;

    public EventMeshHTTPServer(EventMeshServer eventMeshServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) {
        super(eventMeshHTTPConfiguration.httpServerPort, eventMeshHTTPConfiguration.eventMeshServerUseTls);
        this.localConsumerGroupMapping = new ConcurrentHashMap<>();
        this.localClientInfoMapping = new ConcurrentHashMap<>();
        this.eventBus = new EventBus();
        this.eventMeshServer = eventMeshServer;
        this.eventMeshHttpConfiguration = eventMeshHTTPConfiguration;
    }

    public EventMeshServer getEventMeshServer() {
        return this.eventMeshServer;
    }

    public void shutdownThreadPool() throws Exception {
        this.batchMsgExecutor.shutdown();
        this.adminExecutor.shutdown();
        this.clientManageExecutor.shutdown();
        this.sendMsgExecutor.shutdown();
        this.pushMsgExecutor.shutdown();
        this.replyMsgExecutor.shutdown();
    }

    public void initThreadPool() throws Exception {
        this.batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, this.eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, new LinkedBlockingQueue(this.eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize), "eventMesh-batchmsg-", true);
        this.sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.eventMeshServerSendMsgThreadNum, this.eventMeshHttpConfiguration.eventMeshServerSendMsgThreadNum, new LinkedBlockingQueue(this.eventMeshHttpConfiguration.eventMeshServerSendMsgBlockQSize), "eventMesh-sendmsg-", true);
        this.pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.eventMeshServerPushMsgThreadNum, this.eventMeshHttpConfiguration.eventMeshServerPushMsgThreadNum, new LinkedBlockingQueue(this.eventMeshHttpConfiguration.eventMeshServerPushMsgBlockQSize), "eventMesh-pushmsg-", true);
        this.clientManageExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.eventMeshServerClientManageThreadNum, this.eventMeshHttpConfiguration.eventMeshServerClientManageThreadNum, new LinkedBlockingQueue(this.eventMeshHttpConfiguration.eventMeshServerClientManageBlockQSize), "eventMesh-clientmanage-", true);
        this.adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.eventMeshServerAdminThreadNum, this.eventMeshHttpConfiguration.eventMeshServerAdminThreadNum, new LinkedBlockingQueue(50), "eventMesh-admin-", true);
        this.replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(this.eventMeshHttpConfiguration.eventMeshServerReplyMsgThreadNum, this.eventMeshHttpConfiguration.eventMeshServerReplyMsgThreadNum, new LinkedBlockingQueue(100), "eventMesh-replymsg-", true);
    }

    public ThreadPoolExecutor getBatchMsgExecutor() {
        return this.batchMsgExecutor;
    }

    public ThreadPoolExecutor getSendMsgExecutor() {
        return this.sendMsgExecutor;
    }

    public ThreadPoolExecutor getReplyMsgExecutor() {
        return this.replyMsgExecutor;
    }

    public ThreadPoolExecutor getPushMsgExecutor() {
        return this.pushMsgExecutor;
    }

    public ThreadPoolExecutor getClientManageExecutor() {
        return this.clientManageExecutor;
    }

    public ThreadPoolExecutor getAdminExecutor() {
        return this.adminExecutor;
    }

    public void init() throws Exception {
        this.logger.info("==================EventMeshHTTPServer Initialing==================");
        super.init("eventMesh-http");
        initThreadPool();
        this.metrics = new HTTPMetricsServer(this);
        this.metrics.init();
        this.consumerManager = new ConsumerManager(this);
        this.consumerManager.init();
        this.producerManager = new ProducerManager(this);
        this.producerManager.init();
        this.httpRetryer = new HttpRetryer(this);
        this.httpRetryer.init();
        registerHTTPRequestProcessor();
        this.logger.info("--------------------------EventMeshHTTPServer inited");
    }

    @Override // org.apache.eventmesh.runtime.boot.AbrstractHTTPServer, org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void start() throws Exception {
        super.start();
        this.metrics.start();
        this.consumerManager.start();
        this.producerManager.start();
        this.httpRetryer.start();
        this.logger.info("--------------------------EventMeshHTTPServer started");
    }

    @Override // org.apache.eventmesh.runtime.boot.AbrstractHTTPServer, org.apache.eventmesh.runtime.boot.AbstractRemotingServer
    public void shutdown() throws Exception {
        super.shutdown();
        this.metrics.shutdown();
        this.consumerManager.shutdown();
        shutdownThreadPool();
        AbstractHTTPPushRequest.httpClientPool.shutdown();
        this.producerManager.shutdown();
        this.httpRetryer.shutdown();
        this.logger.info("--------------------------EventMeshHTTPServer shutdown");
    }

    public void registerHTTPRequestProcessor() {
        registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), new BatchSendMessageProcessor(this), this.batchMsgExecutor);
        registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), new BatchSendMessageV2Processor(this), this.batchMsgExecutor);
        registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), new SendSyncMessageProcessor(this), this.sendMsgExecutor);
        registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), new SendAsyncMessageProcessor(this), this.sendMsgExecutor);
        registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), new AdminMetricsProcessor(this), this.adminExecutor);
        registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), new HeartBeatProcessor(this), this.clientManageExecutor);
        registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), new SubscribeProcessor(this), this.clientManageExecutor);
        registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), new UnSubscribeProcessor(this), this.clientManageExecutor);
        registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), new ReplyMessageProcessor(this), this.replyMsgExecutor);
    }

    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    public ServiceState getServiceState() {
        return this.serviceState;
    }

    public EventMeshHTTPConfiguration getEventMeshHttpConfiguration() {
        return this.eventMeshHttpConfiguration;
    }

    public EventBus getEventBus() {
        return this.eventBus;
    }

    public HttpRetryer getHttpRetryer() {
        return this.httpRetryer;
    }
}
