package org.apache.linkis.manager.am.service.engine;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.common.exception.LinkisRetryException;
import org.apache.linkis.common.utils.ByteTimeUtils;
import org.apache.linkis.engineplugin.server.service.EngineConnResourceFactoryService;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.governance.common.utils.JobUtils;
import org.apache.linkis.manager.am.conf.AMConfiguration;
import org.apache.linkis.manager.am.conf.EngineConnConfigurationService;
import org.apache.linkis.manager.am.exception.AMErrorException;
import org.apache.linkis.manager.am.label.EngineReuseLabelChooser;
import org.apache.linkis.manager.am.manager.EngineNodeManager;
import org.apache.linkis.manager.am.selector.ECAvailableRule;
import org.apache.linkis.manager.am.selector.NodeSelector;
import org.apache.linkis.manager.am.util.LinkisUtils;
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.entity.node.EMNode;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.common.entity.node.Node;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
import org.apache.linkis.manager.common.utils.ManagerUtils;
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequestImpl;
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnCreationDescImpl;
import org.apache.linkis.manager.engineplugin.common.resource.TimeoutEngineResourceRequest;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
import org.apache.linkis.manager.label.entity.EngineNodeLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.node.AliasServiceInstanceLabel;
import org.apache.linkis.manager.label.service.NodeLabelService;
import org.apache.linkis.manager.label.service.UserLabelService;
import org.apache.linkis.manager.label.utils.LabelUtils;
import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
import org.apache.linkis.manager.rm.AvailableResource;
import org.apache.linkis.manager.rm.NotEnoughResource;
import org.apache.linkis.manager.rm.service.ResourceManager;
import org.apache.linkis.manager.service.common.label.LabelChecker;
import org.apache.linkis.manager.service.common.label.LabelFilter;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.rpc.message.annotation.Receiver;
import org.apache.linkis.server.BDPJettyServerHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.class */
public class DefaultEngineCreateService extends AbstractEngineService implements EngineCreateService {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(DefaultEngineCreateService.class);

    /** Picks one node out of the candidate EM nodes in {@code createEngine}. */
    @Autowired
    private NodeSelector nodeSelector;

    /** Reads the labels of a node and attaches labels to a newly created engine node. */
    @Autowired
    private NodeLabelService nodeLabelService;

    /** Asked for resources in {@code requestResource} before the engine is launched. */
    @Autowired
    private ResourceManager resourceManager;

    /** Every checker must accept the request labels, otherwise {@code createEngine} fails. */
    @Autowired
    private List<LabelChecker> labelCheckerList;

    /** Narrows a label list down to the engine labels via {@code choseEngineLabel}. */
    @Autowired
    private LabelFilter labelFilter;

    /** Supplies the labels of the requesting user, merged with the request labels. */
    @Autowired
    private UserLabelService userLabelService;

    /** Supplies console configuration used as defaults for missing request properties. */
    @Autowired
    private EngineConnConfigurationService engineConnConfigurationService;

    /** Builds the {@code NodeResource} that is requested for the new engine. */
    @Autowired
    private EngineConnResourceFactoryService engineConnResourceFactoryService;

    /** Source of node metrics; {@code ensuresIdle} reads the heartbeat message from it. */
    @Autowired
    private NodeMetricManagerPersistence nodeMetricManagerPersistence;

    /** Optional label choosers applied in sequence to the request labels (null-checked before use). */
    @Autowired
    private List<EngineReuseLabelChooser> engineReuseLabelChoosers;

    /** Used to stop engines asynchronously and to clear engineConn info after a failure. */
    @Autowired
    private EngineStopService engineStopService;

