package org.apache.linkis.manager.am.service.monitor;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.am.conf.ManagerMonitorConf;
import org.apache.linkis.manager.am.service.em.EMUnregisterService;
import org.apache.linkis.manager.am.service.engine.EngineStopService;
import org.apache.linkis.manager.am.service.heartbeat.AMHeartbeatService;
import org.apache.linkis.manager.am.util.LinkisUtils;
import org.apache.linkis.manager.common.entity.enumeration.NodeHealthy;
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.entity.metrics.NodeHealthyInfo;
import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
import org.apache.linkis.manager.common.entity.node.Node;
import org.apache.linkis.manager.common.monitor.ManagerMonitor;
import org.apache.linkis.manager.common.protocol.em.StopEMRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
import org.apache.linkis.manager.common.protocol.node.NodeHeartbeatMsg;
import org.apache.linkis.manager.common.protocol.node.NodeHeartbeatRequest;
import org.apache.linkis.manager.common.utils.ManagerUtils;
import org.apache.linkis.manager.persistence.NodeManagerPersistence;
import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
import org.apache.linkis.manager.service.common.label.ManagerLabelService;
import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
import org.apache.linkis.rpc.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.class */
public class NodeHeartbeatMonitor implements ManagerMonitor {
    // Class-wide SLF4J logger; also passed to the LinkisUtils.tryAndWarn calls in this class.
    private static final Logger logger = LoggerFactory.getLogger(NodeHeartbeatMonitor.class);

    // Source of the node list that run() inspects (getAllNodes).
    @Autowired
    private NodeManagerPersistence nodeManagerPersistence;

    // Reads node metrics (run, updateMetrics) and persists them (updateMetricHealthy).
    @Autowired
    private NodeMetricManagerPersistence nodeMetricManagerPersistence;

    // Converts a NodeHealthyInfo into the value stored on NodeMetrics (updateMetricHealthy).
    @Autowired
    private MetricsConverter metricsConverter;

    // Receives heartbeat messages obtained by dealHealthyList.
    @Autowired
    private AMHeartbeatService amHeartbeatService;

    // Used to stop engines (clearEngineNode, triggerEMToStopEngine).
    @Autowired
    private EngineStopService engineStopService;

    // Used to stop an engine-conn-manager (triggerEMSuicide).
    @Autowired
    private EMUnregisterService emUnregisterService;

    // Used by dealHealthyList to tell whether a service instance is an engine.
    @Autowired
    private ManagerLabelService managerLabelService;
    // Pool sized by MANAGER_MONITOR_ASYNC_POLL_SIZE. Not referenced by any method visible in this class.
    // NOTE(review): "manager_async" / false are presumably the thread-name prefix and daemon flag — confirm
    // against LinkisUtils.newFixedThreadPool.
    private final ExecutorService fixedThreadPoll = LinkisUtils.newFixedThreadPool(((Integer) ManagerMonitorConf.MANAGER_MONITOR_ASYNC_POLL_SIZE.getValue()).intValue(), "manager_async", false);
    // Application name used to recognise engine-conn (EC) nodes and to query their registered instances.
    private final String ecName = (String) GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue();
    // Application name used by run() to recognise engine-conn-manager (ECM) nodes.
    private final String ecmName = (String) GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue();
    // Threshold compared against (now - node start time) in milliseconds by dealECNodes.
    private final long maxCreateInterval = ((TimeType) ManagerMonitorConf.NODE_MAX_CREATE_TIME.getValue()).toLong();
    // Threshold compared against (now - last update time) in milliseconds for EC nodes.
    private final long maxUpdateInterval = ((TimeType) ManagerMonitorConf.NODE_HEARTBEAT_MAX_UPDATE_TIME.getValue()).toLong();
    // Threshold compared against (now - last update time) in milliseconds for ECM nodes.
    private final long ecmHeartBeatTime = ((TimeType) ManagerMonitorConf.ECM_HEARTBEAT_MAX_UPDATE_TIME.getValue()).toLong();

