package org.apache.zeppelin.scheduler;

import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.scheduler.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/scheduler/RemoteScheduler.class */
public class RemoteScheduler extends AbstractScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteScheduler.class);
    private RemoteInterpreter remoteInterpreter;
    private ExecutorService executor;

    /* loaded from: input_file:org/apache/zeppelin/scheduler/RemoteScheduler$JobRunner.class */
    private class JobRunner implements Runnable, JobListener {
        private RemoteScheduler scheduler;
        private Job job;
        private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
        private volatile boolean jobExecuted = false;
        private volatile boolean jobSubmittedRemotely = false;

        public JobRunner(RemoteScheduler remoteScheduler, Job job) {
            this.scheduler = remoteScheduler;
            this.job = job;
        }

        public boolean isJobSubmittedInRemote() {
            return this.jobSubmittedRemotely;
        }

        public boolean isJobExecuted() {
            return this.jobExecuted;
        }

        @Override // java.lang.Runnable
        public void run() {
            JobStatusPoller jobStatusPoller = new JobStatusPoller(this.job, this, 100L);
            jobStatusPoller.start();
            this.scheduler.runJob(this.job);
            this.jobExecuted = true;
            this.jobSubmittedRemotely = true;
            jobStatusPoller.shutdown();
            try {
                jobStatusPoller.join();
            } catch (InterruptedException e) {
                this.logger.error("JobStatusPoller interrupted", e);
            }
        }

        public void onProgressUpdate(Job job, int i) {
        }

        public void onStatusChange(Job job, Job.Status status, Job.Status status2) {
            if (this.jobExecuted) {
                this.jobSubmittedRemotely = true;
            } else {
                if (status2 == Job.Status.FINISHED || status2 == Job.Status.ABORT || status2 == Job.Status.ERROR) {
                    return;
                }
                if (status2 == Job.Status.RUNNING) {
                    this.jobSubmittedRemotely = true;
                    job.setStatus(Job.Status.RUNNING);
                }
            }
            synchronized (job) {
                if (status2 == Job.Status.RUNNING && job.getStatus() == Job.Status.PENDING) {
                    job.setStatus(Job.Status.RUNNING);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/zeppelin/scheduler/RemoteScheduler$JobStatusPoller.class */
    private class JobStatusPoller extends Thread {
        private long checkIntervalMsec;
        private volatile boolean terminate;
        private JobListener listener;
        private Job job;
        private volatile Job.Status lastStatus;

        public JobStatusPoller(Job job, JobListener jobListener, long j) {
            setName("JobStatusPoller-" + job.getId());
            this.checkIntervalMsec = j;
            this.job = job;
            this.listener = jobListener;
            this.terminate = false;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Job.Status status;
            while (!this.terminate) {
                synchronized (this) {
                    try {
                        wait(this.checkIntervalMsec);
                    } catch (InterruptedException e) {
                        RemoteScheduler.LOGGER.error("Exception in RemoteScheduler while run this.wait", e);
                    }
                }
                if (this.terminate || (status = getStatus()) == Job.Status.RUNNING || status == Job.Status.FINISHED || status == Job.Status.ERROR || status == Job.Status.ABORT) {
                    break;
                }
            }
            this.terminate = true;
        }

        public void shutdown() {
            this.terminate = true;
            synchronized (this) {
                notify();
            }
        }

        public Job.Status getStatus() {
            if (!RemoteScheduler.this.remoteInterpreter.isOpened()) {
                return this.lastStatus != null ? this.lastStatus : this.job.getStatus();
            }
            Job.Status valueOf = Job.Status.valueOf(RemoteScheduler.this.remoteInterpreter.getStatus(this.job.getId()));
            if (valueOf == Job.Status.UNKNOWN) {
                return this.job.getStatus();
            }
            this.listener.onStatusChange(this.job, this.lastStatus, valueOf);
            this.lastStatus = valueOf;
            return valueOf;
        }
    }

    public RemoteScheduler(String str, ExecutorService executorService, RemoteInterpreter remoteInterpreter) {
        super(str);
        this.executor = executorService;
        this.remoteInterpreter = remoteInterpreter;
    }

    public void runJobInScheduler(Job job) {
        JobRunner jobRunner = new JobRunner(this, job);
        this.executor.execute(jobRunner);
        String property = this.remoteInterpreter.getProperty(".execution.mode", "paragraph");
        if (property.equals("paragraph")) {
            while (!jobRunner.isJobSubmittedInRemote()) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote queue.wait", e);
                }
            }
            return;
        }
        if (!property.equals("note")) {
            throw new RuntimeException("Invalid job execution.mode: " + property + ", only 'note' and 'paragraph' are valid");
        }
        while (!jobRunner.isJobExecuted()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e2) {
                LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted queue.wait", e2);
            }
        }
    }
}