    /**
     * Creates a new engineConn for the given request.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Merge request labels with the user's labels, run the optional reuse-label choosers and
     *       validate the result with every {@link LabelChecker}.</li>
     *   <li>Look up EM nodes by all labels except {@link EngineTypeLabel} (plus an alias label for
     *       the ECM application name) and let the {@link NodeSelector} choose one.</li>
     *   <li>Request resources; the returned ticket id is used as the temporary instance name.</li>
     *   <li>Ask the chosen EM to create the engine, update the persisted node and attach labels.</li>
     *   <li>Unless the timeout has elapsed and the request ignores timeouts, optionally wait for
     *       the engine to become available.</li>
     * </ol>
     *
     * <p>Any failure after the resource request marks the persisted engineConn (if any) as failed
     * and clears its info. Note that an exception thrown by the inner handler also passes through
     * the outer handler before leaving this method.
     *
     * @param engineCreateRequest the creation request (labels, user, properties, timeout)
     * @param sender the RPC sender; not used by this method
     * @return the created engine node
     * @throws LinkisRetryException if no EM node matches or updating the engine node fails
     */
    @Override // org.apache.linkis.manager.am.service.engine.EngineCreateService
    @Receiver
    public EngineNode createEngine(EngineCreateRequest engineCreateRequest, Sender sender) throws LinkisRetryException {
        long startTime = System.currentTimeMillis();
        String taskId = JobUtils.getJobIdFromStringMap(engineCreateRequest.getProperties());
        logger.info("Task: {} start to create Engine for request: {}.", taskId, engineCreateRequest);
        LabelBuilderFactory labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory();
        // A non-positive request timeout falls back to the configured maximum start time.
        long timeout = engineCreateRequest.getTimeout() <= 0 ? ((TimeType) AMConfiguration.ENGINE_START_MAX_TIME.getValue()).toLong() : engineCreateRequest.getTimeout();

        List<Label<?>> labelList = LabelUtils.distinctLabel(labelBuilderFactory.getLabels(engineCreateRequest.getLabels()), this.userLabelService.getUserLabels(engineCreateRequest.getUser()));
        if (this.engineReuseLabelChoosers != null) {
            for (EngineReuseLabelChooser chooser : this.engineReuseLabelChoosers) {
                labelList = chooser.chooseLabels(labelList);
            }
        }
        for (LabelChecker checker : this.labelCheckerList) {
            if (!checker.checkEngineLabel(labelList)) {
                throw new AMErrorException(30002, "Need to specify engineType and userCreator label");
            }
        }

        // EM lookup uses a copy of the labels so the alias label does not leak into labelList.
        List<Label<?>> emLabelList = new ArrayList<>(labelList);
        AliasServiceInstanceLabel emInstanceLabel = labelBuilderFactory.createLabel(AliasServiceInstanceLabel.class);
        emInstanceLabel.setAlias((String) GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue());
        emLabelList.add(emInstanceLabel);
        Node[] emNodes = getEMService().getEMNodes(emLabelList.stream().filter(label -> {
            return !(label instanceof EngineTypeLabel);
        }).collect(Collectors.toList()));
        Optional<Node> chosen = (emNodes == null || emNodes.length == 0) ? Optional.empty() : this.nodeSelector.choseNode(emNodes);
        if (!chosen.isPresent()) {
            throw new LinkisRetryException(30002, "The em of labels " + engineCreateRequest.getLabels() + " not found");
        }
        EMNode emNode = (EMNode) chosen.get();

        Pair<String, NodeResource> ticketAndResource = requestResource(engineCreateRequest, this.labelFilter.choseEngineLabel(labelList), emNode, timeout);
        String ticketId = ticketAndResource.getKey();
        EngineConnBuildRequestImpl buildRequest = new EngineConnBuildRequestImpl(ticketId, this.labelFilter.choseEngineLabel(labelList), ticketAndResource.getValue(), new EngineConnCreationDescImpl(engineCreateRequest.getCreateService(), engineCreateRequest.getDescription(), engineCreateRequest.getProperties()));
        // Until the engine reports its real instance, it is addressed by the resource ticket id.
        ServiceInstance ticketInstance = new ServiceInstance();
        ticketInstance.setApplicationName((String) GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue());
        ticketInstance.setInstance(ticketId);
        try {
            EngineNode engineNode = getEMService().createEngine(buildRequest, emNode);
            logger.info("Task: {} finished to create engineConn {}. ticketId is {} ", taskId, engineNode, ticketId);
            engineNode.setTicketId(ticketId);
            EngineNodeManager engineNodeManager = getEngineNodeManager();
            try {
                engineNodeManager.updateEngineNode(ticketInstance, engineNode);
                AliasServiceInstanceLabel engineInstanceLabel = labelBuilderFactory.createLabel(AliasServiceInstanceLabel.class);
                engineInstanceLabel.setAlias((String) GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue());
                labelList.add(engineInstanceLabel);
                this.nodeLabelService.addLabelsToNode(engineNode.getServiceInstance(), this.labelFilter.choseEngineLabel(LabelUtils.distinctLabel(labelList, fromEMGetEngineLabels(emNode.getLabels()))));
                if (System.currentTimeMillis() - startTime >= timeout && engineCreateRequest.isIgnoreTimeout()) {
                    logger.info("Return a EngineConn {} for request: {} since the creator set ignoreTimeout=true and maxStartTime is reached.", engineNode, engineCreateRequest);
                    return engineNode;
                }
                long leftWaitTime = timeout - (System.currentTimeMillis() - startTime);
                if (ECAvailableRule.getInstance().isNeedAvailable(labelList)) {
                    ensureECAvailable(engineNode, ticketId, leftWaitTime);
                    logger.info("Task: {} finished to create Engine for request: {} and get engineNode {}. time taken {} ms", taskId, engineCreateRequest, engineNode, System.currentTimeMillis() - startTime);
                } else {
                    logger.info("Task: {} finished to create Engine for request: {} and get engineNode {}. And did not judge the availability, time taken {} ms", taskId, engineCreateRequest, engineNode, System.currentTimeMillis() - startTime);
                }
                return engineNode;
            } catch (Throwable th) {
                logger.warn("Failed to update engineNode " + engineNode, th);
                this.engineStopService.asyncStopEngine(new EngineStopRequest(engineNode.getServiceInstance(), ManagerUtils.getAdminUser()));
                clearFailedEngineConn(ticketInstance, labelList, emNode);
                throw new LinkisRetryException(30002, "Failed to update engineNode: " + th.getMessage());
            }
        } catch (Throwable th2) {
            // Log the throwable too; otherwise the failure reason never reaches the log from here.
            logger.warn("Failed to create ec({}) ask ecm {}", ticketId, emNode.getServiceInstance(), th2);
            clearFailedEngineConn(ticketInstance, labelList, emNode);
            throw th2;
        }
    }

