/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.job.impl.threadpool;

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.impl.threadpool.DefaultContext;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide singleton {@link Scheduler} that runs {@link AbstractExecutable} jobs on a
 * fixed-size thread pool.
 *
 * <p>After {@link #init(JobEngineConfig, JobLock)} a single fetcher task runs periodically
 * (first run after 10 seconds, then every 60 seconds). Each run scans all job ids known to the
 * {@link ExecutableManager} and submits every job whose output state is
 * {@link ExecutableState#READY} to the job pool. The job pool size is
 * {@link JobEngineConfig#getMaxConcurrentJobLimit()}.
 *
 * <p>The scheduler also listens for Curator connection state changes and shuts its pools down
 * when the connection is {@code SUSPENDED} or {@code LOST}.
 */
public class DefaultScheduler
implements Scheduler<AbstractExecutable>,
ConnectionStateListener {
    private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
    private static final DefaultScheduler INSTANCE = new DefaultScheduler();

    private ExecutableManager executableManager;
    private FetcherRunner fetcher;
    private ScheduledExecutorService fetcherPool;
    private ExecutorService jobPool;
    private DefaultContext context;
    private volatile boolean initialized = false;
    private volatile boolean hasStarted = false;
    private JobEngineConfig jobEngineConfig;

    private DefaultScheduler() {
    }

    /**
     * @return the single shared scheduler instance
     */
    public static DefaultScheduler getInstance() {
        return INSTANCE;
    }

    /**
     * Curator callback: shuts the scheduler down when the connection is suspended or lost.
     *
     * @param client   the Curator client reporting the change (unused)
     * @param newState the new connection state
     * @throws RuntimeException wrapping the {@link SchedulerException} if shutdown fails
     */
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
            try {
                this.shutdown();
            }
            catch (SchedulerException e) {
                throw new RuntimeException("failed to shutdown scheduler", e);
            }
        }
    }

    /**
     * Initializes the scheduler once; calls made after a successful initialization are no-ops.
     *
     * <p>Steps, in order: acquire the job lock, create the fetcher and job pools, move every
     * executable currently in {@code READY} state to {@code ERROR}, ask the executable manager to
     * move all running jobs to error, register a JVM shutdown hook that shuts the scheduler down
     * and releases the job lock, and finally schedule the periodic fetcher.
     *
     * @param jobEngineConfig configuration supplying the Kylin config and the concurrent job limit
     * @param jobLock         lock that must be acquired before the scheduler may start
     * @throws IllegalStateException if the job lock cannot be acquired; the scheduler stays
     *                               uninitialized so that a later call can try again
     * @throws SchedulerException    declared by the {@link Scheduler} contract
     */
    @Override
    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
        if (this.initialized) {
            return;
        }
        // Acquire the lock BEFORE marking the scheduler initialized: otherwise a failed lock
        // attempt would turn every later init() call into a silent no-op.
        if (!jobLock.lock()) {
            throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
        }
        this.initialized = true;
        this.jobEngineConfig = jobEngineConfig;
        this.executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
        this.fetcherPool = Executors.newScheduledThreadPool(1);
        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
        // SynchronousQueue: a submission is either handed straight to a worker thread or rejected;
        // jobs are never queued inside the pool.
        this.jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
        this.context = new DefaultContext(Maps.newConcurrentMap(), jobEngineConfig.getConfig());
        for (AbstractExecutable executable : this.executableManager.getAllExecutables()) {
            if (executable.getStatus() != ExecutableState.READY) {
                continue;
            }
            this.executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
        }
        this.executableManager.updateAllRunningJobsToError();
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                logger.debug("Closing zk connection");
                try {
                    DefaultScheduler.this.shutdown();
                }
                catch (SchedulerException e) {
                    logger.error("error shutdown scheduler", (Throwable)e);
                }
                finally {
                    // Release the job lock even when shutdown fails.
                    jobLock.unlock();
                }
            }
        });
        this.fetcher = new FetcherRunner();
        this.fetcherPool.scheduleAtFixedRate(this.fetcher, 10L, 60L, TimeUnit.SECONDS);
        this.hasStarted = true;
    }

    /**
     * Requests an orderly shutdown of the fetcher pool and the job pool.
     *
     * <p>Safe to call before {@link #init(JobEngineConfig, JobLock)} has created the pools (for
     * example from an early connection-state callback); in that case there is nothing to shut down.
     *
     * @throws SchedulerException declared by the {@link Scheduler} contract
     */
    @Override
    public void shutdown() throws SchedulerException {
        ScheduledExecutorService fetchers = this.fetcherPool;
        if (fetchers != null) {
            fetchers.shutdown();
        }
        ExecutorService jobs = this.jobPool;
        if (jobs != null) {
            jobs.shutdown();
        }
    }

    /**
     * Currently a no-op: the executable is not actually stopped.
     *
     * @param executable the job a caller wants stopped (ignored)
     * @return always {@code true}, whether or not the scheduler has started
     * @throws SchedulerException declared by the {@link Scheduler} contract
     */
    @Override
    public boolean stop(AbstractExecutable executable) throws SchedulerException {
        // TODO: should actually try to stop the executable.
        return true;
    }

    /**
     * @return {@code true} once {@link #init(JobEngineConfig, JobLock)} has completed
     */
    public boolean hasStarted() {
        return this.hasStarted;
    }

    /**
     * Runs one executable on the job pool and removes it from the running-job registry when done.
     */
    private class JobRunner
    implements Runnable {
        private final AbstractExecutable executable;

        public JobRunner(AbstractExecutable executable) {
            this.executable = executable;
        }

        /**
         * Executes the job; failures are logged and never propagate to the pool's worker thread.
         * After a successful execution the fetcher is triggered immediately so follow-up work
         * does not wait for the next periodic fetch.
         */
        @Override
        public void run() {
            boolean succeeded = false;
            try {
                this.executable.execute(DefaultScheduler.this.context);
                succeeded = true;
            }
            catch (ExecuteException e) {
                logger.error("ExecuteException job:" + this.executable.getId(), (Throwable)e);
            }
            catch (Exception e) {
                logger.error("unknown error execute job:" + this.executable.getId(), (Throwable)e);
            }
            finally {
                DefaultScheduler.this.context.removeRunningJob(this.executable);
            }
            if (succeeded) {
                // Trigger the fetcher only AFTER this job has left the running-job registry;
                // otherwise the triggered fetch may still count it against the concurrency limit.
                this.triggerFetcher();
            }
        }

        /** Schedules an immediate fetcher run, tolerating a fetcher pool that was shut down. */
        private void triggerFetcher() {
            try {
                DefaultScheduler.this.fetcherPool.schedule(DefaultScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
            }
            catch (RuntimeException e) {
                // e.g. the pool rejects the task after shutdown(); the job itself already finished.
                logger.warn("unable to trigger job fetcher after job:" + this.executable.getId(), (Throwable)e);
            }
        }
    }

    /**
     * Periodic task that submits every {@code READY} job to the job pool.
     */
    private class FetcherRunner
    implements Runnable {
        private FetcherRunner() {
        }

        /**
         * Performs one fetch pass. Synchronized so that the periodic run and the runs triggered
         * by finished jobs never overlap. Any exception is logged and swallowed.
         */
        @Override
        public synchronized void run() {
            try {
                Map<String, Executable> runningJobs = DefaultScheduler.this.context.getRunningJobs();
                if (runningJobs.size() >= DefaultScheduler.this.jobEngineConfig.getMaxConcurrentJobLimit()) {
                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
                    return;
                }
                int nRunning = 0;
                int nReady = 0;
                int nOthers = 0;
                for (String id : DefaultScheduler.this.executableManager.getAllJobIds()) {
                    if (runningJobs.containsKey(id)) {
                        ++nRunning;
                        continue;
                    }
                    Output output = DefaultScheduler.this.executableManager.getOutput(id);
                    if (output.getState() != ExecutableState.READY) {
                        ++nOthers;
                        continue;
                    }
                    ++nReady;
                    AbstractExecutable executable = DefaultScheduler.this.executableManager.getJob(id);
                    String jobDesc = executable.toString();
                    logger.info(jobDesc + " prepare to schedule");
                    try {
                        // Register first so a job that finishes quickly can always be removed again.
                        DefaultScheduler.this.context.addRunningJob(executable);
                        DefaultScheduler.this.jobPool.execute(new JobRunner(executable));
                        logger.info(jobDesc + " scheduled");
                    }
                    catch (Exception ex) {
                        // Submission failed (e.g. the pool rejected it): undo the registration.
                        DefaultScheduler.this.context.removeRunningJob(executable);
                        logger.warn(jobDesc + " fail to schedule", (Throwable)ex);
                    }
                }
                logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others");
            }
            catch (Exception e) {
                // Pass the throwable as well so the stack trace is not lost.
                logger.warn("Job Fetcher caught a exception " + e, (Throwable)e);
            }
        }
    }
}

