package org.apache.linkis.cli.core.interactor.execution;

import org.apache.linkis.cli.common.entity.execution.Execution;
import org.apache.linkis.cli.common.entity.execution.ExecutionResult;
import org.apache.linkis.cli.common.entity.execution.executor.Executor;
import org.apache.linkis.cli.common.entity.execution.jobexec.ExecutionStatus;
import org.apache.linkis.cli.common.entity.execution.jobexec.JobStatus;
import org.apache.linkis.cli.common.entity.job.Job;
import org.apache.linkis.cli.common.exception.LinkisClientRuntimeException;
import org.apache.linkis.cli.common.exception.error.ErrorLevel;
import org.apache.linkis.cli.core.constants.Constants;
import org.apache.linkis.cli.core.exception.ExecutorException;
import org.apache.linkis.cli.core.exception.error.CommonErrMsg;
import org.apache.linkis.cli.core.interactor.execution.executor.AsyncBackendExecutor;
import org.apache.linkis.cli.core.interactor.execution.executor.LogRetrievable;
import org.apache.linkis.cli.core.interactor.execution.executor.SyncBackendExecutor;
import org.apache.linkis.cli.core.interactor.execution.jobexec.JobSubmitExec;
import org.apache.linkis.cli.core.interactor.execution.observer.event.IncLogEvent;
import org.apache.linkis.cli.core.interactor.execution.observer.event.LinkisClientEvent;
import org.apache.linkis.cli.core.interactor.execution.observer.listener.LinkisClientObserver;
import org.apache.linkis.cli.core.interactor.execution.observer.listener.TriggerObserver;
import org.apache.linkis.cli.core.utils.CommonUtils;
import org.apache.linkis.cli.core.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/cli/core/interactor/execution/SyncSubmission.class */
public class SyncSubmission implements Execution {
    private static final Logger logger = LoggerFactory.getLogger(SyncSubmission.class);
    ExecutionStatus executionStatus = ExecutionStatus.UNDEFINED;
    private LinkisClientEvent incLogEvent = new IncLogEvent();
    private TriggerObserver incLogFinObserver = new TriggerObserver();

    public void checkInit() {
        if (this.incLogEvent == null || this.incLogFinObserver == null) {
            throw new ExecutorException(JobStatus.UNSUBMITTED, "EXE0001", ErrorLevel.ERROR, CommonErrMsg.ExecutionInitErr, "Executor is not properly inited:");
        }
    }

    public SyncSubmission registerIncLogEventListener(LinkisClientObserver linkisClientObserver) {
        this.incLogEvent.register(linkisClientObserver);
        return this;
    }

    public SyncSubmission getIncLogFinObserverRegistered(LinkisClientEvent linkisClientEvent) {
        linkisClientEvent.register(this.incLogFinObserver);
        return this;
    }

    public ExecutionResult execute(Executor executor, Job job) {
        JobSubmitExec ExecWithAsyncBackend;
        if (executor instanceof SyncBackendExecutor) {
            ExecWithAsyncBackend = ((SyncBackendExecutor) executor).submitAndGetResult(job);
        } else {
            if (!(executor instanceof AsyncBackendExecutor)) {
                throw new ExecutorException(JobStatus.UNSUBMITTED, "EXE0004", ErrorLevel.ERROR, CommonErrMsg.ExecutionErr, "Executor Type: \"" + getClass().getCanonicalName() + "\" is not Supported");
            }
            ExecWithAsyncBackend = ExecWithAsyncBackend((AsyncBackendExecutor) executor, job);
        }
        return new ExecutionResultImpl(ExecWithAsyncBackend, ExecWithAsyncBackend.isJobSuccess() ? ExecutionStatus.SUCCEED : !ExecWithAsyncBackend.isJobCompleted() ? ExecutionStatus.UNDEFINED : ExecutionStatus.FAILED, null);
    }