    /**
     * Marks the engineConn stored under {@code serviceInstance} as failed and clears its info.
     * Does nothing except log when no such engineConn is found.
     *
     * @param serviceInstance the (ticket-based) instance the engineConn was registered under
     * @param labels the request labels; their engine labels are added to the node before clearing
     * @param emNode the EM node whose labels are merged into the node's labels
     */
    private void clearFailedEngineConn(ServiceInstance serviceInstance, List<Label<?>> labels, EMNode emNode) {
        EngineNode failedNode = getEngineNodeManager().getEngineNode(serviceInstance);
        if (null == failedNode) {
            logger.info(" engineConn does not exist in db: " + serviceInstance);
            return;
        }
        failedNode.setLabels(this.nodeLabelService.getNodeLabels(serviceInstance));
        failedNode.getLabels().addAll(LabelUtils.distinctLabel(this.labelFilter.choseEngineLabel(labels), emNode.getLabels()));
        failedNode.setNodeStatus(NodeStatus.Failed);
        this.engineStopService.engineConnInfoClear(failedNode);
    }

    /**
     * Builds the engine's resource demand and requests it from the resource manager.
     *
     * <p>Before the request, console configuration entries are copied into the request properties
     * for every key the request does not already contain (an empty property map is created when
     * the request has none).
     *
     * @param engineCreateRequest the creation request; its properties may be filled in
     * @param list the engine labels used for configuration lookup and the resource request
     * @param eMNode the chosen EM node; its labels are merged into the resource request labels
     * @param j the timeout handed to the resource factory and the resource manager
     * @return the resource ticket id paired with the requested resource
     * @throws LinkisRetryException if the resource manager does not grant the resource
     */
    private Pair<String, NodeResource> requestResource(EngineCreateRequest engineCreateRequest, List<Label<?>> list, EMNode eMNode, long j) {
        if (engineCreateRequest.getProperties() == null) {
            engineCreateRequest.setProperties(new HashMap<>());
        }
        Map<String, String> consoleConfiguration = this.engineConnConfigurationService.getConsoleConfiguration(list);
        Map<String, String> properties = engineCreateRequest.getProperties();
        if (consoleConfiguration != null && !consoleConfiguration.isEmpty()) {
            for (Map.Entry<String, String> entry : consoleConfiguration.entrySet()) {
                // Request properties win; console values are only defaults for missing keys.
                if (!properties.containsKey(entry.getKey())) {
                    properties.put(entry.getKey(), entry.getValue());
                }
            }
        }
        NodeResource createEngineResource = this.engineConnResourceFactoryService.createEngineResource(new TimeoutEngineResourceRequest(j, engineCreateRequest.getUser(), list, engineCreateRequest.getProperties()));
        // Held as Object so the result can be dispatched on its runtime type: the manager answers
        // with either an AvailableResource or a NotEnoughResource.
        Object result = this.resourceManager.requestResource(LabelUtils.distinctLabel(list, eMNode.getLabels()), createEngineResource, j);
        if (result instanceof AvailableResource) {
            return Pair.of(((AvailableResource) result).getTicketId(), createEngineResource);
        }
        String reason = result instanceof NotEnoughResource
                ? ((NotEnoughResource) result).getReason()
                : "unexpected resource request result: " + result;
        logger.warn("not engough resource: " + reason);
        throw new LinkisRetryException(30002, "not engough resource: : " + reason);
    }

