package org.apache.eventmesh.runtime.metrics.tcp;

import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.TcpSummaryMetrics;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.class */
/**
 * Periodic monitor for the EventMesh TCP server.
 *
 * <p>Once {@link #start()} has been called, two recurring tasks run on the scheduler of the TCP
 * thread pool group:
 * <ul>
 *   <li>a TPS task that converts the message counters held by {@link TcpSummaryMetrics} into
 *       per-second rates, logs per-session delivery statistics and records the number of
 *       distinct subscribed topics and open connections;</li>
 *   <li>a thread-pool task that logs the queue size of each executor and the size of the retry
 *       queue.</li>
 * </ul>
 */
public class EventMeshTcpMonitor {
    private static final Logger log = LoggerFactory.getLogger(EventMeshTcpMonitor.class);

    /** Initial delay and interval of the TPS task, in milliseconds. */
    private static final int PERIOD_MILLIS = 60000;

    /** Interval of the thread-pool state task, in seconds. */
    private static final int PRINT_THREADPOOLSTATE_INTERVAL = 1;

    /** Initial delay of the thread-pool state task, in seconds. */
    private static final long THREADPOOLSTATE_INITIAL_DELAY_SECONDS = 10L;

    /** Format shared by every summary line written by {@link #printAppLogger(TcpSummaryMetrics)}. */
    private static final String SUMMARY_FORMAT = "protocol: {}, s: {}, t: {}";

    private final EventMeshTCPServer eventMeshTCPServer;
    private final List<MetricsRegistry> metricsRegistries;
    private final Logger tcpLogger = LoggerFactory.getLogger("tcpMonitor");
    private final Logger appLogger = LoggerFactory.getLogger("appMonitor");
    private final TcpSummaryMetrics tcpSummaryMetrics = new TcpSummaryMetrics();

    /** Handle of the recurring TPS task; {@code null} until {@link #start()} has scheduled it. */
    public ScheduledFuture<?> monitorTpsTask;

    /** Handle of the recurring thread-pool task; {@code null} until {@link #start()} has scheduled it. */
    public ScheduledFuture<?> monitorThreadPoolTask;

    /**
     * Creates a monitor bound to the given TCP server.
     *
     * @param eventMeshTCPServer server whose sessions, thread pools and retryer are observed
     * @param list metrics registries that receive the summary metrics; must not be {@code null}
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public EventMeshTcpMonitor(EventMeshTCPServer eventMeshTCPServer, List<MetricsRegistry> list) {
        this.eventMeshTCPServer = eventMeshTCPServer;
        this.metricsRegistries = Preconditions.checkNotNull(list);
    }

    /**
     * Returns the TCP server this monitor observes.
     *
     * @return the server passed to the constructor
     */
    public EventMeshTCPServer getEventMeshTCPServer() {
        return this.eventMeshTCPServer;
    }

    /**
     * Returns the metrics holder updated by the monitoring tasks.
     *
     * @return the summary metrics instance owned by this monitor
     */
    public TcpSummaryMetrics getTcpSummaryMetrics() {
        return this.tcpSummaryMetrics;
    }

    /**
     * Starts every configured metrics registry.
     *
     * @throws Exception declared for interface compatibility with callers
     */
    public void init() throws Exception {
        this.metricsRegistries.forEach(MetricsRegistry::start);
        log.info("EventMeshTcpMonitor initialized......");
    }

    /**
     * Registers the summary metrics with every registry and schedules both monitoring tasks.
     *
     * @throws Exception declared for interface compatibility with callers
     */
    public void start() throws Exception {
        this.metricsRegistries.forEach(metricsRegistry -> {
            metricsRegistry.register(this.tcpSummaryMetrics);
            log.info("Register tcpMetrics to {}", metricsRegistry.getClass().getName());
        });

        this.monitorTpsTask = this.eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()
            .scheduleAtFixedRate(this::runTpsTask, PERIOD_MILLIS, PERIOD_MILLIS, TimeUnit.MILLISECONDS);

        this.monitorThreadPoolTask = this.eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()
            .scheduleAtFixedRate(this::runThreadPoolTask, THREADPOOLSTATE_INITIAL_DELAY_SECONDS,
                PRINT_THREADPOOLSTATE_INTERVAL, TimeUnit.SECONDS);

        log.info("EventMeshTcpMonitor started......");
    }

