package org.apache.linkis.manager.am.service.engine;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.am.conf.AMConfiguration;
import org.apache.linkis.manager.am.util.LinkisUtils;
import org.apache.linkis.manager.am.utils.AMUtils;
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.entity.node.AMEMNode;
import org.apache.linkis.manager.common.entity.node.EMNode;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource;
import org.apache.linkis.manager.common.entity.resource.LoadInstanceResource;
import org.apache.linkis.manager.common.exception.RMWarnException;
import org.apache.linkis.manager.common.protocol.engine.EngineConnReleaseRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
import org.apache.linkis.manager.dao.NodeMetricManagerMapper;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.service.NodeLabelService;
import org.apache.linkis.manager.label.service.impl.DefaultNodeLabelRemoveService;
import org.apache.linkis.manager.rm.exception.RMErrorCode;
import org.apache.linkis.manager.rm.service.impl.DefaultResourceManager;
import org.apache.linkis.protocol.label.NodeLabelRemoveRequest;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.rpc.message.annotation.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.class */
public class DefaultEngineStopService extends AbstractEngineService implements EngineStopService {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEngineStopService.class);

    @Autowired
    private NodeLabelService nodeLabelService;

    @Autowired
    private DefaultResourceManager resourceManager;

    @Autowired
    private DefaultNodeLabelRemoveService nodeLabelRemoveService;

    @Autowired
    private EngineInfoService engineInfoService;

    @Autowired
    private EngineStopService engineStopService;

    @Autowired
    private NodeMetricManagerMapper nodeMetricManagerMapper;
    private ExecutorService EXECUTOR = LinkisUtils.newFixedThreadPool(AMConfiguration.ASYNC_STOP_ENGINE_MAX_THREAD_SIZE, "AsyncStopEngineService-Thread-", true);

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    @Receiver
    public void stopEngine(EngineStopRequest engineStopRequest, Sender sender) {
        engineStopRequest.getServiceInstance().setApplicationName((String) GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue());
        logger.info(String.format("user %s prepare to stop engine %s", engineStopRequest.getUser(), engineStopRequest.getServiceInstance()));
        EngineNode engineNode = getEngineNodeManager().getEngineNode(engineStopRequest.getServiceInstance());
        if (Objects.isNull(engineNode)) {
            logger.info(String.format("engineConn does not exist in db: %s", engineStopRequest));
            return;
        }
        engineNode.setLabels(this.nodeLabelService.getNodeLabels(engineStopRequest.getServiceInstance()));
        logger.info(String.format("Start to kill engine invoke enginePointer %s", engineNode.getServiceInstance()));
        try {
            getEMService().stopEngine(engineNode, engineNode.getEMNode());
            logger.info("Finished to kill engine invoke enginePointer {}", engineNode.getServiceInstance());
        } catch (Exception e) {
            logger.warn(String.format("Failed to stop engine %s", engineNode.getServiceInstance()));
        }
        if (engineNode.getNodeStatus() == null) {
            engineNode.setNodeStatus(NodeStatus.ShuttingDown);
        }
        engineConnInfoClear(engineNode);
        logger.info(String.format("user %s finished to stop engine %s", engineStopRequest.getUser(), engineStopRequest.getServiceInstance()));
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    public Map<String, Object> stopUnlockEngineByECM(String str, String str2) {
        HashMap hashMap = new HashMap();
        int i = 0;
        ServiceInstance serviceInstance = new ServiceInstance();
        serviceInstance.setInstance(str);
        serviceInstance.setApplicationName((String) GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue());
        EMNode aMEMNode = new AMEMNode();
        aMEMNode.setServiceInstance(serviceInstance);
        List<EngineNode> listEMEngines = this.engineInfoService.listEMEngines(aMEMNode);
        List<EngineNode> list = (List) listEMEngines.stream().filter(engineNode -> {
            return NodeStatus.Unlock.equals(engineNode.getNodeStatus());
        }).filter(engineNode2 -> {
            return !engineNode2.getLabels().isEmpty();
        }).collect(Collectors.toList());
        logger.info(String.format("get ec node total num:%d and unlock node num:%d of ecm:%s", Integer.valueOf(listEMEngines.size()), Integer.valueOf(list.size()), str));
        LoadInstanceResource loadInstanceResource = new LoadInstanceResource(0L, 0, 0);
        for (EngineNode engineNode3 : list) {
            if (logger.isDebugEnabled()) {
                try {
                    logger.debug("ec node:" + AMUtils.mapper.writeValueAsString(engineNode3));
                } catch (JsonProcessingException e) {
                    logger.debug("convert jobReq to string with error:" + e.getMessage());
                }
            }
            EngineTypeLabel engineTypeLabel = (EngineTypeLabel) engineNode3.getLabels().stream().filter(label -> {
                return label instanceof EngineTypeLabel;
            }).map(label2 -> {
                return (EngineTypeLabel) label2;
            }).findFirst().orElse(null);
            String engineType = engineTypeLabel != null ? engineTypeLabel.getEngineType() : null;
            if (AMConfiguration.isAllowKilledEngineType(engineType)) {
                if (engineNode3.getNodeResource().getUsedResource() != null) {
                    DriverAndYarnResource usedResource = engineNode3.getNodeResource().getUsedResource();
                    if (usedResource instanceof DriverAndYarnResource) {
                        usedResource.getLoadInstanceResource();
                    }
                    loadInstanceResource = loadInstanceResource.add(engineNode3.getNodeResource().getUsedResource());
                }
                logger.info(String.format("try to asyn kill engine node:%s", engineNode3.getServiceInstance().getInstance()));
                i++;
                asyncStopEngineWithUpdateMetrics(new EngineStopRequest(engineNode3.getServiceInstance(), str2));
            } else {
                logger.info(String.format("skipped to kill engine node:%s, engine type:%s", engineNode3.getServiceInstance().getInstance(), engineType));
            }
        }
        hashMap.put("killEngineNum", Integer.valueOf(i));
        hashMap.put("memory", Long.valueOf(loadInstanceResource.getMemory()));
        hashMap.put("cores", Integer.valueOf(loadInstanceResource.getCores()));
        hashMap.put("batchKillEngineType", AMConfiguration.ALLOW_BATCH_KILL_ENGINE_TYPES.getValue());
        return hashMap;
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    public void engineConnInfoClear(EngineNode engineNode) {
        logger.info(String.format("Start to clear ec info %s", engineNode));
        try {
            this.resourceManager.resourceReleased(engineNode);
        } catch (Exception e) {
            if (e instanceof RMWarnException) {
                RMWarnException rMWarnException = e;
                if (rMWarnException.getErrCode() != RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getErrorCode()) {
                    throw rMWarnException;
                }
            }
        }
        this.nodeLabelRemoveService.removeNodeLabel(new NodeLabelRemoveRequest(engineNode.getServiceInstance(), true));
        getEngineNodeManager().deleteEngineNode(engineNode);
        logger.info(String.format("Finished to clear ec info %s", engineNode));
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    @Receiver
    public void engineSuicide(EngineSuicideRequest engineSuicideRequest, Sender sender) {
        logger.info(String.format("Will ask engine : %s of user : %s to suicide.", engineSuicideRequest.getServiceInstance().toString(), engineSuicideRequest.getUser()));
        EngineStopService.askEngineToSuicide(engineSuicideRequest);
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    @Receiver
    public void dealEngineRelease(EngineConnReleaseRequest engineConnReleaseRequest, Sender sender) {
        logger.info(String.format("Start to kill engine, with msg: %s, %s", engineConnReleaseRequest.getMsg(), engineConnReleaseRequest.getServiceInstance().toString()));
        if (engineConnReleaseRequest.getServiceInstance() == null) {
            logger.warn("Invalid empty serviceInstance, will not kill engine.");
            return;
        }
        EngineNode engineNode = getEngineNodeManager().getEngineNode(engineConnReleaseRequest.getServiceInstance());
        if (engineNode == null) {
            logger.warn(String.format("Cannot find valid engineNode from serviceInstance: %s", engineConnReleaseRequest.getServiceInstance().toString()));
            return;
        }
        logger.info(String.format("Send stop engine request %s", engineConnReleaseRequest.getServiceInstance().toString()));
        engineNode.setLabels(this.nodeLabelService.getNodeLabels(engineNode.getServiceInstance()));
        engineNode.setNodeStatus(engineConnReleaseRequest.getNodeStatus());
        engineConnInfoClear(engineNode);
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    public void asyncStopEngine(EngineStopRequest engineStopRequest) {
        CompletableFuture.runAsync(() -> {
            logger.info(String.format("Start to async stop engineFailed %s", engineStopRequest));
            LinkisUtils.tryAndErrorMsg(() -> {
                stopEngine(engineStopRequest, Sender.getSender(Sender.getThisServiceInstance()));
            }, String.format("async stop engineFailed %s", engineStopRequest), logger);
        }, this.EXECUTOR);
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineStopService
    public void asyncStopEngineWithUpdateMetrics(EngineStopRequest engineStopRequest) {
        CompletableFuture.runAsync(() -> {
            try {
                logger.info(String.format("Start to async stop engine node:%s", engineStopRequest));
                String serviceInstance = engineStopRequest.getServiceInstance().getInstance();
                logger.info(String.format("Try to update ec node:%s status Unlock --> ShuttingDown", engineStopRequest));
                if (this.nodeMetricManagerMapper.updateNodeStatus(serviceInstance, NodeStatus.ShuttingDown.ordinal(), NodeStatus.Unlock.ordinal()) > 0) {
                    logger.info(String.format("Try to do stop ec node %s action", engineStopRequest));
                    stopEngine(engineStopRequest, Sender.getSender(Sender.getThisServiceInstance()));
                } else {
                    logger.info(String.format("ec node:%s status update failed! maybe the status is not unlock. will skip to kill this ec node", serviceInstance));
                }
            } catch (Exception e) {
                logger.error(String.format("asyncStopEngineWithUpdateMetrics with error: %s", e.getMessage()), e);
            }
        }, this.EXECUTOR);
    }
}
