package org.apache.kylin.rest.scheduler;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kyligence.kap.guava20.shaded.common.base.Preconditions;
import io.kyligence.kap.guava20.shaded.common.collect.Lists;
import io.kyligence.kap.guava20.shaded.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.logging.SetLogCategory;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.job.snapshot.SnapshotJobUtils;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

/* loaded from: input_file:org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.class */
public class AutoRefreshSnapshotRunner implements Runnable {
    private static final String SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE = "Project[%s] Save View Mapping Failed";
    private ExecutorService jobPool;
    private KylinConfig projectConfig;
    private final String project;
    private RestTemplate restTemplate;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AutoRefreshSnapshotRunner.class);
    private static final Map<String, AutoRefreshSnapshotRunner> INSTANCE_MAP = Maps.newConcurrentMap();
    private Queue<CheckSourceTableResult> checkSourceTableQueue = new LinkedBlockingQueue();
    private Map<Future<String>, Long> checkSourceTableFutures = Maps.newConcurrentMap();
    private Map<String, List<TableDesc>> sourceTableSnapshotMapping = Maps.newHashMap();
    private Map<String, AtomicInteger> buildSnapshotCount = Maps.newConcurrentMap();

    public static synchronized AutoRefreshSnapshotRunner getInstanceByProject(String str) {
        return INSTANCE_MAP.get(str);
    }

    public static synchronized AutoRefreshSnapshotRunner getInstance(String str) {
        return INSTANCE_MAP.computeIfAbsent(str, str2 -> {
            return new AutoRefreshSnapshotRunner(NProjectManager.getInstance(KylinConfig.readSystemKylinConfig()).getProject(str).getConfig(), str);
        });
    }

    private AutoRefreshSnapshotRunner(KylinConfig kylinConfig, String str) {
        Preconditions.checkNotNull(str);
        if (INSTANCE_MAP.containsKey(str)) {
            throw new IllegalStateException("DefaultScheduler for project " + str + " has been initiated. Use getInstance() instead.");
        }
        this.project = str;
        init(kylinConfig, str);
        log.debug("New AutoRefreshSnapshotRunner created by project '{}': {}", str, Integer.valueOf(System.identityHashCode(this)));
    }

    public void init(KylinConfig kylinConfig, String str) {
        int snapshotAutoRefreshMaxConcurrentJobLimit = kylinConfig.getSnapshotAutoRefreshMaxConcurrentJobLimit();
        this.jobPool = new ThreadPoolExecutor(snapshotAutoRefreshMaxConcurrentJobLimit, snapshotAutoRefreshMaxConcurrentJobLimit, Long.MAX_VALUE, TimeUnit.DAYS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new NamedThreadFactory("AutoRefreshSnapshotWorker(project:" + str + ")"));
        this.projectConfig = kylinConfig;
        log.info("AutoRefreshSnapshotRunner init project[{}] job pool size: {}", str, Integer.valueOf(snapshotAutoRefreshMaxConcurrentJobLimit));
    }

    public void doRun() {
        try {
            try {
                SetLogCategory setLogCategory = new SetLogCategory("schedule");
                Throwable th = null;
                try {
                    log.info("Project[{}] start check and refresh snapshot", this.project);
                    if (log.isDebugEnabled()) {
                        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.jobPool;
                        log.debug("job pool params: PoolSize[{}] CorePoolSize[{}] ActiveCount[{}] MaximumPoolSize[{}]", new Object[]{Integer.valueOf(threadPoolExecutor.getPoolSize()), Integer.valueOf(threadPoolExecutor.getCorePoolSize()), Integer.valueOf(threadPoolExecutor.getActiveCount()), Integer.valueOf(threadPoolExecutor.getMaximumPoolSize())});
                    }
                    this.projectConfig = NProjectManager.getInstance(KylinConfig.readSystemKylinConfig()).getProject(this.project).getConfig();
                    saveSnapshotViewMapping(this.project, this.restTemplate);
                    this.sourceTableSnapshotMapping = getSourceTableSnapshotMapping(SnapshotJobUtils.getSnapshotTables(this.projectConfig, this.project), readViewTableMapping());
                    checkSourceTable(this.sourceTableSnapshotMapping.keySet());
                    waitCheckSourceTableTaskDone();
                    log.info("Project[{}] stop check and refresh snapshot", this.project);
                    if (setLogCategory != null) {
                        if (0 != 0) {
                            try {
                                setLogCategory.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            setLogCategory.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (setLogCategory != null) {
                        if (0 != 0) {
                            try {
                                setLogCategory.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            setLogCategory.close();
                        }
                    }
                    throw th3;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KylinRuntimeException(e);
            } catch (Exception e2) {
                throw new KylinRuntimeException(e2);
            }
        } finally {
            this.checkSourceTableQueue = new LinkedBlockingQueue();
            cancelFuture(this.checkSourceTableFutures);
            this.checkSourceTableFutures = Maps.newConcurrentMap();
            this.sourceTableSnapshotMapping = Maps.newHashMap();
            this.buildSnapshotCount = Maps.newConcurrentMap();
        }
    }

    public void cancelFuture(Map<Future<String>, Long> map) {
        map.keySet().forEach(future -> {
            if (future.isDone()) {
                return;
            }
            future.cancel(true);
        });
    }

    public Map<String, Set<String>> readViewTableMapping() {
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        Path path = new Path(this.projectConfig.getSnapshotAutoRefreshDir(this.project) + "view_mapping");
        HashMap newHashMap = Maps.newHashMap();
        try {
            if (workingFileSystem.exists(path)) {
                FSDataInputStream open = workingFileSystem.open(path);
                Throwable th = null;
                try {
                    try {
                        newHashMap.putAll((Map) JsonUtil.readValue(open, new TypeReference<Map<String, Set<String>>>() { // from class: org.apache.kylin.rest.scheduler.AutoRefreshSnapshotRunner.1
                        }));
                        if (open != null) {
                            if (0 != 0) {
                                try {
                                    open.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                open.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            }
        } catch (IOException e) {
            log.error("read viewMapping path[{}] has error", path, e);
        }
        return newHashMap;
    }

    public Map<String, List<TableDesc>> getSourceTableSnapshotMapping(List<TableDesc> list, Map<String, Set<String>> map) {
        HashMap newHashMap = Maps.newHashMap();
        NTableMetadataManager nTableMetadataManager = NTableMetadataManager.getInstance(this.projectConfig, this.project);
        for (Map.Entry<String, Set<String>> entry : map.entrySet()) {
            String key = entry.getKey();
            for (String str : entry.getValue()) {
                TableDesc tableDesc = nTableMetadataManager.getTableDesc(key);
                List list2 = (List) newHashMap.getOrDefault(str, Lists.newArrayList());
                if (tableDesc != null) {
                    list2.add(tableDesc);
                }
                newHashMap.put(str, list2.stream().distinct().collect(Collectors.toList()));
            }
        }
        for (TableDesc tableDesc2 : list) {
            if (!tableDesc2.isView()) {
                String lowerCase = tableDesc2.getIdentity().toLowerCase(Locale.ROOT);
                List list3 = (List) newHashMap.getOrDefault(lowerCase, Lists.newArrayList());
                list3.add(tableDesc2);
                newHashMap.put(lowerCase, list3.stream().distinct().collect(Collectors.toList()));
            }
        }
        return newHashMap;
    }

    public void saveSnapshotViewMapping(String str, RestTemplate restTemplate) {
        try {
            String format = String.format(Locale.ROOT, "http://%s/kylin/api/snapshots/view_mapping", this.projectConfig.getServerAddress());
            HashMap newHashMap = Maps.newHashMap();
            newHashMap.put("project", str);
            log.debug("checkTableNeedRefresh request: {}", newHashMap);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.add("Content-Type", "application/vnd.apache.kylin-v4-public+json");
            httpHeaders.add("Timeout", "");
            ResponseEntity exchange = restTemplate.exchange(format, HttpMethod.POST, new HttpEntity(JsonUtil.writeValueAsBytes(newHashMap), httpHeaders), String.class, new Object[0]);
            if (exchange.getStatusCodeValue() != 200) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, str));
            }
            RestResponse restResponse = (RestResponse) JsonUtil.readValue((String) Optional.ofNullable(exchange.getBody()).orElse(""), new TypeReference<RestResponse<Boolean>>() { // from class: org.apache.kylin.rest.scheduler.AutoRefreshSnapshotRunner.2
            });
            if (!StringUtils.equals(restResponse.getCode(), "000")) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, str));
            }
            if (Boolean.FALSE.equals(restResponse.getData())) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, str));
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, str), e);
        }
    }

    public void checkSourceTable(Set<String> set) {
        for (String str : set) {
            CheckSourceTableRunnable checkSourceTableRunnable = new CheckSourceTableRunnable();
            checkSourceTableRunnable.setProject(this.project);
            checkSourceTableRunnable.setConfig(this.projectConfig);
            checkSourceTableRunnable.setTableIdentity(str);
            checkSourceTableRunnable.setRestTemplate(this.restTemplate);
            checkSourceTableRunnable.setCheckSourceTableQueue(this.checkSourceTableQueue);
            this.sourceTableSnapshotMapping.get(str).stream().filter(tableDesc -> {
                return StringUtils.equalsIgnoreCase(str, tableDesc.getIdentity());
            }).findFirst().ifPresent(tableDesc2 -> {
                checkSourceTableRunnable.setPartitionColumn(tableDesc2.getSelectedSnapshotPartitionCol());
            });
            this.checkSourceTableFutures.put(this.jobPool.submit(checkSourceTableRunnable, "success"), Long.valueOf(System.currentTimeMillis()));
        }
    }

    public void waitCheckSourceTableTaskDone() throws InterruptedException {
        while (true) {
            if (this.checkSourceTableQueue.peek() != null) {
                CheckSourceTableResult poll = this.checkSourceTableQueue.poll();
                if (poll.getNeedRefresh().booleanValue()) {
                    buildSnapshot(poll);
                }
            } else {
                if (this.checkSourceTableFutures.size() == this.checkSourceTableFutures.keySet().stream().filter((v0) -> {
                    return v0.isDone();
                }).count()) {
                    return;
                }
                cancelTimeoutFuture(this.checkSourceTableFutures);
                TimeUnit.SECONDS.sleep(10L);
            }
        }
    }

    public void cancelTimeoutFuture(Map<Future<String>, Long> map) {
        for (Map.Entry<Future<String>, Long> entry : map.entrySet()) {
            Future<String> key = entry.getKey();
            if (!key.isDone() && System.currentTimeMillis() - entry.getValue().longValue() > this.projectConfig.getSnapshotAutoRefreshTaskTimeout()) {
                log.debug("cancel timeout future with timeout setting[{}]", Long.valueOf(this.projectConfig.getSnapshotAutoRefreshTaskTimeout()));
                key.cancel(true);
            }
        }
    }

    public void buildSnapshot(CheckSourceTableResult checkSourceTableResult) {
        for (TableDesc tableDesc : this.sourceTableSnapshotMapping.get(checkSourceTableResult.getTableIdentity())) {
            AtomicInteger orDefault = this.buildSnapshotCount.getOrDefault(tableDesc.getIdentity(), new AtomicInteger(0));
            log.info("buildSnapshotCount is [{}], tableIdentity is [{}]", orDefault, tableDesc.getIdentity());
            if (orDefault.getAndIncrement() == 0) {
                BuildSnapshotRunnable buildSnapshotRunnable = new BuildSnapshotRunnable();
                buildSnapshotRunnable.setProject(this.project);
                buildSnapshotRunnable.setConfig(this.projectConfig);
                buildSnapshotRunnable.setRestTemplate(this.restTemplate);
                buildSnapshotRunnable.setNeedRefresh(checkSourceTableResult.getNeedRefresh());
                buildSnapshotRunnable.setNeedRefreshPartitionsValue(checkSourceTableResult.getNeedRefreshPartitionsValue());
                buildSnapshotRunnable.setTableIdentity(tableDesc.getIdentity());
                buildSnapshotRunnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol());
                buildSnapshotRunnable.run();
            }
            this.buildSnapshotCount.put(tableDesc.getIdentity(), orDefault);
        }
    }

    public static synchronized void shutdown(String str) {
        AutoRefreshSnapshotRunner instanceByProject = getInstanceByProject(str);
        if (null != instanceByProject) {
            instanceByProject.innerShutdown();
            log.info("update snapshot automatic refresh fetch pool size success");
        }
    }

    public void innerShutdown() {
        if (Thread.currentThread().isInterrupted()) {
            log.warn("shutdown->current thread is interrupted,{}", Thread.currentThread().getName());
            throw new InterruptedException();
        }
        log.info("Shutting down AutoRefreshSnapshotRunner for project {} ....", this.project);
        if (null != this.jobPool) {
            ExecutorServiceUtil.shutdownGracefully(this.jobPool, 60);
        }
        INSTANCE_MAP.remove(this.project);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                SetLogCategory setLogCategory = new SetLogCategory("schedule");
                Throwable th = null;
                try {
                    saveMarkFile();
                    doRun();
                    if (setLogCategory != null) {
                        if (0 != 0) {
                            try {
                                setLogCategory.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            setLogCategory.close();
                        }
                    }
                    deleteMarkFile();
                } catch (Throwable th3) {
                    if (setLogCategory != null) {
                        if (0 != 0) {
                            try {
                                setLogCategory.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            setLogCategory.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                deleteMarkFile();
                throw th5;
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            deleteMarkFile();
        }
    }

    public void runWhenSchedulerInit() {
        try {
            try {
                SetLogCategory setLogCategory = new SetLogCategory("schedule");
                Throwable th = null;
                try {
                    doRun();
                    if (setLogCategory != null) {
                        if (0 != 0) {
                            try {
                                setLogCategory.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            setLogCategory.close();
                        }
                    }
                    deleteMarkFile();
                } catch (Throwable th3) {
                    if (setLogCategory != null) {
                        if (0 != 0) {
                            try {
                                setLogCategory.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            setLogCategory.close();
                        }
                    }
                    throw th3;
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                deleteMarkFile();
            }
        } catch (Throwable th5) {
            deleteMarkFile();
            throw th5;
        }
    }

    public void saveMarkFile() {
        Path path = new Path(this.projectConfig.getSnapshotAutoRefreshDir(this.project) + "_mark");
        try {
            FSDataOutputStream create = HadoopUtil.getWorkingFileSystem().create(path, true);
            Throwable th = null;
            try {
                try {
                    create.write(new byte[0]);
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (IOException e) {
            log.error("overwrite mark file [{}] failed!", path, e);
        }
    }

    public void deleteMarkFile() {
        Path path = new Path(this.projectConfig.getSnapshotAutoRefreshDir(this.project) + "_mark");
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        try {
            if (workingFileSystem.exists(path)) {
                workingFileSystem.delete(path, true);
            }
        } catch (IOException e) {
            log.error("delete mark file [{}] failed!", path, e);
        }
    }

    @Generated
    public void setJobPool(ExecutorService executorService) {
        this.jobPool = executorService;
    }

    @Generated
    public ExecutorService getJobPool() {
        return this.jobPool;
    }

    @Generated
    public void setProjectConfig(KylinConfig kylinConfig) {
        this.projectConfig = kylinConfig;
    }

    @Generated
    public KylinConfig getProjectConfig() {
        return this.projectConfig;
    }

    @Generated
    public Queue<CheckSourceTableResult> getCheckSourceTableQueue() {
        return this.checkSourceTableQueue;
    }

    @Generated
    public Map<Future<String>, Long> getCheckSourceTableFutures() {
        return this.checkSourceTableFutures;
    }

    @Generated
    public String getProject() {
        return this.project;
    }

    @Generated
    public void setRestTemplate(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    @Generated
    public RestTemplate getRestTemplate() {
        return this.restTemplate;
    }

    @Generated
    public Map<String, List<TableDesc>> getSourceTableSnapshotMapping() {
        return this.sourceTableSnapshotMapping;
    }

    @Generated
    public Map<String, AtomicInteger> getBuildSnapshotCount() {
        return this.buildSnapshotCount;
    }
}
