package org.apache.kylin.job.impl.threadpool;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.tool.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DefaultScheduler.class */
public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
    // Lock handed to init(); acquired there via lockJobEngine() and released in shutdown().
    private JobLock jobLock;
    // Obtained in init() from the engine config; used by the fetchers to list and load jobs.
    private ExecutableManager executableManager;
    // The active fetcher: a FetcherRunnerWithPriority or a FetcherRunner, chosen in init().
    private Runnable fetcher;
    // Single-thread scheduled pool that runs the fetcher at a fixed rate (created in init()).
    private ScheduledExecutorService fetcherPool;
    // Pool that runs JobRunner tasks; created in init() with core and max size equal to the max concurrent job limit.
    private ExecutorService jobPool;
    // Execution context created in init(); tracks the currently running jobs.
    private DefaultContext context;
    // Set by init() once it proceeds past the server-mode check; cleared by shutdown().
    private volatile boolean initialized = false;
    // Set at the very end of a successful init(); cleared by shutdown().
    private volatile boolean hasStarted = false;
    // NOTE(review): only referenced from the non-decompiled FetcherRunner.run() fragments — confirm its exact use.
    volatile boolean fetchFailed = false;
    // Engine configuration captured in init().
    private JobEngineConfig jobEngineConfig;
    // Process-wide singleton managed by getInstance()/createInstance()/destroyInstance().
    private static DefaultScheduler INSTANCE = null;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) DefaultScheduler.class);

    /* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DefaultScheduler$FetcherRunner.class */
    /**
     * Fetcher installed by {@code init} when the engine config reports that job priority is not
     * considered.
     *
     * <p>NOTE(review): the body of {@link #run()} was NOT recovered by the decompiler. As written in
     * this file the method unconditionally throws {@link UnsupportedOperationException}; the JADX
     * warning fragments preserved below are the only visible hints of the original logic (they show
     * a resource being closed, {@code fetchFailed} being reset to {@code false}, and a
     * "Job Fetcher: ..." summary being logged). Restore the method from the original sources before
     * relying on this class.
     */
    private class FetcherRunner implements Runnable {
        private FetcherRunner() {
        }

        /* JADX WARN: Code restructure failed: missing block: B:100:0x0285, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:102:0x0275, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:104:0x027c, code lost:
        
            r11 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:105:0x027d, code lost:
        
            r0.addSuppressed(r11);
         */
        /* JADX WARN: Code restructure failed: missing block: B:82:0x0098, code lost:
        
            if (r0 == null) goto L124;
         */
        /* JADX WARN: Code restructure failed: missing block: B:84:0x009c, code lost:
        
            if (0 == 0) goto L26;
         */
        /* JADX WARN: Code restructure failed: missing block: B:85:0x00b1, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:86:0x00b5, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:88:0x009f, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:90:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:91:0x00a6, code lost:
        
            r21 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:92:0x00a8, code lost:
        
            r0.addSuppressed(r21);
         */
        /* JADX WARN: Code restructure failed: missing block: B:93:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:94:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:96:0x01fa, code lost:
        
            r8.this$0.fetchFailed = false;
            org.apache.kylin.job.impl.threadpool.DefaultScheduler.logger.info("Job Fetcher: " + r12 + " should running, " + r0.size() + " actual running, " + r14 + " stopped, " + r13 + " ready, " + r18 + " already succeed, " + r16 + " error, " + r17 + " discarded, " + r15 + " others");
         */
        /* JADX WARN: Code restructure failed: missing block: B:97:0x026e, code lost:
        
            if (r0 == null) goto L79;
         */
        /* JADX WARN: Code restructure failed: missing block: B:99:0x0272, code lost:
        
            if (0 == 0) goto L64;
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        // Placeholder emitted by the decompiler: the real fetch logic is missing, so every
        // invocation fails with UnsupportedOperationException.
        public synchronized void run() {
            /*
                Method dump skipped, instructions count: 716
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.kylin.job.impl.threadpool.DefaultScheduler.FetcherRunner.run():void");
        }
    }

    /* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DefaultScheduler$FetcherRunnerWithPriority.class */
    /**
     * Fetcher installed by {@code init} when the engine config reports that job priority is
     * considered. Ready jobs wait in a priority queue; each fetch round a job that is not scheduled
     * is re-queued with its priority increased by one.
     */
    private class FetcherRunnerWithPriority implements Runnable {
        /** Ready jobs paired with their current priority; the highest priority sits at the head. */
        volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue;

        private FetcherRunnerWithPriority() {
            this.jobPriorityQueue = new PriorityQueue<>(1, new Comparator<Pair<AbstractExecutable, Integer>>() {
                @Override // java.util.Comparator
                public int compare(Pair<AbstractExecutable, Integer> left, Pair<AbstractExecutable, Integer> right) {
                    // Descending by priority. Integer.compare yields 0 for equal priorities, which the
                    // Comparator contract requires (sgn(compare(a, b)) == -sgn(compare(b, a))).
                    return Integer.compare(right.getSecond().intValue(), left.getSecond().intValue());
                }
            });
        }

        /**
         * Registers the job as running and submits it to the job pool. If either step fails the job
         * is removed from the running set again and the failure is logged; nothing is rethrown.
         *
         * @param abstractExecutable the job to schedule
         * @param i the priority the job was scheduled with (used for logging only)
         */
        private void addToJobPool(AbstractExecutable abstractExecutable, int i) {
            String jobDescription = abstractExecutable.toString();
            DefaultScheduler.logger.info(jobDescription + " prepare to schedule and its priority is " + i);
            try {
                DefaultScheduler.this.context.addRunningJob(abstractExecutable);
                DefaultScheduler.this.jobPool.execute(new JobRunner(abstractExecutable));
                DefaultScheduler.logger.info(jobDescription + " scheduled");
            } catch (Exception e) {
                DefaultScheduler.this.context.removeRunningJob(abstractExecutable);
                DefaultScheduler.logger.warn(jobDescription + " fail to schedule", (Throwable) e);
            }
        }

        /**
         * Runs one fetch round:
         * <ol>
         *   <li>Drains queue entries whose priority is at least the fetch-queue priority bar. Those
         *       whose priority exceeds the job's default priority plus one are scheduled; the others
         *       are remembered with priority + 1.</li>
         *   <li>Returns early (with a warning) if the running-job limit is already reached.
         *       NOTE(review): priorities remembered in step 1 are discarded on this path.</li>
         *   <li>Drains the rest of the queue, remembering each job with priority + 1.</li>
         *   <li>Scans all job ids, counting them by state and re-queuing every ready job with its
         *       remembered priority (or its default priority if none was remembered).</li>
         *   <li>Schedules jobs from the head of the queue while the running-job limit allows.</li>
         * </ol>
         * Any exception is caught and logged so that the periodic schedule keeps running.
         */
        @Override // java.lang.Runnable
        public synchronized void run() {
            try {
                Map<String, Executable> runningJobs = DefaultScheduler.this.context.getRunningJobs();
                // Job id -> priority to use when the job is re-queued later in this round.
                Map<String, Integer> carriedPriorities = new HashMap<>();

                while (true) {
                    Pair<AbstractExecutable, Integer> head = this.jobPriorityQueue.peek();
                    if (head == null || head.getSecond().intValue() < DefaultScheduler.this.jobEngineConfig.getFetchQueuePriorityBar()) {
                        break;
                    }
                    Pair<AbstractExecutable, Integer> entry = this.jobPriorityQueue.poll();
                    AbstractExecutable queuedJob = entry.getFirst();
                    int currentPriority = entry.getSecond().intValue();
                    // Only jobs that have already waited more than one round are scheduled from here.
                    if (currentPriority > queuedJob.getDefaultPriority() + 1) {
                        addToJobPool(queuedJob, currentPriority);
                    } else {
                        carriedPriorities.put(queuedJob.getId(), Integer.valueOf(currentPriority + 1));
                    }
                }

                if (runningJobs.size() >= DefaultScheduler.this.jobEngineConfig.getMaxConcurrentJobLimit()) {
                    DefaultScheduler.logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
                    return;
                }

                Pair<AbstractExecutable, Integer> leftover;
                while ((leftover = this.jobPriorityQueue.poll()) != null) {
                    carriedPriorities.put(leftover.getFirst().getId(), Integer.valueOf(leftover.getSecond().intValue() + 1));
                }

                int runningCount = 0;
                int readyCount = 0;
                int stoppedCount = 0;
                int otherCount = 0;
                int errorCount = 0;
                int discardedCount = 0;
                int succeedCount = 0;
                for (String jobId : DefaultScheduler.this.executableManager.getAllJobIds()) {
                    if (runningJobs.containsKey(jobId)) {
                        runningCount++;
                        continue;
                    }
                    AbstractExecutable job = DefaultScheduler.this.executableManager.getJob(jobId);
                    if (job.isReady()) {
                        readyCount++;
                        Integer priority = carriedPriorities.get(jobId);
                        if (priority == null) {
                            priority = Integer.valueOf(job.getDefaultPriority());
                        }
                        this.jobPriorityQueue.add(new Pair<>(job, priority));
                        continue;
                    }
                    ExecutableState state = DefaultScheduler.this.executableManager.getOutput(jobId).getState();
                    if (state == ExecutableState.DISCARDED) {
                        discardedCount++;
                    } else if (state == ExecutableState.ERROR) {
                        errorCount++;
                    } else if (state == ExecutableState.SUCCEED) {
                        succeedCount++;
                    } else if (state == ExecutableState.STOPPED) {
                        stoppedCount++;
                    } else {
                        otherCount++;
                    }
                }

                Pair<AbstractExecutable, Integer> next;
                while (runningJobs.size() < DefaultScheduler.this.jobEngineConfig.getMaxConcurrentJobLimit()
                        && (next = this.jobPriorityQueue.poll()) != null) {
                    addToJobPool(next.getFirst(), next.getSecond().intValue());
                }

                DefaultScheduler.logger.info("Job Fetcher: " + runningCount + " running, " + runningJobs.size() + " actual running, " + stoppedCount + " stopped, " + readyCount + " ready, " + this.jobPriorityQueue.size() + " waiting, " + succeedCount + " already succeed, " + errorCount + " error, " + discardedCount + " discarded, " + otherCount + " others");
            } catch (Exception e) {
                // Message text is unchanged; the throwable is passed as well so the stack trace is logged.
                DefaultScheduler.logger.warn("Job Fetcher caught a exception " + e, (Throwable) e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DefaultScheduler$JobRunner.class */
    public class JobRunner implements Runnable {
        private final AbstractExecutable executable;

        public JobRunner(AbstractExecutable abstractExecutable) {
            this.executable = abstractExecutable;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    SetThreadName setThreadName = new SetThreadName("Scheduler %s Job %s", Integer.valueOf(System.identityHashCode(DefaultScheduler.this)), this.executable.getId());
                    Throwable th = null;
                    try {
                        this.executable.execute(DefaultScheduler.this.context);
                        if (setThreadName != null) {
                            if (0 != 0) {
                                try {
                                    setThreadName.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                setThreadName.close();
                            }
                        }
                        DefaultScheduler.this.context.removeRunningJob(this.executable);
                    } catch (Throwable th3) {
                        if (setThreadName != null) {
                            if (0 != 0) {
                                try {
                                    setThreadName.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                setThreadName.close();
                            }
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    DefaultScheduler.this.context.removeRunningJob(this.executable);
                    throw th5;
                }
            } catch (ExecuteException e) {
                DefaultScheduler.logger.error("ExecuteException job:" + this.executable.getId(), (Throwable) e);
                DefaultScheduler.this.context.removeRunningJob(this.executable);
            } catch (Exception e2) {
                DefaultScheduler.logger.error("unknown error execute job:" + this.executable.getId(), (Throwable) e2);
                DefaultScheduler.this.context.removeRunningJob(this.executable);
            }
            DefaultScheduler.this.fetcherPool.schedule(DefaultScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
        }
    }

    /**
     * Returns the singleton scheduler, creating it on first use.
     *
     * <p>The method is synchronized on the class (the same monitor used by
     * {@code createInstance} and {@code destroyInstance}). Without it, two threads that both
     * observe {@code INSTANCE == null} would each call {@code createInstance()}, and the second
     * call would shut down and replace the instance already handed to the first caller.
     *
     * @return the current scheduler instance, never {@code null}
     */
    public static synchronized DefaultScheduler getInstance() {
        if (INSTANCE == null) {
            INSTANCE = createInstance();
        }
        return INSTANCE;
    }

    /**
     * Replaces the singleton: the existing instance (if any) is destroyed first, then a new
     * scheduler is constructed, published as the singleton and returned.
     *
     * @return the newly created scheduler
     */
    public static synchronized DefaultScheduler createInstance() {
        destroyInstance();
        final DefaultScheduler replacement = new DefaultScheduler();
        INSTANCE = replacement;
        return replacement;
    }

    /**
     * Clears the singleton reference and, if an instance existed, shuts it down.
     *
     * @throws RuntimeException wrapping the {@link SchedulerException} if shutdown fails
     */
    public static synchronized void destroyInstance() {
        final DefaultScheduler previous = INSTANCE;
        // The reference is cleared before shutting down, so it is gone even if shutdown fails.
        INSTANCE = null;
        if (previous == null) {
            return;
        }
        try {
            previous.shutdown();
        } catch (SchedulerException failure) {
            logger.error("error stop DefaultScheduler", (Throwable) failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Creates a scheduler, refusing to do so while a singleton instance is already registered.
     *
     * @throws IllegalStateException if {@code INSTANCE} is already set
     */
    public DefaultScheduler() {
        if (INSTANCE == null) {
            return;
        }
        throw new IllegalStateException("DefaultScheduler has been initiated.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether the number of running jobs has reached the configured concurrency limit.
     * A warning is logged when it has.
     *
     * @return {@code true} if no further job can be started right now
     */
    public boolean isJobPoolFull() {
        final int runningCount = this.context.getRunningJobs().size();
        final int concurrencyLimit = this.jobEngineConfig.getMaxConcurrentJobLimit();
        final boolean full = runningCount >= concurrencyLimit;
        if (full) {
            logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
        }
        return full;
    }

    /**
     * Connection-state callback: shuts the scheduler down when the connection is reported as
     * {@code SUSPENDED} or {@code LOST}; every other state is ignored.
     *
     * @param curatorFramework the client reporting the change (unused)
     * @param connectionState the new connection state
     * @throws RuntimeException wrapping the {@link SchedulerException} if shutdown fails
     */
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        final boolean connectionStillUsable =
                connectionState != ConnectionState.SUSPENDED && connectionState != ConnectionState.LOST;
        if (connectionStillUsable) {
            return;
        }
        try {
            shutdown();
        } catch (SchedulerException failure) {
            throw new RuntimeException("failed to shutdown scheduler", failure);
        }
    }

    /**
     * Initializes and starts the scheduler.
     *
     * <p>Nothing is started when the configured server mode is neither the job mode nor
     * {@code "all"}, or when the scheduler is already initialized. Otherwise the job-engine lock is
     * acquired, the executable manager, thread pools and context are created, previously running
     * jobs are resumed, and the fetcher is scheduled at the configured poll interval.
     *
     * <p>The {@code initialized} flag is set only after the job-engine lock has been acquired, so a
     * failed lock attempt does not turn later {@code init} calls into silent no-ops.
     *
     * @param jobEngineConfig engine configuration (server mode, concurrency limit, poll interval, ...)
     * @param jobLock lock guarding the job engine; stored even when initialization is skipped
     * @throws SchedulerException declared by the {@code Scheduler} interface
     * @throws IllegalStateException if the job-engine lock cannot be acquired
     */
    @Override // org.apache.kylin.job.Scheduler
    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
        this.jobLock = jobLock;
        String serverMode = jobEngineConfig.getConfig().getServerMode();
        String normalizedMode = serverMode.toLowerCase();
        if (!Constant.SERVER_MODE_JOB.equals(normalizedMode) && !"all".equals(normalizedMode)) {
            logger.info("server mode: " + serverMode + ", no need to run job scheduler");
            return;
        }
        logger.info("Initializing Job Engine ....");
        if (this.initialized) {
            return;
        }
        this.jobEngineConfig = jobEngineConfig;
        if (!this.jobLock.lockJobEngine()) {
            throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
        }
        // Only now is the scheduler considered initialized; see the method documentation.
        this.initialized = true;
        this.executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
        this.fetcherPool = Executors.newScheduledThreadPool(1);
        int maxConcurrentJobLimit = jobEngineConfig.getMaxConcurrentJobLimit();
        // SynchronousQueue: a job is handed directly to a free worker or the submission is rejected.
        this.jobPool = new ThreadPoolExecutor(maxConcurrentJobLimit, maxConcurrentJobLimit, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
        this.context = new DefaultContext(Maps.newConcurrentMap(), jobEngineConfig.getConfig());
        this.executableManager.resumeAllRunningJobs();
        int pollIntervalSecond = jobEngineConfig.getPollIntervalSecond();
        logger.info("Fetching jobs every {} seconds", Integer.valueOf(pollIntervalSecond));
        this.fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
        // First fetch happens after a tenth of the poll interval, then once per interval.
        this.fetcherPool.scheduleAtFixedRate(this.fetcher, pollIntervalSecond / 10, pollIntervalSecond, TimeUnit.SECONDS);
        this.hasStarted = true;
    }

    /**
     * Stops the scheduler: releases the job-engine lock, clears the {@code initialized} and
     * {@code hasStarted} flags, and stops the fetcher pool and then the job pool, waiting up to one
     * minute for each to terminate.
     *
     * <p>Safe to call on an instance whose {@code init} never ran or returned early: a missing
     * lock or pool is simply skipped instead of causing a {@link NullPointerException}.
     *
     * @throws SchedulerException declared by the {@code Scheduler} interface
     */
    @Override // org.apache.kylin.job.Scheduler
    public void shutdown() throws SchedulerException {
        logger.info("Shutting down DefaultScheduler ....");
        if (this.jobLock != null) {
            this.jobLock.unlockJobEngine();
        }
        this.initialized = false;
        this.hasStarted = false;
        stopPool(this.fetcherPool);
        stopPool(this.jobPool);
    }

    /** Interrupts the pool's tasks and waits up to one minute for termination; {@code null} is ignored. */
    private static void stopPool(ExecutorService pool) {
        if (pool == null) {
            return;
        }
        pool.shutdownNow();
        try {
            pool.awaitTermination(1L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            logger.warn("InterruptedException is caught!");
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports whether {@code init} has completed successfully and {@code shutdown} has not been
     * called since.
     *
     * @return the current value of the {@code hasStarted} flag
     */
    @Override // org.apache.kylin.job.Scheduler
    public boolean hasStarted() {
        return hasStarted;
    }
}
