package org.apache.linkis.manager.am.service.engine;

import feign.RetryableException;
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.common.exception.LinkisRetryException;
import org.apache.linkis.governance.common.utils.JobUtils;
import org.apache.linkis.governance.common.utils.LoggerUtils;
import org.apache.linkis.manager.am.conf.AMConfiguration;
import org.apache.linkis.manager.am.util.LinkisUtils;
import org.apache.linkis.manager.am.utils.AMUtils;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.common.protocol.engine.EngineAskAsyncResponse;
import org.apache.linkis.manager.common.protocol.engine.EngineAskRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineCreateError;
import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineCreateSuccess;
import org.apache.linkis.manager.common.protocol.engine.EngineReuseRequest;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.rpc.message.annotation.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.class */
public class DefaultEngineAskEngineService extends AbstractEngineService implements EngineAskEngineService {
    private EngineCreateService engineCreateService;
    private EngineReuseService engineReuseService;
    private static final Logger logger = LoggerFactory.getLogger(DefaultEngineAskEngineService.class);
    private static final ThreadPoolExecutor EXECUTOR = LinkisUtils.newCachedThreadPool(Integer.valueOf(AMConfiguration.ASK_ENGINE_ASYNC_MAX_THREAD_SIZE), "AskEngineService-Thread-", true);

    @Autowired
    public DefaultEngineAskEngineService(EngineCreateService engineCreateService, EngineReuseService engineReuseService) {
        this.engineCreateService = engineCreateService;
        this.engineReuseService = engineReuseService;
    }

    @Override // org.apache.linkis.manager.am.service.engine.EngineAskEngineService
    @Receiver
    public Object askEngine(EngineAskRequest engineAskRequest, Sender sender) {
        String jobIdFromStringMap = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
        LoggerUtils.setJobIdMDC(jobIdFromStringMap);
        logger.info(String.format("received task: %s, engineAskRequest %s", jobIdFromStringMap, engineAskRequest.toString()));
        if (!engineAskRequest.getLabels().containsKey("executeOnce")) {
            EngineReuseRequest engineReuseRequest = new EngineReuseRequest();
            engineReuseRequest.setLabels(engineAskRequest.getLabels());
            engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut());
            engineReuseRequest.setUser(engineAskRequest.getUser());
            engineReuseRequest.setProperties(engineAskRequest.getProperties());
            EngineNode engineNode = null;
            try {
                engineNode = this.engineReuseService.reuseEngine(engineReuseRequest, sender);
            } catch (Exception e) {
                if (e instanceof LinkisRetryException) {
                    logger.info(String.format("task: %s user %s reuse engine failed %s", jobIdFromStringMap, engineAskRequest.getUser(), e.getMessage()));
                } else {
                    logger.info(String.format("task: %s user %s reuse engine failed", jobIdFromStringMap, engineAskRequest.getUser()), e);
                }
            }
            if (engineNode != null) {
                logger.info(String.format("Finished to ask engine for task: %s user %s by reuse node %s", jobIdFromStringMap, engineAskRequest.getUser(), engineNode));
                return engineNode;
            }
        }
        String asyncId = AMUtils.getAsyncId();
        CompletableFuture.supplyAsync(() -> {
            try {
                LoggerUtils.setJobIdMDC(jobIdFromStringMap);
                logger.info(String.format("Task: %s start to async(%s) createEngine, %s", jobIdFromStringMap, asyncId, engineAskRequest.getCreateService()));
                engineAskRequest.getLabels().remove("engineInstance");
                EngineCreateRequest engineCreateRequest = new EngineCreateRequest();
                engineCreateRequest.setLabels(engineAskRequest.getLabels());
                engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
                engineCreateRequest.setUser(engineAskRequest.getUser());
                engineCreateRequest.setProperties(engineAskRequest.getProperties());
                engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
                EngineNode createEngine = this.engineCreateService.createEngine(engineCreateRequest, sender);
                EngineNode useEngine = getEngineNodeManager().useEngine(createEngine, engineCreateRequest.getTimeout() <= 0 ? ((TimeType) AMConfiguration.ENGINE_START_MAX_TIME.getValue()).toLong() : engineCreateRequest.getTimeout());
                if (useEngine == null) {
                    throw new LinkisRetryException(30002, String.format("create engine%s success, but to use engine failed", createEngine.getServiceInstance()));
                }
                logger.info(String.format("Task: %s finished to ask engine for user %s by create node %s", jobIdFromStringMap, engineAskRequest.getUser(), useEngine));
                LoggerUtils.removeJobIdMDC();
                return useEngine;
            } catch (Throwable th) {
                LoggerUtils.removeJobIdMDC();
                throw th;
            }
        }, EXECUTOR).whenComplete((engineNode2, th) -> {
            boolean z;
            LoggerUtils.setJobIdMDC(jobIdFromStringMap);
            if (((th instanceof CompletionException) || (th instanceof ExecutionException)) && th.getCause() != null) {
                th = th.getCause();
            }
            if (th != null) {
                if (th instanceof LinkisRetryException) {
                    z = true;
                } else if (th instanceof RetryableException) {
                    z = true;
                } else {
                    Throwable rootCause = ExceptionUtils.getRootCause(th);
                    z = rootCause instanceof SocketTimeoutException ? true : rootCause instanceof TimeoutException;
                }
                String format = String.format("Task: %s Failed to async(%s) createEngine, can Retry %s", jobIdFromStringMap, asyncId, Boolean.valueOf(z));
                if (z) {
                    logger.info(String.format("msg: %s canRetry Exception: %s", format, th.getClass().getName()));
                } else {
                    logger.info(format, th);
                }
                sender.send(new EngineCreateError(asyncId, ExceptionUtils.getRootCauseMessage(th), Boolean.valueOf(z)));
            } else {
                logger.info(String.format("Task: %s Success to async(%s) createEngine %s", jobIdFromStringMap, asyncId, engineNode2));
                sender.send(new EngineCreateSuccess(asyncId, engineNode2));
            }
            LoggerUtils.removeJobIdMDC();
        });
        LoggerUtils.removeJobIdMDC();
        return new EngineAskAsyncResponse(asyncId, Sender.getThisServiceInstance());
    }
}
