package org.apache.kylin.job.impl.threadpool;

import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DistributedScheduler.class */
public class DistributedScheduler implements Scheduler<AbstractExecutable> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) DistributedScheduler.class);
    public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock";
    private ExecutableManager executableManager;
    private FetcherRunner fetcher;
    private ScheduledExecutorService fetcherPool;
    private ExecutorService watchPool;
    private ExecutorService jobPool;
    private DefaultContext context;
    private DistributedLock jobLock;
    private Closeable lockWatch;
    private final Set<String> jobWithLocks = new CopyOnWriteArraySet();
    private volatile boolean initialized = false;
    private volatile boolean hasStarted = false;
    private JobEngineConfig jobEngineConfig;
    private String serverName;

    /* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DistributedScheduler$JobRunner.class */
    private class JobRunner implements Runnable {
        private final AbstractExecutable executable;

        public JobRunner(AbstractExecutable abstractExecutable) {
            this.executable = abstractExecutable;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    SetThreadName setThreadName = new SetThreadName("Scheduler %s Job %s", Integer.valueOf(System.identityHashCode(DistributedScheduler.this)), this.executable.getId());
                    Throwable th = null;
                    try {
                        if (DistributedScheduler.this.jobLock.lock(DistributedScheduler.getLockPath(this.executable.getId()))) {
                            DistributedScheduler.logger.info(this.executable.toString() + " scheduled in server: " + DistributedScheduler.this.serverName);
                            DistributedScheduler.this.context.addRunningJob(this.executable);
                            DistributedScheduler.this.jobWithLocks.add(this.executable.getId());
                            this.executable.execute(DistributedScheduler.this.context);
                        }
                        if (setThreadName != null) {
                            if (0 != 0) {
                                try {
                                    setThreadName.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                setThreadName.close();
                            }
                        }
                        DistributedScheduler.this.context.removeRunningJob(this.executable);
                        releaseJobLock(this.executable);
                        DistributedScheduler.this.fetcherPool.schedule(DistributedScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
                    } catch (Throwable th3) {
                        if (setThreadName != null) {
                            if (0 != 0) {
                                try {
                                    setThreadName.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                setThreadName.close();
                            }
                        }
                        throw th3;
                    }
                } catch (ExecuteException e) {
                    DistributedScheduler.logger.error("ExecuteException job:" + this.executable.getId() + " in server: " + DistributedScheduler.this.serverName, (Throwable) e);
                    DistributedScheduler.this.context.removeRunningJob(this.executable);
                    releaseJobLock(this.executable);
                    DistributedScheduler.this.fetcherPool.schedule(DistributedScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
                } catch (Exception e2) {
                    DistributedScheduler.logger.error("unknown error execute job:" + this.executable.getId() + " in server: " + DistributedScheduler.this.serverName, (Throwable) e2);
                    DistributedScheduler.this.context.removeRunningJob(this.executable);
                    releaseJobLock(this.executable);
                    DistributedScheduler.this.fetcherPool.schedule(DistributedScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
                }
            } catch (Throwable th5) {
                DistributedScheduler.this.context.removeRunningJob(this.executable);
                releaseJobLock(this.executable);
                DistributedScheduler.this.fetcherPool.schedule(DistributedScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
                throw th5;
            }
        }

        private void releaseJobLock(AbstractExecutable abstractExecutable) {
            ExecutableState status;
            if (!(abstractExecutable instanceof DefaultChainedExecutable) || (status = abstractExecutable.getStatus()) == ExecutableState.READY || status == ExecutableState.RUNNING || !DistributedScheduler.this.jobWithLocks.contains(abstractExecutable.getId())) {
                return;
            }
            DistributedScheduler.logger.info(abstractExecutable.toString() + " will release the lock for the job: " + abstractExecutable.getId());
            DistributedScheduler.this.jobLock.unlock(DistributedScheduler.getLockPath(abstractExecutable.getId()));
            DistributedScheduler.this.jobWithLocks.remove(abstractExecutable.getId());
        }
    }

    /* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DistributedScheduler$WatcherProcessImpl.class */
    private class WatcherProcessImpl implements DistributedLock.Watcher {
        private String serverName;

        public WatcherProcessImpl(String str) {
            this.serverName = str;
        }

        @Override // org.apache.kylin.common.lock.DistributedLock.Watcher
        public void onUnlock(String str, String str2) {
            String[] split = StringUtil.split(str, "/");
            String str3 = split[split.length - 1];
            try {
                DistributedScheduler.this.executableManager.syncDigestsOfJob(str3);
            } catch (PersistentException e) {
                DistributedScheduler.logger.error("Failed to sync cache of job: " + str3 + ", at server: " + this.serverName);
            }
            if (DistributedScheduler.this.executableManager.getOutput(str3).getState() == ExecutableState.RUNNING) {
                AbstractExecutable job = DistributedScheduler.this.executableManager.getJob(str3);
                if (!(job instanceof DefaultChainedExecutable) || str2.equalsIgnoreCase(this.serverName)) {
                    return;
                }
                try {
                    DistributedScheduler.logger.warn(str2 + " has released the lock for: " + str3 + " but the job still running. so " + this.serverName + " resume the job");
                    if (!DistributedScheduler.this.jobLock.isLocked(DistributedScheduler.getLockPath(str3))) {
                        DistributedScheduler.this.executableManager.resumeRunningJobForce(job.getId());
                        DistributedScheduler.this.fetcherPool.schedule(DistributedScheduler.this.fetcher, 0L, TimeUnit.SECONDS);
                    }
                } catch (Exception e2) {
                    DistributedScheduler.logger.error("resume the job but fail in server: " + this.serverName, (Throwable) e2);
                }
            }
        }

        @Override // org.apache.kylin.common.lock.DistributedLock.Watcher
        public void onLock(String str, String str2) {
        }
    }

    public static DistributedScheduler getInstance(KylinConfig kylinConfig) {
        return (DistributedScheduler) kylinConfig.getManager(DistributedScheduler.class);
    }

    static DistributedScheduler newInstance(KylinConfig kylinConfig) throws IOException {
        return new DistributedScheduler();
    }

    @Override // org.apache.kylin.job.Scheduler
    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
        String serverMode = jobEngineConfig.getConfig().getServerMode();
        if (!ServerMode.SERVER_MODE_JOB.equals(serverMode.toLowerCase(Locale.ROOT)) && !"all".equals(serverMode.toLowerCase(Locale.ROOT))) {
            logger.info("server mode: " + serverMode + ", no need to run job scheduler");
            return;
        }
        logger.info("Initializing Job Engine ....");
        if (this.initialized) {
            return;
        }
        this.initialized = true;
        this.jobEngineConfig = jobEngineConfig;
        this.jobLock = (DistributedLock) jobLock;
        this.serverName = this.jobLock.getClient();
        this.executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
        this.fetcherPool = Executors.newScheduledThreadPool(1);
        this.watchPool = Executors.newFixedThreadPool(1);
        this.lockWatch = this.jobLock.watchLocks(getWatchPath(), this.watchPool, new WatcherProcessImpl(this.serverName));
        int maxConcurrentJobLimit = jobEngineConfig.getMaxConcurrentJobLimit();
        this.jobPool = new ThreadPoolExecutor(maxConcurrentJobLimit, maxConcurrentJobLimit, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue());
        this.context = new DefaultContext(Maps.newConcurrentMap(), jobEngineConfig.getConfig());
        int pollIntervalSecond = jobEngineConfig.getPollIntervalSecond();
        logger.info("Fetching jobs every {} seconds", Integer.valueOf(pollIntervalSecond));
        JobExecutor jobExecutor = new JobExecutor() { // from class: org.apache.kylin.job.impl.threadpool.DistributedScheduler.1
            @Override // org.apache.kylin.job.impl.threadpool.JobExecutor
            public void execute(AbstractExecutable abstractExecutable) {
                DistributedScheduler.this.jobPool.execute(new JobRunner(abstractExecutable));
            }
        };
        this.fetcher = jobEngineConfig.getJobPriorityConsidered() ? new PriorityFetcherRunner(jobEngineConfig, this.context, jobExecutor) : new DefaultFetcherRunner(jobEngineConfig, this.context, jobExecutor);
        this.fetcherPool.scheduleAtFixedRate(this.fetcher, pollIntervalSecond / 10, pollIntervalSecond, TimeUnit.SECONDS);
        this.hasStarted = true;
        resumeAllRunningJobs();
    }

    private void resumeAllRunningJobs() {
        for (String str : this.executableManager.getAllJobIds()) {
            Output output = this.executableManager.getOutput(str);
            AbstractExecutable job = this.executableManager.getJob(str);
            if (output.getState() == ExecutableState.RUNNING && (job instanceof DefaultChainedExecutable)) {
                try {
                    if (!this.jobLock.isLocked(getLockPath(job.getId()))) {
                        this.executableManager.resumeRunningJobForce(job.getId());
                        this.fetcherPool.schedule(this.fetcher, 0L, TimeUnit.SECONDS);
                    }
                } catch (Exception e) {
                    logger.error("resume the job " + str + " fail in server: " + this.serverName, (Throwable) e);
                }
            }
        }
    }

    public static String getLockPath(String str) {
        return dropDoubleSlash("/job_engine/lock/" + str);
    }

    private static String getWatchPath() {
        return dropDoubleSlash(ZOOKEEPER_LOCK_PATH);
    }

    public static String dropDoubleSlash(String str) {
        int i = Integer.MAX_VALUE;
        while (i > str.length()) {
            i = str.length();
            str = str.replace("//", "/");
        }
        return str;
    }

    @Override // org.apache.kylin.job.Scheduler
    public void shutdown() throws SchedulerException {
        logger.info("Will shut down Job Engine ....");
        try {
            this.lockWatch.close();
            releaseAllLocks();
            logger.info("The all locks has released");
            this.fetcherPool.shutdown();
            logger.info("The fetcherPool has down");
            this.jobPool.shutdown();
            logger.info("The jobPoll has down");
        } catch (IOException e) {
            throw new SchedulerException(e);
        }
    }

    private void releaseAllLocks() {
        Iterator<String> it = this.jobWithLocks.iterator();
        while (it.hasNext()) {
            this.jobLock.unlock(getLockPath(it.next()));
        }
    }

    @Override // org.apache.kylin.job.Scheduler
    public boolean hasStarted() {
        return this.hasStarted;
    }
}