    /**
     * Cancels the monitoring tasks (if they were scheduled) and shuts the registries down.
     *
     * @throws Exception declared for interface compatibility with callers
     */
    public void shutdown() throws Exception {
        // start() may never have run, or may have failed before scheduling; avoid an NPE here.
        if (this.monitorTpsTask != null) {
            this.monitorTpsTask.cancel(true);
        }
        if (this.monitorThreadPoolTask != null) {
            this.monitorThreadPoolTask.cancel(true);
        }
        // "showdown" is the method name exposed by MetricsRegistry, not a typo in this class.
        this.metricsRegistries.forEach(MetricsRegistry::showdown);
        log.info("EventMeshTcpMonitor shutdown......");
    }

    /**
     * Scheduler entry point for the TPS task. A fixed-rate task that throws is never run again,
     * so failures are logged here and the next run is still attempted.
     */
    private void runTpsTask() {
        try {
            sampleTpsMetrics();
        } catch (RuntimeException e) {
            log.error("EventMeshTcpMonitor failed to sample TPS metrics", e);
        }
    }

    /**
     * Scheduler entry point for the thread-pool task; logs failures instead of letting them
     * cancel all future runs.
     */
    private void runThreadPoolTask() {
        try {
            reportThreadPoolState();
        } catch (RuntimeException e) {
            log.error("EventMeshTcpMonitor failed to report thread pool state", e);
        }
    }

    /** Converts the message counters into TPS values, then logs per-session and summary data. */
    private void sampleTpsMetrics() {
        // NOTE(review): each counter is read and then reset in two separate calls; whether
        // increments arriving in between can be lost depends on TcpSummaryMetrics - confirm.
        int client2eventMeshMsgNum = this.tcpSummaryMetrics.client2eventMeshMsgNum();
        this.tcpSummaryMetrics.resetClient2EventMeshMsgNum();
        this.tcpSummaryMetrics.setClient2eventMeshTPS(toTps(client2eventMeshMsgNum));

        int eventMesh2clientMsgNum = this.tcpSummaryMetrics.eventMesh2clientMsgNum();
        this.tcpSummaryMetrics.resetEventMesh2ClientMsgNum();
        this.tcpSummaryMetrics.setEventMesh2clientTPS(toTps(eventMesh2clientMsgNum));

        int eventMesh2mqMsgNum = this.tcpSummaryMetrics.eventMesh2mqMsgNum();
        this.tcpSummaryMetrics.resetEventMesh2mqMsgNum();
        this.tcpSummaryMetrics.setEventMesh2mqTPS(toTps(eventMesh2mqMsgNum));

        int mq2eventMeshMsgNum = this.tcpSummaryMetrics.mq2eventMeshMsgNum();
        this.tcpSummaryMetrics.resetMq2eventMeshMsgNum();
        this.tcpSummaryMetrics.setMq2eventMeshTPS(toTps(mq2eventMeshMsgNum));

        // Union of the subscription keys of all sessions, so a shared topic is counted once.
        HashSet<Object> subscribedTopics = new HashSet<>();
        for (Session session : this.eventMeshTCPServer.getClientSessionGroupMapping().getSessionMap().values()) {
            AtomicLong deliveredMsgsCount = session.getPusher().getDeliveredMsgsCount();
            this.tcpLogger.info(
                "session|deliveredFailCount={}|deliveredMsgsCount={}|unAckMsgsCount={}|sendTopics={}|subscribeTopics={}|user={}",
                session.getPusher().getDeliverFailMsgsCount().longValue(),
                deliveredMsgsCount.longValue(),
                session.getPusher().getTotalUnackMsgs(),
                session.getSessionContext().getSendTopics().size(),
                session.getSessionContext().getSubscribeTopics().size(),
                session.getClient());
            subscribedTopics.addAll(session.getSessionContext().getSubscribeTopics().keySet());
        }
        this.tcpSummaryMetrics.setSubTopicNum(subscribedTopics.size());
        this.tcpSummaryMetrics.setAllConnections(this.eventMeshTCPServer.getTcpConnectionHandler().getConnectionCount());
        printAppLogger(this.tcpSummaryMetrics);
    }

