package org.apache.eventmesh.runtime.boot;

import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.metrics.api.MetricsPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.GoodbyeProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.HeartBeatProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.HelloProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.ListenProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.MessageAckProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.MessageTransferProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.RecommendProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.SubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.UnSubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceService;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.TcpRetryer;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManager;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/boot/EventMeshTCPServer.class */
/**
 * TCP server of the EventMesh runtime.
 *
 * <p>On top of {@link AbstractTCPServer} this class creates and drives the TCP-specific
 * collaborators (client session group mapping, retryer, monitor, webhook admin config manager and,
 * when meta storage is enabled, the rebalance service), registers one processor per TCP
 * {@link Command}, and registers/unregisters this instance with the {@link MetaStorage}.
 *
 * <p>Lifecycle order expected by the code: {@link #init()}, then {@link #start()}, then
 * {@link #shutdown()}. Most collaborators are only assigned in {@link #init()}, so the other
 * lifecycle methods dereference fields that are {@code null} before it has run.
 */
public class EventMeshTCPServer extends AbstractTCPServer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshTCPServer.class);

    /** Protocol type reported to the meta storage on both register and unregister. */
    private static final String PROTOCOL_TYPE = "TCP";

    /** Suffix appended to the configured EventMesh name to form the name published to the meta storage. */
    private static final String REGISTERED_NAME_SUFFIX = "-" + PROTOCOL_TYPE;

    /**
     * Pause between shutting down the client session group mapping and shutting down the retryer.
     * NOTE(review): presumably a grace period for in-flight work to drain - confirm before tuning.
     */
    private static final long SHUTDOWN_PAUSE_SECONDS = 40L;

    private final EventMeshServer eventMeshServer;
    private final EventMeshTCPConfiguration eventMeshTCPConfiguration;
    private final MetaStorage metaStorage;
    private final Acl acl;
    private ClientSessionGroupMapping clientSessionGroupMapping;
    private TcpRetryer tcpRetryer;
    private AdminWebHookConfigOperationManager adminWebHookConfigOperationManage;
    private RateLimiter rateLimiter;
    private EventMeshRebalanceService eventMeshRebalanceService;

    /**
     * Creates the server; the meta storage and ACL are taken from the owning {@link EventMeshServer}.
     *
     * @param eventMeshServer the owning runtime server
     * @param eventMeshTCPConfiguration the TCP configuration, also handed to the superclass
     */
    public EventMeshTCPServer(EventMeshServer eventMeshServer, EventMeshTCPConfiguration eventMeshTCPConfiguration) {
        super(eventMeshTCPConfiguration);
        this.eventMeshServer = eventMeshServer;
        this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
        this.metaStorage = eventMeshServer.getMetaStorage();
        this.acl = eventMeshServer.getAcl();
    }

    /**
     * Initializes the superclass and then builds every collaborator of this server: the rate
     * limiter, retryer, client session group mapping, TCP monitor, the rebalance service (only when
     * meta storage is enabled), the webhook admin config manager, and finally the command processors.
     *
     * @throws Exception if the superclass or any collaborator fails to initialize
     */
    @Override
    public void init() throws Exception {
        LogUtils.info(log, "==================EventMeshTCPServer Initialing==================");
        super.init();

        // Permits per second come straight from the configured per-second request number.
        this.rateLimiter = RateLimiter.create(this.eventMeshTCPConfiguration.getEventMeshTcpMsgReqnumPerSecond().intValue());

        // Resolve one metrics registry per configured plugin type; an absent list yields an empty result.
        // The list is left raw because its element type is not imported by this file.
        ArrayList metricsRegistries = Lists.newArrayList();
        Optional.ofNullable(this.eventMeshTCPConfiguration.getEventMeshMetricsPluginType())
            .ifPresent(pluginTypes -> pluginTypes.forEach(
                pluginType -> metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));

        this.tcpRetryer = new TcpRetryer(this);

        this.clientSessionGroupMapping = new ClientSessionGroupMapping(this);
        this.clientSessionGroupMapping.init();
        // Keep the superclass' reference in sync with the one held by this class.
        super.setClientSessionGroupMapping(this.clientSessionGroupMapping);

        super.setEventMeshTcpMonitor(new EventMeshTcpMonitor(this, metricsRegistries));
        super.getEventMeshTcpMonitor().init();

        if (this.eventMeshTCPConfiguration.isEventMeshServerMetaStorageEnable()) {
            this.eventMeshRebalanceService = new EventMeshRebalanceService(this, new EventMeshRebalanceImpl(this));
            this.eventMeshRebalanceService.init();
        }

        this.adminWebHookConfigOperationManage = new AdminWebHookConfigOperationManager();
        this.adminWebHookConfigOperationManage.init();

        registerTCPRequestProcessor();
        LogUtils.info(log, "--------------------------EventMeshTCPServer Inited");
    }

    /**
     * Starts the superclass, the monitor, the client session group mapping and the retryer; when
     * meta storage is enabled it also registers this instance and starts the rebalance service.
     *
     * <p>A failed registration does not abort start-up; it is logged by {@link #register()}.
     *
     * @throws Exception if the superclass or any collaborator fails to start
     */
    @Override
    public void start() throws Exception {
        super.start();
        super.getEventMeshTcpMonitor().start();
        this.clientSessionGroupMapping.start();
        this.tcpRetryer.start();
        if (this.eventMeshTCPConfiguration.isEventMeshServerMetaStorageEnable()) {
            register();
            this.eventMeshRebalanceService.start();
        }
        LogUtils.info(log, "--------------------------EventMeshTCPServer Started");
    }

    /**
     * Shuts down the superclass, the monitor and the client session group mapping, pauses for
     * {@link #SHUTDOWN_PAUSE_SECONDS} seconds, then shuts down the retryer; when meta storage is
     * enabled it also stops the rebalance service and unregisters this instance.
     *
     * @throws Exception if any collaborator fails to shut down, including an
     *     {@link EventMeshException} when the meta storage reports that unregistering failed
     */
    @Override
    public void shutdown() throws Exception {
        super.shutdown();
        super.getEventMeshTcpMonitor().shutdown();
        this.clientSessionGroupMapping.shutdown();
        ThreadUtils.sleep(SHUTDOWN_PAUSE_SECONDS, TimeUnit.SECONDS);
        this.tcpRetryer.shutdown();
        if (this.eventMeshTCPConfiguration.isEventMeshServerMetaStorageEnable()) {
            this.eventMeshRebalanceService.shutdown();
            unRegister();
        }
        LogUtils.info(log, "--------------------------EventMeshTCPServer Shutdown");
    }

    /**
     * Registers this TCP server with the meta storage, publishing its cluster, name, endpoint,
     * protocol type and the current client distribution data.
     *
     * <p>Never throws: any exception is logged and reported as {@code false}. A {@code false}
     * answer from the meta storage is logged as a warning so the failure is not silent.
     *
     * @return {@code true} if the meta storage accepted the registration, {@code false} if it
     *     rejected it or an exception occurred
     */
    public boolean register() {
        boolean registered = false;
        try {
            EventMeshRegisterInfo registerInfo = new EventMeshRegisterInfo();
            registerInfo.setEventMeshClusterName(this.eventMeshTCPConfiguration.getEventMeshCluster());
            registerInfo.setEventMeshName(registeredEventMeshName());
            registerInfo.setEndPoint(localEndPoint());
            registerInfo.setEventMeshInstanceNumMap(this.clientSessionGroupMapping.prepareProxyClientDistributionData());
            registerInfo.setProtocolType(PROTOCOL_TYPE);
            registered = this.metaStorage.register(registerInfo);
            if (!registered) {
                log.warn("eventMesh register to registry returned false");
            }
        } catch (Exception e) {
            log.error("eventMesh register to registry failed", e);
        }
        return registered;
    }

    /**
     * Removes this TCP server from the meta storage, using the same cluster, name, endpoint and
     * protocol type that {@link #register()} published.
     *
     * @throws EventMeshException if the meta storage reports that unregistering failed
     */
    private void unRegister() {
        EventMeshUnRegisterInfo unRegisterInfo = new EventMeshUnRegisterInfo();
        unRegisterInfo.setEventMeshClusterName(this.eventMeshTCPConfiguration.getEventMeshCluster());
        // Must be the name that register() published (with the protocol suffix); unregistering
        // under the bare configured name would address a different entry.
        unRegisterInfo.setEventMeshName(registeredEventMeshName());
        unRegisterInfo.setEndPoint(localEndPoint());
        unRegisterInfo.setProtocolType(PROTOCOL_TYPE);
        if (!this.metaStorage.unRegister(unRegisterInfo)) {
            throw new EventMeshException("eventMesh fail to unRegister");
        }
    }

    /** Builds the endpoint of this server: local address, separator, configured TCP server port. */
    private String localEndPoint() {
        return IPUtils.getLocalAddress() + EventMeshConstants.IP_PORT_SEPARATOR + this.eventMeshTCPConfiguration.getEventMeshTcpServerPort();
    }

    /** Builds the name under which this server is known to the meta storage. */
    private String registeredEventMeshName() {
        return this.eventMeshTCPConfiguration.getEventMeshName() + REGISTERED_NAME_SUFFIX;
    }

    /**
     * Binds every handled TCP command to its processor and to the executor it runs on. Processors
     * serving several commands are created once and shared between those commands.
     */
    private void registerTCPRequestProcessor() {
        // Session and subscription management commands run on the task-handle executor.
        ThreadPoolExecutor taskHandleExecutorService = super.getTcpThreadPoolGroup().getTaskHandleExecutorService();
        registerProcessor(Command.HELLO_REQUEST, new HelloProcessor(this), taskHandleExecutorService);
        registerProcessor(Command.RECOMMEND_REQUEST, new RecommendProcessor(this), taskHandleExecutorService);
        registerProcessor(Command.HEARTBEAT_REQUEST, new HeartBeatProcessor(this), taskHandleExecutorService);
        GoodbyeProcessor goodbyeProcessor = new GoodbyeProcessor(this);
        registerProcessor(Command.CLIENT_GOODBYE_REQUEST, goodbyeProcessor, taskHandleExecutorService);
        registerProcessor(Command.SERVER_GOODBYE_RESPONSE, goodbyeProcessor, taskHandleExecutorService);
        registerProcessor(Command.SUBSCRIBE_REQUEST, new SubscribeProcessor(this), taskHandleExecutorService);
        registerProcessor(Command.UNSUBSCRIBE_REQUEST, new UnSubscribeProcessor(this), taskHandleExecutorService);
        registerProcessor(Command.LISTEN_REQUEST, new ListenProcessor(this), taskHandleExecutorService);

        // Messages sent to the server share one processor; responses use the reply executor instead.
        ThreadPoolExecutor sendExecutorService = super.getTcpThreadPoolGroup().getSendExecutorService();
        MessageTransferProcessor messageTransferProcessor = new MessageTransferProcessor(this);
        registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, sendExecutorService);
        registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService);
        registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService);
        registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, super.getTcpThreadPoolGroup().getReplyExecutorService());

        // Client acknowledgements run on the ack executor.
        ThreadPoolExecutor ackExecutorService = super.getTcpThreadPoolGroup().getAckExecutorService();
        MessageAckProcessor messageAckProcessor = new MessageAckProcessor(this);
        registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
        registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
        registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
        registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
    }

    /** Returns the owning runtime server passed to the constructor. */
    public EventMeshServer getEventMeshServer() {
        return this.eventMeshServer;
    }

    /** Returns the TCP configuration passed to the constructor. */
    public EventMeshTCPConfiguration getEventMeshTCPConfiguration() {
        return this.eventMeshTCPConfiguration;
    }

    /** Returns the meta storage obtained from the owning server at construction time. */
    public MetaStorage getMetaStorage() {
        return this.metaStorage;
    }

    /** Returns the rebalance service; {@code null} unless {@link #init()} ran with meta storage enabled. */
    public EventMeshRebalanceService getEventMeshRebalanceService() {
        return this.eventMeshRebalanceService;
    }

    /** Returns the webhook admin config manager; {@code null} until {@link #init()} or the setter assigns it. */
    public AdminWebHookConfigOperationManager getAdminWebHookConfigOperationManage() {
        return this.adminWebHookConfigOperationManage;
    }

    /** Replaces the webhook admin config manager. */
    public void setAdminWebHookConfigOperationManage(AdminWebHookConfigOperationManager adminWebHookConfigOperationManager) {
        this.adminWebHookConfigOperationManage = adminWebHookConfigOperationManager;
    }

    /** Returns the ACL obtained from the owning server at construction time. */
    public Acl getAcl() {
        return this.acl;
    }

    /** Returns the client session group mapping held by this class. */
    @Override
    public ClientSessionGroupMapping getClientSessionGroupMapping() {
        return this.clientSessionGroupMapping;
    }

    /** Replaces the client session group mapping held by this class (the superclass setter is not invoked). */
    @Override
    public void setClientSessionGroupMapping(ClientSessionGroupMapping clientSessionGroupMapping) {
        this.clientSessionGroupMapping = clientSessionGroupMapping;
    }

    /** Returns the rate limiter; {@code null} until {@link #init()} or the setter assigns it. */
    public RateLimiter getRateLimiter() {
        return this.rateLimiter;
    }

    /** Replaces the rate limiter. */
    public void setRateLimiter(RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    /** Returns the retryer; {@code null} until {@link #init()} has run. */
    public TcpRetryer getTcpRetryer() {
        return this.tcpRetryer;
    }
}