    /**
     * Selects the labels of an EM node that are {@link EngineNodeLabel}s but not
     * {@link EngineTypeLabel}s.
     *
     * @param list the EM node's labels
     * @return a new list with the matching labels, in their original order
     */
    private List<Label<?>> fromEMGetEngineLabels(List<Label<?>> list) {
        List<Label<?>> engineLabels = new ArrayList<>();
        for (Label<?> candidate : list) {
            if (!(candidate instanceof EngineNodeLabel)) {
                continue;
            }
            if (candidate instanceof EngineTypeLabel) {
                continue;
            }
            engineLabels.add(candidate);
        }
        return engineLabels;
    }

    /**
     * Checks once whether the engine has become available, based on its persisted state.
     *
     * @param engineNode the engine being waited for
     * @param str the resource ticket id, used only in error messages
     * @return {@code false} when no persisted node is found; otherwise whether the persisted
     *     status counts as available
     * @throws LinkisRetryException if the engine has completed and its heartbeat carries the
     *     retry marker
     * @throws AMErrorException if the engine has completed without the retry marker
     */
    private boolean ensuresIdle(EngineNode engineNode, String str) {
        EngineNode persisted = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
        if (persisted == null) {
            return false;
        }
        NodeStatus status = persisted.getNodeStatus();
        boolean completed = NodeStatus.isCompleted(status).booleanValue();
        if (!completed) {
            return NodeStatus.isAvailable(status).booleanValue();
        }
        // A completed engine at this point means start-up failed; find out why and whether the
        // caller may retry.
        String heartBeatMsg = this.nodeMetricManagerPersistence.getNodeMetrics(persisted).getHeartBeatMsg();
        Pair<String, Optional<Boolean>> errorInfo = getStartErrorInfo(heartBeatMsg);
        String reason = errorInfo.getLeft();
        boolean retryable = errorInfo.getRight().isPresent();
        if (retryable) {
            throw new LinkisRetryException(30001, String.format("%s  ticketID:%s  Failed to initialize engine, reason: %s ", engineNode.getServiceInstance(), str, reason));
        }
        throw new AMErrorException(30002, String.format("%s ticketID: %s Failed to initialize engine, reason: %s", engineNode.getServiceInstance(), str, reason));
    }

