package org.apache.kylin.job.impl.threadpool;

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.rest.constant.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-core-job-1.5.4.jar:org/apache/kylin/job/impl/threadpool/DefaultScheduler.class */
public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
    private JobLock jobLock;
    private ExecutableManager executableManager;
    private FetcherRunner fetcher;
    private ScheduledExecutorService fetcherPool;
    private ExecutorService jobPool;
    private DefaultContext context;
    private volatile boolean initialized = false;
    private volatile boolean hasStarted = false;
    private JobEngineConfig jobEngineConfig;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) DefaultScheduler.class);
    private static DefaultScheduler INSTANCE = null;

    /* loaded from: input_file:WEB-INF/lib/kylin-core-job-1.5.4.jar:org/apache/kylin/job/impl/threadpool/DefaultScheduler$FetcherRunner.class */
    private class FetcherRunner implements Runnable {
        private FetcherRunner() {
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            try {
                Map<String, Executable> runningJobs = DefaultScheduler.this.context.getRunningJobs();
                if (runningJobs.size() >= DefaultScheduler.this.jobEngineConfig.getMaxConcurrentJobLimit()) {
                    DefaultScheduler.logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
                    return;
                }
                int i = 0;
                int i2 = 0;
                int i3 = 0;
                int i4 = 0;
                int i5 = 0;
                int i6 = 0;
                for (String str : DefaultScheduler.this.executableManager.getAllJobIds()) {
                    if (runningJobs.containsKey(str)) {
                        i++;
                    } else {
                        Output output = DefaultScheduler.this.executableManager.getOutput(str);
                        if (output.getState() == ExecutableState.READY) {
                            i2++;
                            AbstractExecutable job = DefaultScheduler.this.executableManager.getJob(str);
                            String abstractExecutable = job.toString();
                            DefaultScheduler.logger.info(abstractExecutable + " prepare to schedule");
                            try {
                                DefaultScheduler.this.context.addRunningJob(job);
                                DefaultScheduler.this.jobPool.execute(new JobRunner(job));
                                DefaultScheduler.logger.info(abstractExecutable + " scheduled");
                            } catch (Exception e) {
                                DefaultScheduler.this.context.removeRunningJob(job);
                                DefaultScheduler.logger.warn(abstractExecutable + " fail to schedule", (Throwable) e);
                            }
                        } else if (output.getState() == ExecutableState.DISCARDED) {
                            i5++;
                        } else if (output.getState() == ExecutableState.ERROR) {
                            i4++;
                        } else if (output.getState() == ExecutableState.SUCCEED) {
                            i6++;
                        } else {
                            i3++;
                        }
                    }
                }
                DefaultScheduler.logger.info("Job Fetcher: " + i + " should running, " + runningJobs.size() + " actual running, " + i2 + " ready, " + i6 + " already succeed, " + i4 + " error, " + i5 + " discarded, " + i3 + " others");
            } catch (Exception e2) {
                DefaultScheduler.logger.warn("Job Fetcher caught a exception " + e2);
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-core-job-1.5.4.jar:org/apache/kylin/job/impl/threadpool/DefaultScheduler$JobRunner.class */
    private class JobRunner implements Runnable {
        private final AbstractExecutable executable;

        public JobRunner(AbstractExecutable abstractExecutable) {
            this.executable = abstractExecutable;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    try {
                        this.executable.execute(DefaultScheduler.this.context);
                        DefaultScheduler.this.fetcherPool.schedule(DefaultScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
                        DefaultScheduler.this.context.removeRunningJob(this.executable);
                    } catch (Exception e) {
                        DefaultScheduler.logger.error("unknown error execute job:" + this.executable.getId(), (Throwable) e);
                        DefaultScheduler.this.context.removeRunningJob(this.executable);
                    }
                } catch (ExecuteException e2) {
                    DefaultScheduler.logger.error("ExecuteException job:" + this.executable.getId(), (Throwable) e2);
                    DefaultScheduler.this.context.removeRunningJob(this.executable);
                }
            } catch (Throwable th) {
                DefaultScheduler.this.context.removeRunningJob(this.executable);
                throw th;
            }
        }
    }

    public DefaultScheduler() {
        if (INSTANCE != null) {
            throw new IllegalStateException("DefaultScheduler has been initiated.");
        }
    }

    public static DefaultScheduler getInstance() {
        return INSTANCE;
    }

    @Override // org.apache.curator.framework.state.ConnectionStateListener
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if (connectionState == ConnectionState.SUSPENDED || connectionState == ConnectionState.LOST) {
            try {
                shutdown();
            } catch (SchedulerException e) {
                throw new RuntimeException("failed to shutdown scheduler", e);
            }
        }
    }

    public static synchronized DefaultScheduler createInstance() {
        destroyInstance();
        INSTANCE = new DefaultScheduler();
        return INSTANCE;
    }

    public static synchronized void destroyInstance() {
        DefaultScheduler defaultScheduler = INSTANCE;
        INSTANCE = null;
        if (defaultScheduler != null) {
            try {
                defaultScheduler.shutdown();
            } catch (SchedulerException e) {
                logger.error("error stop DefaultScheduler", (Throwable) e);
                throw new RuntimeException(e);
            }
        }
    }

    @Override // org.apache.kylin.job.Scheduler
    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
        this.jobLock = jobLock;
        String serverMode = jobEngineConfig.getConfig().getServerMode();
        if (!Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) && !"all".equals(serverMode.toLowerCase())) {
            logger.info("server mode: " + serverMode + ", no need to run job scheduler");
            return;
        }
        logger.info("Initializing Job Engine ....");
        if (this.initialized) {
            return;
        }
        this.initialized = true;
        this.jobEngineConfig = jobEngineConfig;
        if (!jobLock.lock()) {
            throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
        }
        this.executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
        this.fetcherPool = Executors.newScheduledThreadPool(1);
        int maxConcurrentJobLimit = jobEngineConfig.getMaxConcurrentJobLimit();
        this.jobPool = new ThreadPoolExecutor(maxConcurrentJobLimit, maxConcurrentJobLimit, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue());
        this.context = new DefaultContext(Maps.newConcurrentMap(), jobEngineConfig.getConfig());
        this.executableManager.resumeAllRunningJobs();
        this.fetcher = new FetcherRunner();
        this.fetcherPool.scheduleAtFixedRate(this.fetcher, 10L, 60L, TimeUnit.SECONDS);
        this.hasStarted = true;
    }

    @Override // org.apache.kylin.job.Scheduler
    public void shutdown() throws SchedulerException {
        logger.info("Shutingdown Job Engine ....");
        this.jobLock.unlock();
        this.fetcherPool.shutdown();
        this.jobPool.shutdown();
    }

    @Override // org.apache.kylin.job.Scheduler
    public boolean stop(AbstractExecutable abstractExecutable) throws SchedulerException {
        return this.hasStarted ? true : true;
    }

    @Override // org.apache.kylin.job.Scheduler
    public boolean hasStarted() {
        return this.hasStarted;
    }
}
