package org.apache.kylin.rest.scheduler;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

@Component
/* loaded from: input_file:org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.class */
public class AutoRefreshSnapshotScheduler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AutoRefreshSnapshotScheduler.class);
    private static final Integer THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE = 20;

    @Autowired
    @Qualifier("projectScheduler")
    private TaskScheduler projectScheduler;

    @Autowired
    @Qualifier("normalRestTemplate")
    private RestTemplate restTemplate;
    private final Map<String, Pair<String, ScheduledFuture<?>>> taskFutures = Maps.newConcurrentMap();
    private final AtomicInteger schedulerProjectCount = new AtomicInteger(0);

    public void startCron(String str, Runnable runnable, String str2) {
        stopCron(str);
        checkSchedulerThreadPoolSize();
        ScheduledFuture schedule = this.projectScheduler.schedule(runnable, triggerContext -> {
            return new CronTrigger(str2).nextExecutionTime(triggerContext);
        });
        log.info("Project[{}] start cron[{}]", str, str2);
        this.taskFutures.put(str, new Pair<>(str2, schedule));
    }

    public void checkSchedulerThreadPoolSize() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = this.projectScheduler;
        int poolSize = threadPoolTaskScheduler.getPoolSize();
        int incrementAndGet = this.schedulerProjectCount.incrementAndGet();
        if (incrementAndGet > poolSize) {
            threadPoolTaskScheduler.setPoolSize(incrementAndGet);
        } else {
            if (incrementAndGet >= THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE.intValue() || poolSize <= THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE.intValue()) {
                return;
            }
            threadPoolTaskScheduler.setPoolSize(THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE.intValue());
        }
    }

    public void stopCron(String str) {
        Pair<String, ScheduledFuture<?>> pair = this.taskFutures.get(str);
        if (pair != null) {
            ScheduledFuture scheduledFuture = (ScheduledFuture) pair.getSecond();
            if (scheduledFuture != null) {
                log.info("Project[{}] stop cron", str);
                scheduledFuture.cancel(true);
            }
            this.taskFutures.remove(str);
            this.schedulerProjectCount.decrementAndGet();
        }
    }

    @Scheduled(cron = "*/30 * * * * ?")
    public void schedulerAutoRefresh() {
        NProjectManager nProjectManager = NProjectManager.getInstance(KylinConfig.readSystemKylinConfig());
        schedulerProject(nProjectManager);
        cancelDeletedProject(nProjectManager);
    }

    private void schedulerProject(NProjectManager nProjectManager) {
        Iterator it = nProjectManager.listAllProjects().iterator();
        while (it.hasNext()) {
            autoRefreshSnapshot((ProjectInstance) it.next());
        }
    }

    public boolean autoRefreshSnapshot(ProjectInstance projectInstance) {
        KylinConfig config = projectInstance.getConfig();
        if (!config.isSnapshotManualManagementEnabled() || !config.isSnapshotAutoRefreshEnabled()) {
            return false;
        }
        String name = projectInstance.getName();
        if (checkEpochOwner(name)) {
            return false;
        }
        Pair<String, ScheduledFuture<?>> pair = this.taskFutures.get(name);
        String snapshotAutoRefreshCron = config.getSnapshotAutoRefreshCron();
        if (pair != null && ((ScheduledFuture) pair.getSecond()) != null && StringUtils.equals((CharSequence) pair.getFirst(), snapshotAutoRefreshCron)) {
            log.info("Project[{}] skip schedulerAutoRefresh, because is running, cron[{}]", name, snapshotAutoRefreshCron);
            checkRefreshRunnerJobPool(config, name);
            return false;
        }
        AutoRefreshSnapshotRunner autoRefreshSnapshotRunner = AutoRefreshSnapshotRunner.getInstance(projectInstance.getName());
        autoRefreshSnapshotRunner.setRestTemplate(this.restTemplate);
        checkRefreshRunnerJobPool(config, name);
        startCron(name, autoRefreshSnapshotRunner, snapshotAutoRefreshCron);
        return true;
    }

    private boolean checkEpochOwner(String str) {
        String currentEpochOwner = EpochManager.getInstance().getEpoch(str).getCurrentEpochOwner();
        String localInstance = AddressUtil.getLocalInstance();
        if (currentEpochOwner == null || currentEpochOwner.split("\\|")[0].equals(localInstance)) {
            return false;
        }
        log.info("EpochOwner[{}] is not Project[{}] epoch owner,and ServerInfo is [{}] ", new Object[]{currentEpochOwner, str, localInstance});
        AutoRefreshSnapshotRunner.shutdown(str);
        stopCron(str);
        return true;
    }

    public void checkRefreshRunnerJobPool(KylinConfig kylinConfig, String str) {
        ThreadPoolExecutor threadPoolExecutor;
        int corePoolSize;
        int snapshotAutoRefreshMaxConcurrentJobLimit;
        AutoRefreshSnapshotRunner instanceByProject = AutoRefreshSnapshotRunner.getInstanceByProject(str);
        if (instanceByProject == null || (snapshotAutoRefreshMaxConcurrentJobLimit = kylinConfig.getSnapshotAutoRefreshMaxConcurrentJobLimit()) == (corePoolSize = (threadPoolExecutor = (ThreadPoolExecutor) instanceByProject.getJobPool()).getCorePoolSize())) {
            return;
        }
        threadPoolExecutor.setCorePoolSize(snapshotAutoRefreshMaxConcurrentJobLimit);
        threadPoolExecutor.setMaximumPoolSize(snapshotAutoRefreshMaxConcurrentJobLimit);
        log.info("update AutoRefreshSnapshotRunner job pool size : {} old pool size : {}", Integer.valueOf(snapshotAutoRefreshMaxConcurrentJobLimit), Integer.valueOf(corePoolSize));
    }

    public void cancelDeletedProject(NProjectManager nProjectManager) {
        for (String str : this.taskFutures.keySet()) {
            ProjectInstance project = nProjectManager.getProject(str);
            if (null == project) {
                AutoRefreshSnapshotRunner.shutdown(str);
                stopCron(str);
                deleteProjectSnapshotAutoUpdateDir(str);
                log.info("Project[{}] is deleted...", str);
            } else {
                KylinConfigExt config = project.getConfig();
                if (!config.isSnapshotManualManagementEnabled() || !config.isSnapshotAutoRefreshEnabled()) {
                    AutoRefreshSnapshotRunner.shutdown(str);
                    stopCron(str);
                    deleteProjectSnapshotAutoUpdateDir(str);
                    log.info("Project[{}] stop auto fresh snapshot...", str);
                }
            }
        }
    }

    public void deleteProjectSnapshotAutoUpdateDir(String str) {
        try {
            FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
            Path path = new Path(KylinConfig.readSystemKylinConfig().getSnapshotAutoRefreshDir(str));
            if (workingFileSystem.exists(path)) {
                workingFileSystem.delete(path, true);
                log.debug("delete project[{}] snapshot auto update dir success", str);
            }
        } catch (IOException e) {
            log.error("delete project[{}] snapshot auto update dir has error", str, e);
        }
    }

    public void afterPropertiesSet() throws Exception {
        log.info("AutoRefreshSnapshotScheduler init...");
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        for (ProjectInstance projectInstance : NProjectManager.getInstance(KylinConfig.readSystemKylinConfig()).listAllProjects()) {
            KylinConfigExt config = projectInstance.getConfig();
            if (config.isSnapshotManualManagementEnabled() && config.isSnapshotAutoRefreshEnabled()) {
                String name = projectInstance.getName();
                if (!checkEpochOwner(name) && workingFileSystem.exists(new Path(config.getSnapshotAutoRefreshDir(name) + "_mark"))) {
                    log.error("Project[{}] last cron task was stopped manually, autoRefreshSnapshotRunner doRun", name);
                    AutoRefreshSnapshotRunner.getInstance(name).runWhenSchedulerInit();
                }
            } else {
                deleteProjectSnapshotAutoUpdateDir(projectInstance.getName());
            }
        }
    }

    @Generated
    public Map<String, Pair<String, ScheduledFuture<?>>> getTaskFutures() {
        return this.taskFutures;
    }

    @Generated
    public AtomicInteger getSchedulerProjectCount() {
        return this.schedulerProjectCount;
    }
}