    /**
     * Runs one health-check pass over all persisted nodes.
     *
     * <p>Loads every node together with its metrics, copies status and update time from the
     * first matching metrics entry onto each node, then passes engine-conn nodes to
     * {@code dealECNodes} and engine-conn-manager nodes to {@code dealECMNotExistsInRegistry}.
     * The whole pass, and the EC handling separately, are wrapped in
     * {@code LinkisUtils.tryAndWarn} with this class's logger.
     */
    public void run() {
        LinkisUtils.tryAndWarn(() -> {
            logger.info("Start to check the health of the node");
            List<Node> allNodes = this.nodeManagerPersistence.getAllNodes();
            List<?> metricsList = this.nodeMetricManagerPersistence.getNodeMetrics(allNodes);
            if (null != metricsList) {
                for (Node node : allNodes) {
                    String nodeInstance = node.getServiceInstance().toString();
                    // A bounded for-each: a node without any matching metrics simply keeps its
                    // current status instead of spinning forever on an exhausted iterator.
                    for (Object candidate : metricsList) {
                        NodeMetrics metrics = (NodeMetrics) candidate;
                        if (metrics.getServiceInstance().toString().equals(nodeInstance)) {
                            node.setNodeStatus(NodeStatus.values()[metrics.getStatus().intValue()]);
                            node.setUpdateTime(metrics.getUpdateTime());
                            break;
                        }
                    }
                }
            }
            List<Node> ecNodes = selectNodesByApplication(allNodes, this.ecName);
            // EC handling is isolated so that a failure there does not skip the ECM check below.
            LinkisUtils.tryAndWarn(() -> {
                dealECNodes(ecNodes);
            }, logger);
            List<Node> ecmNodes = selectNodesByApplication(allNodes, this.ecmName);
            dealECMNotExistsInRegistry(ecmNodes);
            logger.info("Finished to check the health of the node");
        }, logger);
    }

    /** Returns the nodes whose application name equals {@code applicationName}, ignoring case. */
    private List<Node> selectNodesByApplication(List<Node> nodes, String applicationName) {
        List<Node> selected = new ArrayList<>();
        for (Node node : nodes) {
            if (node.getServiceInstance().getApplicationName().equalsIgnoreCase(applicationName)) {
                selected.add(node);
            }
        }
        return selected;
    }

    /**
     * Decides which engine-conn (EC) nodes must be cleared and stops them.
     *
     * <p>A node is cleared when one of the following holds:
     * <ul>
     *   <li>its status is reported as completed by {@code NodeStatus.isCompleted};</li>
     *   <li>it has a status and its last update (or start time, if never updated) is older
     *       than {@code maxUpdateInterval};</li>
     *   <li>it has no status, is not among the instances returned by
     *       {@code Sender.getInstances(ecName)}, and was started more than
     *       {@code maxCreateInterval} ago.</li>
     * </ul>
     *
     * @param list the EC nodes to inspect
     */
    private void dealECNodes(List<Node> list) {
        List<ServiceInstance> registeredInstances =
                Arrays.stream(Sender.getInstances(this.ecName)).collect(Collectors.toList());
        HashSet<ServiceInstance> instancesToClear = new HashSet<>();
        list.forEach(node -> {
            NodeStatus status = node.getNodeStatus();
            // The status may be null (see the branch below), so it is checked before being
            // handed to isCompleted rather than relying on that helper's null handling.
            if (status != null && NodeStatus.isCompleted(status)) {
                logger.info("{} is completed {}, will be remove", node.getServiceInstance(), status);
                instancesToClear.add(node.getServiceInstance());
                return;
            }
            if (status != null) {
                long lastSeen = node.getUpdateTime() == null
                        ? node.getStartTime().getTime()
                        : node.getUpdateTime().getTime();
                if (System.currentTimeMillis() - lastSeen > this.maxUpdateInterval) {
                    logger.warn("{} heartbeat updateOverdue", node.getServiceInstance());
                    instancesToClear.add(node.getServiceInstance());
                }
                return;
            }
            // No status yet: only clear the node once it has had maxCreateInterval to start
            // and is still absent from the registered instances.
            boolean startupWindowElapsed =
                    System.currentTimeMillis() - node.getStartTime().getTime() > this.maxCreateInterval;
            if (startupWindowElapsed && !registeredInstances.contains(node.getServiceInstance())) {
                logger.warn("Failed to find instance {} from Service Registry prepare to kill, engineIsStarted", node.getServiceInstance());
                instancesToClear.add(node.getServiceInstance());
            }
        });
        instancesToClear.forEach(this::clearEngineNode);
    }

    /**
     * Re-reads the metrics of a single node and, when present, copies the status and update
     * time from them onto the node. Leaves the node untouched when no metrics are found.
     *
     * @param node the node to refresh in place
     */
    private void updateMetrics(Node node) {
        final NodeMetrics latest = this.nodeMetricManagerPersistence.getNodeMetrics(node);
        if (latest == null) {
            return;
        }
        // The stored status is used as an index into the NodeStatus enum constants.
        final NodeStatus[] statuses = NodeStatus.values();
        node.setNodeStatus(statuses[latest.getStatus().intValue()]);
        node.setUpdateTime(latest.getUpdateTime());
    }