    /**
     * Extracts the start-failure reason from a heartbeat message.
     *
     * <p>The message is parsed as JSON. When it contains a {@code start_reason} field, that text is
     * returned on the left; the right side is {@code Optional.of(true)} when the JSON also has an
     * {@code ec_can_try} field (only its presence is checked, not its value) and empty otherwise.
     *
     * @param str the heartbeat message; may be blank
     * @return the reason and retry marker, or a pair of {@code null} and an empty optional when
     *     the message is blank, cannot be parsed, or has no {@code start_reason}
     */
    private Pair<String, Optional<Boolean>> getStartErrorInfo(String str) {
        if (StringUtils.isBlank(str)) {
            return Pair.of((String) null, Optional.<Boolean>empty());
        }
        JsonNode jsonNode;
        try {
            jsonNode = BDPJettyServerHelper.jacksonJson().readTree(str);
        } catch (JsonProcessingException e) {
            // An unparsable heartbeat is treated the same as one without a start reason.
            logger.warn("getStartErrorInfo readTree failed msg: {}", str, e);
            return Pair.of((String) null, Optional.<Boolean>empty());
        }
        if (jsonNode == null || !jsonNode.has("start_reason")) {
            return Pair.of((String) null, Optional.<Boolean>empty());
        }
        String startReason = jsonNode.get("start_reason").asText();
        Optional<Boolean> canRetry = jsonNode.has("ec_can_try") ? Optional.of(Boolean.TRUE) : Optional.<Boolean>empty();
        return Pair.of(startReason, canRetry);
    }

    /**
     * Waits until the engine is reported available, stopping it if that does not happen.
     *
     * <p>Repeatedly evaluates {@code ensuresIdle} through {@code LinkisUtils.waitUntil} for at most
     * {@code j} milliseconds. On any failure the engine is stopped asynchronously as the admin
     * user before the exception is raised.
     *
     * @param engineNode the engine to wait for
     * @param str the resource ticket id, used in log and error messages
     * @param j the maximum time to wait, in milliseconds
     * @return {@code engineNode} once it is available
     * @throws LinkisRetryException if waiting times out, or if the availability check itself
     *     raised a {@code LinkisRetryException}
     * @throws AMErrorException for any other failure, with the original throwable as cause
     */
    public EngineNode ensureECAvailable(EngineNode engineNode, String str, long j) {
        try {
            logger.info("Start to wait engineConn({}) to be available, but only {} left.", engineNode, ByteTimeUtils.msDurationToString(j));
            LinkisUtils.waitUntil(() -> {
                return Boolean.valueOf(ensuresIdle(engineNode, str));
            }, Duration.ofMillis(j));
            return engineNode;
        } catch (TimeoutException e) {
            logger.warn(String.format("Waiting for engineNode:%s(%s) initialization TimeoutException , now stop it.", engineNode, str), e);
            this.engineStopService.asyncStopEngine(new EngineStopRequest(engineNode.getServiceInstance(), ManagerUtils.getAdminUser()));
            throw new LinkisRetryException(30001, String.format("Waiting for engineNode:%s(%s) initialization TimeoutException, already waiting %d ms", engineNode, str, Long.valueOf(j)));
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // The catch-all would otherwise swallow the interrupt; restore the flag so
                // callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
            // Log the throwable as well so the root cause is visible in this service's log.
            logger.warn(String.format("Waiting for %s(%s) initialization failure , now stop it.", engineNode, str), th);
            this.engineStopService.asyncStopEngine(new EngineStopRequest(engineNode.getServiceInstance(), ManagerUtils.getAdminUser()));
            if (th instanceof LinkisRetryException) {
                throw th;
            }
            throw new AMErrorException(30001, String.format("Waiting for %s(%s) initialization failure", engineNode, str), th);
        }
    }
}