    /** Logs the queue size of every executor plus the retry queue size, and records the latter. */
    private void reportThreadPoolState() {
        this.appLogger.info(
            "{TaskHandle:{},Send:{},Ack:{},Reply:{},Push:{},Scheduler:{},Rebalance:{}}",
            this.eventMeshTCPServer.getTcpThreadPoolGroup().getTaskHandleExecutorService().getQueue().size(),
            this.eventMeshTCPServer.getTcpThreadPoolGroup().getSendExecutorService().getQueue().size(),
            this.eventMeshTCPServer.getTcpThreadPoolGroup().getAckExecutorService().getQueue().size(),
            this.eventMeshTCPServer.getTcpThreadPoolGroup().getReplyExecutorService().getQueue().size(),
            this.eventMeshTCPServer.getTcpThreadPoolGroup().getBroadcastMsgDownstreamExecutorService().getQueue().size(),
            ((ThreadPoolExecutor) this.eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()).getQueue().size(),
            this.eventMeshTCPServer.getEventMeshRebalanceService().getRebalanceThreadPoolQueueSize());

        this.eventMeshTCPServer.getTcpRetryer().printState();
        this.tcpSummaryMetrics.setRetrySize(this.eventMeshTCPServer.getTcpRetryer().getPendingTimeouts());
        this.appLogger.info(MonitorMetricConstants.EVENTMESH_MONITOR_FORMAT_COMMON,
            EventMeshConstants.PROTOCOL_TCP,
            MonitorMetricConstants.RETRY_QUEUE_SIZE,
            Long.valueOf(this.tcpSummaryMetrics.getRetrySize()));
    }

    /**
     * Converts a message count accumulated over one sampling period into messages per second.
     *
     * <p>The multiplication is done in {@code long}: in {@code int} arithmetic
     * {@code 1000 * msgNum} wraps around once the count exceeds roughly 2.1 million.
     *
     * @param msgNum number of messages counted during the last period
     * @return messages per second, truncated towards zero
     */
    private static int toTps(int msgNum) {
        return (int) (1000L * msgNum / PERIOD_MILLIS);
    }

    /**
     * Writes one summary line per metric to the application monitor logger.
     *
     * @param tcpSummaryMetrics metrics whose current values are logged
     */
    private void printAppLogger(TcpSummaryMetrics tcpSummaryMetrics) {
        logSummary(MonitorMetricConstants.CLIENT_2_EVENTMESH_TPS, tcpSummaryMetrics.getClient2eventMeshTPS());
        logSummary(MonitorMetricConstants.EVENTMESH_2_MQ_TPS, tcpSummaryMetrics.getEventMesh2mqTPS());
        logSummary(MonitorMetricConstants.MQ_2_EVENTMESH_TPS, tcpSummaryMetrics.getMq2eventMeshTPS());
        logSummary(MonitorMetricConstants.EVENTMESH_2_CLIENT_TPS, tcpSummaryMetrics.getEventMesh2clientTPS());
        logSummary(MonitorMetricConstants.ALL_TPS, tcpSummaryMetrics.getAllTPS());
        logSummary(MonitorMetricConstants.CONNECTION, tcpSummaryMetrics.getAllConnections());
        logSummary(MonitorMetricConstants.SUB_TOPIC_NUM, tcpSummaryMetrics.getSubTopicNum());
    }

    /** Logs a single TCP summary metric using the shared summary format. */
    private void logSummary(Object metricName, Object value) {
        this.appLogger.info(SUMMARY_FORMAT, EventMeshConstants.PROTOCOL_TCP, metricName, value);
    }
}
