package org.apache.eventmesh.runtime.boot;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.metrics.api.MetricsPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.CreateTopicProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.DeleteTopicProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.LocalUnSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.QuerySubscriptionProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.RemoteSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.RemoteUnSubscribeEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncRemoteEventProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.WebHookProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPClientPool;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.webhook.receive.WebHookController;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.class */
/**
 * HTTP protocol server of the EventMesh runtime.
 *
 * <p>Extends {@link AbstractHTTPServer} and owns the collaborators created in {@link #init()}:
 * the consumer, producer and subscription managers, the filter and transformer engines, the
 * HTTP retryer and the two rate limiters. The event bus and the HTTP client pool are created
 * in the constructor.
 *
 * <p>Lifecycle: {@link #init()} builds the collaborators and registers the request processors,
 * {@link #start()} starts them and (when meta storage is enabled in the configuration)
 * registers this node, {@link #shutdown()} stops them and unregisters the node.
 */
public class EventMeshHTTPServer extends AbstractHTTPServer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshHTTPServer.class);

    /** Protocol type reported to the meta storage; also used as the suffix of the registered name. */
    private static final String PROTOCOL_TYPE = "HTTP";

    private final EventMeshServer eventMeshServer;
    private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
    private final MetaStorage metaStorage;
    private final Acl acl;
    private final EventBus eventBus;
    private ConsumerManager consumerManager;
    private ProducerManager producerManager;
    private SubscriptionManager subscriptionManager;
    private FilterEngine filterEngine;
    private TransformerEngine transformerEngine;
    private HttpRetryer httpRetryer;
    private transient RateLimiter msgRateLimiter;
    private transient RateLimiter batchRateLimiter;
    private final transient HTTPClientPool httpClientPool;

    /**
     * Creates the server from its owning {@link EventMeshServer} and the HTTP configuration.
     *
     * <p>The port and TLS flag handed to the superclass are read from the configuration; the
     * meta storage and ACL are taken from {@code eventMeshServer}.
     *
     * @param eventMeshServer            owning server, source of the meta storage and ACL
     * @param eventMeshHTTPConfiguration HTTP configuration of this server
     */
    public EventMeshHTTPServer(EventMeshServer eventMeshServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) {
        super(eventMeshHTTPConfiguration.getHttpServerPort(), eventMeshHTTPConfiguration.isEventMeshServerUseTls(), eventMeshHTTPConfiguration);
        this.eventBus = new EventBus();
        this.httpClientPool = new HTTPClientPool(10);
        this.eventMeshServer = eventMeshServer;
        this.eventMeshHttpConfiguration = eventMeshHTTPConfiguration;
        this.metaStorage = eventMeshServer.getMetaStorage();
        this.acl = eventMeshServer.getAcl();
    }

    /**
     * Initializes the superclass, then builds the rate limiters, metrics, managers, engines and
     * handler service, and finally registers all HTTP request processors.
     *
     * @throws Exception if the superclass, a manager or processor registration fails
     */
    @Override
    public void init() throws Exception {
        LogUtils.info(log, "==================EventMeshHTTPServer Initialing==================");
        super.init();

        this.msgRateLimiter = RateLimiter.create(this.eventMeshHttpConfiguration.getEventMeshHttpMsgReqNumPerSecond());
        this.batchRateLimiter = RateLimiter.create(this.eventMeshHttpConfiguration.getEventMeshBatchMsgRequestNumPerSecond());

        // One metrics registry per configured plugin type; the list stays empty when no plugin
        // type is configured. A plain ArrayList is used instead of a helper from a test-assertion
        // library. The raw type is kept because the registry element type is not imported here.
        final ArrayList metricsRegistries = new ArrayList();
        Optional.ofNullable(this.eventMeshHttpConfiguration.getEventMeshMetricsPluginType()).ifPresent(pluginTypes -> {
            pluginTypes.forEach(pluginType -> {
                metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType));
            });
        });

        this.httpRetryer = new HttpRetryer(this);
        super.setMetrics(new HTTPMetricsServer(this, metricsRegistries));
        this.subscriptionManager = new SubscriptionManager(this.eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable(), this.metaStorage);

        this.consumerManager = new ConsumerManager(this);
        this.consumerManager.init();
        this.producerManager = new ProducerManager(this);
        this.producerManager.init();

        this.filterEngine = new FilterEngine(this.metaStorage, this.producerManager, this.consumerManager);
        this.transformerEngine = new TransformerEngine(this.metaStorage, this.producerManager, this.consumerManager);

        super.setHandlerService(new HandlerService());
        super.getHandlerService().setMetrics(getMetrics());

        // Tracing is switched on only when a trace plugin type is configured AND tracing is enabled.
        if (StringUtils.isNotEmpty(this.eventMeshHttpConfiguration.getEventMeshTracePluginType()) && this.eventMeshHttpConfiguration.isEventMeshServerTraceEnable()) {
            super.setUseTrace(Boolean.valueOf(this.eventMeshHttpConfiguration.isEventMeshServerTraceEnable()));
        }
        super.getHandlerService().setHttpTrace(new HTTPTrace(this.eventMeshHttpConfiguration.isEventMeshServerTraceEnable()));

        registerHTTPRequestProcessor();
        LogUtils.info(log, "==================EventMeshHTTPServer initialized==================");
    }

    /**
     * Starts the superclass, metrics, managers and retryer; starts the filter engine when the
     * meta storage reports itself as started; registers this node when meta storage is enabled.
     *
     * <p>A failed registration does not abort startup; it is logged as a warning.
     *
     * @throws Exception if the superclass or one of the collaborators fails to start
     */
    @Override
    public void start() throws Exception {
        super.start();
        getMetrics().start();
        this.consumerManager.start();
        this.producerManager.start();
        this.httpRetryer.start();

        // The filter engine is only started once the meta storage has been started.
        // NOTE(review): transformerEngine is shut down in shutdown() but never started here —
        // confirm whether it needs a matching start() call.
        if (this.metaStorage.getStarted().get()) {
            this.filterEngine.start();
        }

        // register() reports failure through its return value; surface it instead of dropping it.
        if (this.eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable() && !register()) {
            log.warn("eventMesh HTTP server was not registered to the meta storage");
        }
        LogUtils.info(log, "==================EventMeshHTTPServer started==================");
    }

    /**
     * Shuts down the superclass and every collaborator, then unregisters this node when meta
     * storage is enabled.
     *
     * @throws Exception if a collaborator fails to shut down or unregistering fails; later steps
     *                   are skipped in that case
     */
    @Override
    public void shutdown() throws Exception {
        super.shutdown();
        getMetrics().shutdown();
        this.filterEngine.shutdown();
        this.transformerEngine.shutdown();
        this.consumerManager.shutdown();
        this.httpClientPool.shutdown();
        this.producerManager.shutdown();
        this.httpRetryer.shutdown();
        if (this.eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable()) {
            unRegister();
        }
        LogUtils.info(log, "==================EventMeshHTTPServer shutdown==================");
    }

    /**
     * Registers this node in the meta storage under the configured name suffixed with
     * {@code "-HTTP"}.
     *
     * <p>Any exception is logged and reported as a failed registration rather than rethrown.
     *
     * @return the result of {@code MetaStorage.register}, or {@code false} when an exception occurred
     */
    public boolean register() {
        boolean registered = false;
        try {
            EventMeshRegisterInfo registerInfo = new EventMeshRegisterInfo();
            registerInfo.setEventMeshClusterName(this.eventMeshHttpConfiguration.getEventMeshCluster());
            registerInfo.setEventMeshName(this.eventMeshHttpConfiguration.getEventMeshName() + "-" + PROTOCOL_TYPE);
            // Resolved inside the try block so that address lookup failures are handled as well.
            registerInfo.setEndPoint(localEndpoint());
            registerInfo.setProtocolType(PROTOCOL_TYPE);
            registered = this.metaStorage.register(registerInfo);
        } catch (Exception e) {
            log.error("eventMesh register to registry failed", e);
        }
        return registered;
    }

    /**
     * Removes this node from the meta storage.
     *
     * @throws EventMeshException if the meta storage reports that unregistering failed
     */
    private void unRegister() {
        EventMeshUnRegisterInfo unRegisterInfo = new EventMeshUnRegisterInfo();
        unRegisterInfo.setEventMeshClusterName(this.eventMeshHttpConfiguration.getEventMeshCluster());
        // NOTE(review): register() uses the name suffixed with "-HTTP" while the bare name is
        // used here — confirm against the meta storage implementations which one is expected.
        unRegisterInfo.setEventMeshName(this.eventMeshHttpConfiguration.getEventMeshName());
        unRegisterInfo.setEndPoint(localEndpoint());
        unRegisterInfo.setProtocolType(PROTOCOL_TYPE);
        if (!this.metaStorage.unRegister(unRegisterInfo)) {
            throw new EventMeshException("eventMesh fail to unRegister");
        }
    }

    /** Builds the endpoint of this server: local address, separator, configured HTTP port. */
    private String localEndpoint() {
        return IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR + this.eventMeshHttpConfiguration.getHttpServerPort();
    }

    /**
     * Registers every HTTP request processor with the executor that runs it, then the webhook
     * processor. Request-code based processors go through {@code registerProcessor}; the others
     * are registered on the handler service.
     *
     * @throws Exception if the webhook controller fails to initialize
     */
    private void registerHTTPRequestProcessor() throws Exception {
        HTTPThreadPoolGroup httpThreadPoolGroup = super.getHttpThreadPoolGroup();

        // Batch sending.
        ThreadPoolExecutor batchMsgExecutor = httpThreadPoolGroup.getBatchMsgExecutor();
        registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), new BatchSendMessageProcessor(this), batchMsgExecutor);
        registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), new BatchSendMessageV2Processor(this), batchMsgExecutor);

        // Single message sending.
        ThreadPoolExecutor sendMsgExecutor = httpThreadPoolGroup.getSendMsgExecutor();
        registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), new SendSyncMessageProcessor(this), sendMsgExecutor);
        registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), new SendAsyncMessageProcessor(this), sendMsgExecutor);
        getHandlerService().register(new SendAsyncEventProcessor(this), sendMsgExecutor);

        // Remote sending.
        ThreadPoolExecutor remoteMsgExecutor = httpThreadPoolGroup.getRemoteMsgExecutor();
        getHandlerService().register(new SendAsyncRemoteEventProcessor(this), remoteMsgExecutor);

        // Runtime administration.
        ThreadPoolExecutor runtimeAdminExecutor = httpThreadPoolGroup.getRuntimeAdminExecutor();
        registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), new AdminMetricsProcessor(this), runtimeAdminExecutor);

        // Client management: heartbeat, subscribe and unsubscribe.
        ThreadPoolExecutor clientManageExecutor = httpThreadPoolGroup.getClientManageExecutor();
        registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), new HeartBeatProcessor(this), clientManageExecutor);
        registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), new SubscribeProcessor(this), clientManageExecutor);
        getHandlerService().register(new LocalSubscribeEventProcessor(this), clientManageExecutor);
        getHandlerService().register(new RemoteSubscribeEventProcessor(this), clientManageExecutor);
        registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), new UnSubscribeProcessor(this), clientManageExecutor);
        getHandlerService().register(new LocalUnSubscribeEventProcessor(this), clientManageExecutor);
        getHandlerService().register(new RemoteUnSubscribeEventProcessor(this), clientManageExecutor);

        // Reply messages.
        ThreadPoolExecutor replyMsgExecutor = httpThreadPoolGroup.getReplyMsgExecutor();
        registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), new ReplyMessageProcessor(this), replyMsgExecutor);

        // Topic and subscription queries share the client management executor.
        getHandlerService().register(new CreateTopicProcessor(this), clientManageExecutor);
        getHandlerService().register(new DeleteTopicProcessor(this), clientManageExecutor);
        getHandlerService().register(new QuerySubscriptionProcessor(this), clientManageExecutor);

        registerWebhook();
    }

    /**
     * Creates and initializes a webhook controller, attaches it to a new webhook processor and
     * registers that processor on the webhook executor.
     *
     * @throws Exception if the webhook controller fails to initialize
     */
    private void registerWebhook() throws Exception {
        WebHookController webHookController = new WebHookController();
        webHookController.init();
        WebHookProcessor webHookProcessor = new WebHookProcessor();
        webHookProcessor.setWebHookController(webHookController);
        getHandlerService().register(webHookProcessor, super.getHttpThreadPoolGroup().getWebhookExecutor());
    }

    /** Returns the subscription manager; {@code null} until {@link #init()} has run. */
    public SubscriptionManager getSubscriptionManager() {
        return this.subscriptionManager;
    }

    /** Returns the consumer manager; {@code null} until {@link #init()} has run. */
    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    /** Returns the producer manager; {@code null} until {@link #init()} has run. */
    @Override
    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    /** Returns the HTTP configuration this server was created with. */
    public EventMeshHTTPConfiguration getEventMeshHttpConfiguration() {
        return this.eventMeshHttpConfiguration;
    }

    /** Returns the event bus created in the constructor. */
    public EventBus getEventBus() {
        return this.eventBus;
    }

    /** Returns the HTTP retryer; {@code null} until {@link #init()} has run. */
    public HttpRetryer getHttpRetryer() {
        return this.httpRetryer;
    }

    /** Returns the ACL obtained from the owning {@link EventMeshServer}. */
    public Acl getAcl() {
        return this.acl;
    }

    /** Returns the owning {@link EventMeshServer}. */
    public EventMeshServer getEventMeshServer() {
        return this.eventMeshServer;
    }

    /** Returns the rate limiter for message requests; {@code null} until {@link #init()} has run. */
    public RateLimiter getMsgRateLimiter() {
        return this.msgRateLimiter;
    }

    /** Returns the rate limiter for batch message requests; {@code null} until {@link #init()} has run. */
    public RateLimiter getBatchRateLimiter() {
        return this.batchRateLimiter;
    }

    /** Returns the filter engine; {@code null} until {@link #init()} has run. */
    public FilterEngine getFilterEngine() {
        return this.filterEngine;
    }

    /** Returns the transformer engine; {@code null} until {@link #init()} has run. */
    public TransformerEngine getTransformerEngine() {
        return this.transformerEngine;
    }

    /** Returns the meta storage obtained from the owning {@link EventMeshServer}. */
    public MetaStorage getMetaStorage() {
        return this.metaStorage;
    }

    /** Returns the HTTP client pool created in the constructor. */
    public HTTPClientPool getHttpClientPool() {
        return this.httpClientPool;
    }
}