    /**
     * Stops engine-conn-manager (ECM) nodes that are no longer registered and whose heartbeat
     * is overdue.
     *
     * <p>The registered instances are looked up with {@code ecmName}: the nodes handled here
     * are ECM nodes, so comparing them against the EC application's instances would never
     * match and every ECM with an overdue heartbeat would be stopped even while registered.
     *
     * <p>For each unregistered node whose heartbeat looks overdue, the metrics are refreshed
     * once (failures are passed to {@code LinkisUtils.tryAndWarn}) and the overdue check is
     * repeated on the refreshed update time before the ECM is stopped.
     *
     * @param list the ECM nodes to inspect
     */
    private void dealECMNotExistsInRegistry(List<Node> list) {
        List<ServiceInstance> registeredEcmInstances =
                Arrays.stream(Sender.getInstances(this.ecmName)).collect(Collectors.toList());
        list.forEach(node -> {
            boolean overdue = isEcmHeartbeatOverdue(node);
            if (registeredEcmInstances.contains(node.getServiceInstance()) || !overdue) {
                return;
            }
            // The in-memory update time may be stale; reload it before deciding to stop the ECM.
            LinkisUtils.tryAndWarn(() -> {
                updateMetrics(node);
            }, logger);
            if (!isEcmHeartbeatOverdue(node)) {
                return;
            }
            logger.warn("Failed to find ecm instance {} from Service Registry to kill", node.getServiceInstance());
            triggerEMSuicide(node.getServiceInstance());
        });
    }

    /**
     * Tells whether the node's last update (or its start time when it was never updated) is
     * older than {@code ecmHeartBeatTime}.
     */
    private boolean isEcmHeartbeatOverdue(Node node) {
        long lastSeen = node.getUpdateTime() == null
                ? node.getStartTime().getTime()
                : node.getUpdateTime().getTime();
        return System.currentTimeMillis() - lastSeen > this.ecmHeartBeatTime;
    }

    /**
     * Sends a {@link NodeHeartbeatRequest} to every node in the list and reacts to the answer.
     *
     * <ul>
     *   <li>No sender for the instance: the node is marked UnHealthy and no request is sent.</li>
     *   <li>Answer is not a {@link NodeHeartbeatMsg}: the node is marked UnHealthy.</li>
     *   <li>Answer reports an available health state, or the instance is not an engine: the
     *       message is forwarded to {@code amHeartbeatService.heartbeatEventDeal}.</li>
     *   <li>Otherwise (an engine that is not available): the node is marked UnHealthy.</li>
     *   <li>The request throws {@link UndeclaredThrowableException}: handled by
     *       {@code dealMetricUpdateTimeOut}; any other throwable is only logged.</li>
     * </ul>
     *
     * @param list metrics of the nodes to probe; {@code null} is treated as nothing to do
     */
    private void dealHealthyList(List<NodeMetrics> list) {
        if (list == null) {
            return;
        }
        list.forEach(nodeMetrics -> {
            Sender sender = Sender.getSender(nodeMetrics.getServiceInstance());
            if (sender == null) {
                updateMetricHealthy(nodeMetrics, NodeHealthy.UnHealthy, "sender is null");
                // Without a sender there is nothing to ask; continuing would only raise a
                // NullPointerException that gets reported as a failed RPC request.
                return;
            }
            LinkisUtils.tryCatch(() -> {
                Object ask = sender.ask(new NodeHeartbeatRequest());
                if (!(ask instanceof NodeHeartbeatMsg)) {
                    // The sender exists here, so the reason names the real problem.
                    updateMetricHealthy(nodeMetrics, NodeHealthy.UnHealthy, "heartbeat response is not a NodeHeartbeatMsg");
                    return "";
                }
                NodeHeartbeatMsg nodeHeartbeatMsg = (NodeHeartbeatMsg) ask;
                boolean available = NodeHealthy.isAvailable(nodeHeartbeatMsg.getHealthyInfo().getNodeHealthy());
                if (available || !this.managerLabelService.isEngine(nodeMetrics.getServiceInstance())) {
                    this.amHeartbeatService.heartbeatEventDeal(nodeHeartbeatMsg);
                    return "";
                }
                updateMetricHealthy(nodeMetrics, NodeHealthy.UnHealthy, "ec is Unhealthy");
                return "";
            }, th -> {
                if (th instanceof UndeclaredThrowableException) {
                    dealMetricUpdateTimeOut(nodeMetrics, (UndeclaredThrowableException) th);
                    return "";
                }
                logger.warn("heartbeat RPC request failed, but it is not caused by timeout, the engine will not be forcibly stopped, engine instance: " + nodeMetrics.getServiceInstance(), th);
                return "";
            });
        });
    }