    private JobSubmitExec ExecWithAsyncBackend(AsyncBackendExecutor asyncBackendExecutor, Job job) {
        JobSubmitExec jobSubmitExec = null;
        JobSubmitExec submit = asyncBackendExecutor.submit(job);
        CommonUtils.doSleepQuietly(Constants.JOB_QUERY_SLEEP_MILLS);
        JobSubmitExec checkSubmit = asyncBackendExecutor.checkSubmit(submit);
        checkJobAvailability(checkSubmit);
        if (!checkSubmit.isJobSubmitted()) {
            throw new ExecutorException(JobStatus.UNSUBMITTED, "EXE0005", ErrorLevel.ERROR, CommonErrMsg.ExecutionErr, "Retry exhausted checking job submission. Job is probably not submitted");
        }
        StringBuilder sb = new StringBuilder();
        sb.append("Job is successfully submitted!").append(System.lineSeparator());
        LogUtils.getInformationLogger().info(sb.toString());
        if (asyncBackendExecutor instanceof LogRetrievable) {
            this.incLogEvent.notifyObserver(this.incLogEvent, checkSubmit);
        }
        int i = 0;
        while (!checkSubmit.isJobCompleted()) {
            try {
                checkSubmit = asyncBackendExecutor.updateJobStatus(checkSubmit);
                i++;
                checkJobAvailability(checkSubmit);
                CommonUtils.doSleepQuietly(Constants.JOB_QUERY_SLEEP_MILLS);
            } catch (Exception e) {
                logger.warn("", e);
                i++;
                if (i >= 30) {
                    throw new ExecutorException("EXE0013", ErrorLevel.ERROR, CommonErrMsg.ExecutionErr, "Cannot get jobStatus from server continuously for {0} seconds. Client aborted! Error message: \n", Long.valueOf(150 * Constants.JOB_QUERY_SLEEP_MILLS.longValue()), e);
                }
                CommonUtils.doSleepQuietly(Long.valueOf(5 * Constants.JOB_QUERY_SLEEP_MILLS.longValue()));
            }
        }
        if (checkSubmit.isJobSuccess()) {
            this.executionStatus = ExecutionStatus.SUCCEED;
        } else {
            this.executionStatus = ExecutionStatus.FAILED;
        }
        try {
            jobSubmitExec = asyncBackendExecutor.doGetFinalResult(checkSubmit);
        } catch (Exception e2) {
            logger.warn("Exception thrown when trying to query final result. Status will change to FAILED", e2);
            try {
                jobSubmitExec = asyncBackendExecutor.updateJobStatus(checkSubmit);
            } catch (Exception e3) {
                logger.warn("", e2);
                jobSubmitExec.setJobStatus(JobStatus.UNKNOWN);
            }
        }
        waitIncLogComplete();
        return jobSubmitExec;
    }

    private void checkJobAvailability(JobSubmitExec jobSubmitExec) throws LinkisClientRuntimeException {
        if (jobSubmitExec.isJobAbnormalStatus()) {
            throw new ExecutorException(JobStatus.UNSUBMITTED, "EXE0006", ErrorLevel.ERROR, CommonErrMsg.ExecutionErr, "Job is in abnormal status: " + CommonUtils.GSON.toJson(jobSubmitExec));
        }
    }

    private void waitIncLogComplete() {
        int i = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= 300) {
                String str = "Job is completed but client keep querying inclog for " + ((300 * Constants.JOB_QUERY_SLEEP_MILLS.longValue()) / 1000) + "seconds. Execution ends forcefully. Next will try handle execution result.";
                logger.warn(str);
                LogUtils.getInformationLogger().warn(str);
                return;
            } else if (this.incLogFinObserver.isTriggered().booleanValue()) {
                return;
            } else {
                CommonUtils.doSleepQuietly(Constants.JOB_QUERY_SLEEP_MILLS);
            }
        }
    }

    public boolean terminate(Executor executor, Job job) {
        return executor.terminate(job);
    }
}
