package org.apache.kylin.job.impl.threadpool;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.common.util.SystemInfoCollector;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.base.Strings;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.runners.FetcherRunner;
import org.apache.kylin.job.runners.JobCheckRunner;
import org.apache.kylin.job.runners.QuotaStorageCheckRunner;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/job/impl/threadpool/NDefaultScheduler.class */
public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
    private String project;
    private ScheduledExecutorService fetcherPool;
    private ExecutorService jobPool;
    private ExecutableContext context;
    private JobEngineConfig jobEngineConfig;
    private static final Logger logger = LoggerFactory.getLogger(NDefaultScheduler.class);
    private static volatile Semaphore memoryRemaining = new Semaphore(Integer.MAX_VALUE);
    private static final Map<String, NDefaultScheduler> INSTANCE_MAP = Maps.newConcurrentMap();
    private AtomicBoolean initialized = new AtomicBoolean(false);
    private AtomicBoolean hasStarted = new AtomicBoolean(false);
    private AtomicBoolean hasFinishedTransactions = new AtomicBoolean(false);
    private long epochId = -1;

    public NDefaultScheduler() {
    }

    public NDefaultScheduler(String str) {
        Preconditions.checkNotNull(str);
        this.project = str;
        if (INSTANCE_MAP.containsKey(str)) {
            throw new IllegalStateException("DefaultScheduler for project " + str + " has been initiated. Use getInstance() instead.");
        }
        logger.debug("New NDefaultScheduler created by project '{}': {}", str, Integer.valueOf(System.identityHashCode(this)));
    }

    public static synchronized NDefaultScheduler getInstance(String str) {
        return INSTANCE_MAP.computeIfAbsent(str, NDefaultScheduler::new);
    }

    public void fetchJobsImmediately() {
        this.fetcherPool.schedule(new FetcherRunner(this, this.jobPool, this.fetcherPool), 1L, TimeUnit.SECONDS);
    }

    public static List<NDefaultScheduler> listAllSchedulers() {
        return Lists.newArrayList(INSTANCE_MAP.values());
    }

    public static synchronized void destroyInstance() {
        Iterator<Map.Entry<String, NDefaultScheduler>> it = INSTANCE_MAP.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().shutdown();
        }
        INSTANCE_MAP.clear();
    }

    public static synchronized void shutdownByProject(String str) {
        NDefaultScheduler instanceByProject = getInstanceByProject(str);
        if (instanceByProject != null) {
            INSTANCE_MAP.remove(str);
            instanceByProject.forceShutdown();
        }
    }

    public static synchronized NDefaultScheduler getInstanceByProject(String str) {
        return INSTANCE_MAP.get(str);
    }

    @Override // org.apache.kylin.job.Scheduler
    public synchronized void init(JobEngineConfig jobEngineConfig) {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        if (!instanceFromEnv.isUTEnv()) {
            this.epochId = EpochManager.getInstance().getEpochId(this.project);
        }
        String serverMode = jobEngineConfig.getServerMode();
        if (!instanceFromEnv.isJobNode()) {
            logger.info("server mode: {}, no need to run job scheduler", serverMode);
            return;
        }
        if (!UnitOfWork.isAlreadyInTransaction()) {
            logger.info("Initializing Job Engine ....");
        }
        if (this.initialized.compareAndSet(false, true)) {
            this.jobEngineConfig = jobEngineConfig;
            this.fetcherPool = Executors.newScheduledThreadPool(1, new NamedThreadFactory("FetchJobWorker(project:" + this.project + ")"));
            int maxConcurrentJobLimitByProject = getMaxConcurrentJobLimitByProject(instanceFromEnv, jobEngineConfig, this.project);
            if (instanceFromEnv.getAutoSetConcurrentJob().booleanValue()) {
                double maxLocalConsumptionRatio = instanceFromEnv.getMaxLocalConsumptionRatio();
                synchronized (NDefaultScheduler.class) {
                    if (Integer.MAX_VALUE == memoryRemaining.availablePermits()) {
                        memoryRemaining = new Semaphore((int) (SystemInfoCollector.getAvailableMemoryInfo().intValue() * maxLocalConsumptionRatio));
                    }
                }
                logger.info("Scheduler memory remaining: {}", Integer.valueOf(memoryRemaining.availablePermits()));
            }
            this.jobPool = new ThreadPoolExecutor(maxConcurrentJobLimitByProject, maxConcurrentJobLimitByProject, Long.MAX_VALUE, TimeUnit.DAYS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new NamedThreadFactory("RunJobWorker(project:" + this.project + ")"));
            this.context = new ExecutableContext(Maps.newConcurrentMap(), Maps.newConcurrentMap(), jobEngineConfig.getConfig(), this.epochId);
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).resumeAllRunningJobs();
            int pollIntervalSecond = jobEngineConfig.getPollIntervalSecond();
            logger.info("Fetching jobs every {} seconds", Integer.valueOf(pollIntervalSecond));
            FetcherRunner fetcherRunner = new FetcherRunner(this, this.jobPool, this.fetcherPool);
            if (instanceFromEnv.isStorageQuotaEnabled()) {
                this.fetcherPool.scheduleWithFixedDelay(new QuotaStorageCheckRunner(this), RandomUtils.nextInt(0, pollIntervalSecond), pollIntervalSecond, TimeUnit.SECONDS);
            }
            this.fetcherPool.scheduleWithFixedDelay(new JobCheckRunner(this), RandomUtils.nextInt(0, pollIntervalSecond), pollIntervalSecond, TimeUnit.SECONDS);
            this.fetcherPool.scheduleWithFixedDelay(fetcherRunner, RandomUtils.nextInt(0, pollIntervalSecond), pollIntervalSecond, TimeUnit.SECONDS);
            this.hasStarted.set(true);
        }
    }

    @Override // org.apache.kylin.job.Scheduler
    public void shutdown() {
        if (Thread.currentThread().isInterrupted()) {
            logger.warn("shutdown->current thread is interrupted,{}", Thread.currentThread().getName());
            throw new InterruptedException();
        }
        logger.info("Shutting down DefaultScheduler for project {} ....", this.project);
        releaseResources();
        if (null != this.fetcherPool) {
            ExecutorServiceUtil.shutdownGracefully(this.fetcherPool, 60);
        }
        if (null != this.jobPool) {
            ExecutorServiceUtil.shutdownGracefully(this.jobPool, 60);
        }
    }

    public void forceShutdown() {
        if (Thread.currentThread().isInterrupted()) {
            logger.warn("shutdownNow->current thread is interrupted,{}", Thread.currentThread().getName());
            throw new InterruptedException();
        }
        logger.info("Force to shut down DefaultScheduler for project {} ....", this.project);
        releaseResources();
        ExecutorServiceUtil.forceShutdown(this.fetcherPool);
        ExecutorServiceUtil.forceShutdown(this.jobPool);
    }

    private void releaseResources() {
        this.initialized.set(false);
        this.hasStarted.set(false);
    }

    @Override // org.apache.kylin.job.Scheduler
    public boolean hasStarted() {
        return this.hasStarted.get();
    }

    public boolean hasFinishedTransactions() {
        return this.hasFinishedTransactions.get();
    }

    public static double currentAvailableMem() {
        return 1.0d * memoryRemaining.availablePermits();
    }

    public int getMaxConcurrentJobLimitByProject(KylinConfig kylinConfig, JobEngineConfig jobEngineConfig, String str) {
        ProjectInstance project = NProjectManager.getInstance(kylinConfig).getProject(str);
        return (Strings.isNullOrEmpty(str) || project == null) ? jobEngineConfig.getMaxConcurrentJobLimit() : project.getConfig().getMaxConcurrentJobLimit();
    }

    @Generated
    public String getProject() {
        return this.project;
    }

    @Generated
    public ExecutableContext getContext() {
        return this.context;
    }

    @Generated
    public void setHasFinishedTransactions(AtomicBoolean atomicBoolean) {
        this.hasFinishedTransactions = atomicBoolean;
    }

    @Generated
    public JobEngineConfig getJobEngineConfig() {
        return this.jobEngineConfig;
    }

    @Generated
    public static Semaphore getMemoryRemaining() {
        return memoryRemaining;
    }
}