    /**
     * Asks the engine stop service to stop the given engine on behalf of the admin user,
     * using a sender for this service instance. If that call throws, the failure is logged
     * and {@code triggerEngineSuicide} is attempted under {@code LinkisUtils.tryAndWarn}.
     *
     * @param serviceInstance the engine instance to stop
     */
    private void clearEngineNode(ServiceInstance serviceInstance) {
        logger.warn("Manager Monitor prepare to kill engine " + serviceInstance);
        try {
            final EngineStopRequest stopRequest =
                    new EngineStopRequest(serviceInstance, ManagerUtils.getAdminUser());
            final Sender selfSender = Sender.getSender(Sender.getThisServiceInstance());
            this.engineStopService.stopEngine(stopRequest, selfSender);
        } catch (Exception stopFailure) {
            logger.error("Em failed to kill engine " + serviceInstance, stopFailure);
            // Fallback path; its own failures are handed to tryAndWarn instead of propagating.
            LinkisUtils.tryAndWarn(() -> {
                triggerEngineSuicide(serviceInstance);
            }, logger);
        }
    }

    /**
     * Logs the intent and asks the engine stop service to stop the given engine on behalf of
     * the admin user. Unlike {@code clearEngineNode}, exceptions are not caught here.
     *
     * @param serviceInstance the engine instance to stop
     */
    private void triggerEMToStopEngine(ServiceInstance serviceInstance) {
        logger.warn("Manager Monitor prepare to kill engine " + serviceInstance + " by em");
        final EngineStopRequest stopRequest =
                new EngineStopRequest(serviceInstance, ManagerUtils.getAdminUser());
        final Sender selfSender = Sender.getSender(Sender.getThisServiceInstance());
        this.engineStopService.stopEngine(stopRequest, selfSender);
    }

    /**
     * Fallback used when stopping an engine through the stop service failed. It currently
     * only emits a warning; no stop action is performed here.
     *
     * @param serviceInstance the engine instance concerned
     */
    private void triggerEngineSuicide(ServiceInstance serviceInstance) {
        final String notice = "Manager Monitor prepare to triggerEngineSuicide engine " + serviceInstance;
        logger.warn(notice);
    }

    /**
     * Builds a {@link StopEMRequest} for the given engine-conn-manager on behalf of the admin
     * user and passes it to the EM unregister service with a sender for this service instance.
     *
     * @param serviceInstance the ECM instance to stop
     */
    private void triggerEMSuicide(ServiceInstance serviceInstance) {
        logger.warn("Manager Monitor prepare to kill EM " + serviceInstance);
        final StopEMRequest request = new StopEMRequest();
        request.setEm(serviceInstance);
        request.setUser(ManagerUtils.getAdminUser());
        final Sender selfSender = Sender.getSender(Sender.getThisServiceInstance());
        this.emUnregisterService.stopEM(request, selfSender);
    }

    /**
     * Overwrites the health information of the given metrics and persists them.
     *
     * @param nodeMetrics the metrics to update; mutated in place before being saved
     * @param nodeHealthy the health state to record
     * @param str the reason appended to the stored health message
     */
    private void updateMetricHealthy(NodeMetrics nodeMetrics, NodeHealthy nodeHealthy, String str) {
        final String transition = "update instance " + nodeMetrics.getServiceInstance()
                + " from " + nodeMetrics.getHealthy() + " to " + nodeHealthy;
        logger.warn(transition);

        final NodeHealthyInfo healthyInfo = new NodeHealthyInfo();
        final String reason = "Manager-Monitor considers the node to be in UnHealthy state, reason: " + str;
        healthyInfo.setMsg(reason);
        healthyInfo.setNodeHealthy(nodeHealthy);

        // The metrics store the converted form of the health info, not the object itself.
        nodeMetrics.setHealthy(this.metricsConverter.convertHealthyInfo(healthyInfo));
        this.nodeMetricManagerPersistence.addOrupdateNodeMetrics(nodeMetrics);
    }

    /**
     * Handles a heartbeat request that failed with an {@link UndeclaredThrowableException}:
     * when the metrics were last updated longer ago than the configured
     * {@code NODE_HEARTBEAT_MAX_UPDATE_TIME}, the engine is stopped via
     * {@code triggerEMToStopEngine}; otherwise nothing happens.
     *
     * @param nodeMetrics the metrics of the node whose heartbeat request failed
     * @param undeclaredThrowableException the failure, attached to the warning log
     */
    private void dealMetricUpdateTimeOut(NodeMetrics nodeMetrics, UndeclaredThrowableException undeclaredThrowableException) {
        final long sinceLastUpdate = System.currentTimeMillis() - nodeMetrics.getUpdateTime().getTime();
        final long allowedInterval = ((TimeType) ManagerMonitorConf.NODE_HEARTBEAT_MAX_UPDATE_TIME.getValue()).toLong();
        if (sinceLastUpdate <= allowedInterval) {
            return;
        }
        logger.warn("The engine failed to send the RPC request, and the engine instance could not be found: " + nodeMetrics.getServiceInstance() + ", start sending the request to stop the engine!", undeclaredThrowableException);
        triggerEMToStopEngine(nodeMetrics.getServiceInstance());
    }
}
