package org.apache.kylin.rest.service;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.metrics.MetricsCategory;
import org.apache.kylin.common.metrics.MetricsGroup;
import org.apache.kylin.common.metrics.MetricsName;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.helper.RoutineToolHelper;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.factory.JobFactory;
import org.apache.kylin.job.util.JobContextUtil;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.resourcegroup.KylinInstance;
import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum;
import org.apache.kylin.metadata.resourcegroup.ResourceGroup;
import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.ServerInfoResponse;
import org.apache.kylin.rest.service.RoutineJob;
import org.apache.kylin.tool.garbage.LogCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
/* loaded from: input_file:org/apache/kylin/rest/service/ScheduleService.class */
public class ScheduleService extends BasicService {

    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ScheduleService.class);
    // Tag value passed to MetricsGroup.hostTagCounterInc for the routine-ops counters.
    private static final String GLOBAL = "global";
    // URL template of the endpoint that cleans the sparder event log; %s is replaced with the server host.
    private static final String CLEAN_SPARDER_EVENT_LOG = "http://%s/kylin/api/system/clean_sparder_event_log";

    // HTTP client used for the broadcast calls issued by this service.
    @Autowired
    @Qualifier("normalRestTemplate")
    RestTemplate restTemplate;

    // Used to stage the metadata backup in a temporary path and to delete that path afterwards.
    @Autowired
    FileService fileService;

    // Produces the metadata backup (backupAll) in the global routine.
    @Autowired
    MetadataBackupService backupService;

    // Provides the ACL, raw-recommendation and per-project garbage cleanup steps.
    @Autowired
    ProjectService projectService;
    // Single-threaded executor that runs the steps submitted through executeTask.
    private final ExecutorService executors = Executors.newSingleThreadExecutor(new NamedThreadFactory("RoutineTaskScheduler"));
    // Pool (core and max size 20, keep-alive 30 minutes, LinkedBlockingQueue created without a capacity)
    // that runs the tasks submitted through executeAsyncTask.
    private final ExecutorService asyncExecutors = new ThreadPoolExecutor(20, 20, 30L, TimeUnit.MINUTES, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new NamedThreadFactory("RoutineBroadcastScheduler"));
    // Time budget of one routine run; refreshed from KylinConfig.getRoutineOpsTaskTimeOut() at the start of doTask
    // and treated as milliseconds by getRemainingTime/executeTask.
    private long opsCronTimeout;
    // Temporary path of the staged metadata backup; written by broadcastToServer and deleted by
    // executeMetadataBackupInTenantMode once the broadcast finished.
    private String tmpMetadataBackupFilePath;
    // Per-thread handle of the step most recently submitted by executeTask; doTask cancels it on timeout.
    private static final ThreadLocal<Future<?>> CURRENT_FUTURE;
    // Futures submitted by executeAsyncTask, mapped to their submission time (System.currentTimeMillis()).
    // NOTE(review): this map is static and cleared by every doTask call, so overlapping routine runs
    // would share and reset the same entries - confirm that runs cannot overlap.
    private static final Map<Future<?>, Long> ASYNC_FUTURES;

    /**
     * Cron entry point of the routine maintenance work.
     *
     * <p>The schedule is taken from the {@code kylin.metadata.ops-cron} property and falls back to
     * {@code 0 0 0 * * *} when the property is not set. The method only delegates to
     * {@code submitJob()}.
     */
    @Scheduled(cron = "${kylin.metadata.ops-cron:0 0 0 * * *}")
    public void routineTask() {
        submitJob();
    }

    /**
     * Submits one routine cron job for every project and one for the {@code "_global"} target.
     *
     * <p>Nothing is submitted when the job scheduler of this node is not the master; in that case
     * only an informational message is logged.
     */
    private void submitJob() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (!JobContextUtil.getJobContext(config).getJobScheduler().isMaster()) {
            log.info("Not master node, skip submitting routine job");
            return;
        }
        // Collect into an explicitly mutable list: the result of Collectors.toList() is not
        // guaranteed to support add(), and the "_global" target has to be appended.
        List<String> targets = Lists.newArrayList();
        NProjectManager.getInstance(config).listAllProjects().forEach(project -> targets.add(project.getName()));
        targets.add("_global");
        for (String target : targets) {
            ExecutableManager.getInstance(config, target).checkAndSubmitCronJob("ROUTINE_JOB_FACTORY", JobTypeEnum.ROUTINE);
        }
        log.info("Successfully create garbage cleanup jobs.");
    }

    /**
     * Runs the global (not project specific) routine maintenance steps through {@code doTask}.
     *
     * <p>Order of work: broadcast the sparder event log cleanup to all nodes, back up the metadata,
     * broadcast that backup in tenant mode, then clean query histories, streaming stats, raw
     * recommendation items, global source usage, ACLs, raw recommendations of deleted projects,
     * storage and remote logs. All steps share one time budget that starts when the routine starts.
     *
     * <p>The worker thread name is switched via {@code SetThreadName} for the duration of the run
     * and restored when the try-with-resources block exits, on success as well as on failure.
     */
    public void doRoutineTaskForGlobal() {
        doTask(() -> {
            log.info("Start to work");
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            long startTime = System.currentTimeMillis();
            MetricsGroup.hostTagCounterInc(MetricsName.METADATA_OPS_CRON, MetricsCategory.GLOBAL, GLOBAL);
            try (SetThreadName ignored = new SetThreadName("RoutineOpsWorker")) {
                // Filled by the backup step and consumed by the tenant-mode broadcast; stays null
                // when the backup step fails.
                AtomicReference<Pair<String, String>> backupResult = new AtomicReference<>(null);
                broadcastCleanSparderEventLogToAllNodes();
                executeTask(() -> backupResult.set(this.backupService.backupAll()), "MetadataBackup", startTime);
                executeMetadataBackupInTenantMode(config, startTime, backupResult);
                executeTask(
                        () -> RoutineToolHelper.cleanQueryHistoriesAsync(getRemainingTime(startTime), TimeUnit.MILLISECONDS),
                        "QueryHistoriesCleanup", startTime);
                executeTask(RoutineToolHelper::cleanStreamingStats, "StreamingStatsCleanup", startTime);
                executeTask(RoutineToolHelper::deleteRawRecItems, "RawRecItemsDeletion", startTime);
                executeTask(RoutineToolHelper::cleanGlobalSourceUsage, "SourceUsageCleanup", startTime);
                executeTask(() -> this.projectService.cleanupAcl(), "AclCleanup", startTime);
                executeTask(() -> this.projectService.cleanRawRecForDeletedProject(), "RawRecCleanup", startTime);
                executeTask(RoutineToolHelper::cleanStorageForRoutine, "HdfsCleanup", startTime);
                executeTask(() -> new LogCleaner().cleanUp(), "RemoteLogCleanup", startTime);
                log.info("Finish to work for global, cost {}ms", System.currentTimeMillis() - startTime);
            }
            return true;
        });
    }

    /**
     * Runs the routine maintenance steps of a single project through {@code doTask}.
     *
     * <p>Two steps are executed in order, sharing one time budget: the project garbage cleanup and
     * the cleanup of the project's Spark event log. The worker thread name is switched via
     * {@code SetThreadName} for the duration of the run and restored when the
     * try-with-resources block exits.
     *
     * @param str name of the project to clean up
     */
    public void doRoutineTaskForProject(String str) {
        doTask(() -> {
            log.info("Start to work");
            long startTime = System.currentTimeMillis();
            MetricsGroup.hostTagCounterInc(MetricsName.METADATA_OPS_CRON, MetricsCategory.GLOBAL, GLOBAL);
            try (SetThreadName ignored = new SetThreadName("RoutineOpsWorker")) {
                executeTask(() -> this.projectService.garbageCleanup(str, getRemainingTime(startTime)),
                        "ProjectGarbageCleanup", startTime);
                executeTask(() -> RoutineToolHelper.cleanEventLog(RoutineToolHelper.CleanType.SPARK, str),
                        "EventLogCleanup", startTime);
                log.info("Finish to work for project {}, cost {}ms", str, System.currentTimeMillis() - startTime);
            }
            return true;
        });
    }

    /**
     * Executes one routine run and takes care of the bookkeeping around it.
     *
     * <p>Before the run the time budget is refreshed from the configuration and the future
     * registries are reset. Afterwards:
     * <ul>
     *   <li>an interruption is logged and the interrupt flag of the thread is restored;</li>
     *   <li>a timeout is logged and the current step as well as all async tasks are cancelled;</li>
     *   <li>any other exception is wrapped into a {@link KylinRuntimeException}.</li>
     * </ul>
     * The async registry is emptied in every case.
     *
     * <p>NOTE(review): the {@code METADATA_OPS_CRON_SUCCESS} counter is also incremented after an
     * interruption or a timeout - confirm that this is intended.
     *
     * @param callable the routine to run; its return value is ignored
     * @throws KylinRuntimeException when the routine fails with anything but an interruption or a timeout
     */
    private void doTask(Callable<Boolean> callable) {
        this.opsCronTimeout = KylinConfig.getInstanceFromEnv().getRoutineOpsTaskTimeOut();
        CURRENT_FUTURE.remove();
        ASYNC_FUTURES.clear();
        try {
            callable.call();
        } catch (InterruptedException interrupted) {
            log.warn("Routine task execution interrupted", interrupted);
            Thread.currentThread().interrupt();
        } catch (TimeoutException timedOut) {
            log.warn("Routine task execution timeout", timedOut);
            Future<?> runningStep = CURRENT_FUTURE.get();
            if (runningStep != null) {
                runningStep.cancel(true);
            }
            for (Future<?> asyncTask : ASYNC_FUTURES.keySet()) {
                asyncTask.cancel(true);
            }
        } catch (Exception unexpected) {
            throw new KylinRuntimeException("Unexpected exception.", unexpected);
        } finally {
            ASYNC_FUTURES.clear();
        }
        MetricsGroup.hostTagCounterInc(MetricsName.METADATA_OPS_CRON_SUCCESS, MetricsCategory.GLOBAL, GLOBAL);
    }

    /**
     * Broadcasts the freshly created metadata backup to the other resource groups.
     *
     * <p>The method does nothing unless multi-tenant mode and resource groups are both enabled and
     * at least one target resource group was found. The temporary copy of the backup recorded in
     * {@code tmpMetadataBackupFilePath} is deleted once the broadcast has finished, whether it
     * succeeded or failed.
     *
     * <p>When no backup result is available (the reference still holds {@code null}, e.g. because
     * the backup step failed) the broadcast is skipped with a warning. Previously this situation
     * ended in a {@link NullPointerException} that aborted all remaining routine steps.
     *
     * @param kylinConfig configuration used to look up the resource groups
     * @param j start time of the routine in epoch milliseconds, used to compute the remaining time
     * @param atomicReference holder of the backup result; its first element is the backup path
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the time budget of the routine is exhausted
     */
    public void executeMetadataBackupInTenantMode(KylinConfig kylinConfig, long j, AtomicReference<Pair<String, String>> atomicReference) throws InterruptedException, TimeoutException {
        ResourceGroupManager resourceGroupManager = ResourceGroupManager.getInstance(kylinConfig);
        if (!kylinConfig.isKylinMultiTenantEnabled() || !resourceGroupManager.isResourceGroupEnabled()) {
            return;
        }
        Map<String, List<KylinInstance>> serverNodes = getResourceGroupServerNode(resourceGroupManager);
        log.info("ResourceGroupServerNode : {}", serverNodes);
        if (serverNodes.isEmpty()) {
            return;
        }
        Pair<String, String> backupInfo = atomicReference.get();
        if (backupInfo == null) {
            log.warn("Metadata backup result is not available, skip broadcasting it to tenant nodes");
            return;
        }
        try {
            this.tmpMetadataBackupFilePath = "";
            executeBroadcastMetadataBackup(() -> broadcastToServer(serverNodes, atomicReference, j),
                    "broadcastMetadataBackup", j);
        } finally {
            // The staged copy is only needed while the broadcast is running.
            if (StringUtils.isNotBlank(this.tmpMetadataBackupFilePath)) {
                this.fileService.deleteTmpDir(this.tmpMetadataBackupFilePath);
            }
        }
        log.info("backup file path [{}] broadcast to server success", backupInfo.getFirst());
    }

    /**
     * Collects the instances of all BUILD resource groups other than the local one.
     *
     * <p>The resource group of the local instance is looked up by its address. Every resource
     * group that is mapped to the {@code BUILD} request type and differs from the local group is a
     * target; the instances of those groups are returned grouped by resource group id.
     *
     * @param resourceGroupManager source of the resource group definition
     * @return a mutable map from resource group id to the instances of that group; empty when
     *         there is no matching group
     */
    public Map<String, List<KylinInstance>> getResourceGroupServerNode(ResourceGroupManager resourceGroupManager) {
        ResourceGroup resourceGroup = resourceGroupManager.getResourceGroup();
        String localInstance = AddressUtil.getLocalInstance();

        // Null-safe comparison: an instance without an address simply does not match.
        String localGroupId = resourceGroup.getKylinInstances().stream()
                .filter(instance -> StringUtils.equals(instance.getInstance(), localInstance))
                .map(KylinInstance::getResourceGroupId)
                .findFirst()
                .orElse(null);

        List<String> targetGroupIds = resourceGroup.getResourceGroupMappingInfoList().stream()
                .filter(mapping -> mapping.getRequestType() == RequestTypeEnum.BUILD)
                .map(mapping -> mapping.getResourceGroupId())
                .filter(groupId -> !StringUtils.equals(groupId, localGroupId))
                .collect(Collectors.toList());

        Map<String, List<KylinInstance>> nodesByGroup = Maps.newHashMap();
        for (KylinInstance instance : resourceGroup.getKylinInstances()) {
            String groupId = instance.getResourceGroupId();
            if (targetGroupIds.contains(groupId)) {
                nodesByGroup.computeIfAbsent(groupId, id -> Lists.newArrayList()).add(instance);
            }
        }
        return nodesByGroup;
    }

    /**
     * Stages the metadata backup in a temporary path and notifies one instance per resource group.
     *
     * <p>For each resource group with at least one instance, a randomly chosen instance receives
     * the notification through an async task registered via {@code executeAsyncTask}. The staged
     * path is recorded in {@code tmpMetadataBackupFilePath} so the caller can delete it later.
     * An {@link IOException} raised while staging the backup is logged and ends the broadcast.
     *
     * <p>When the backup result is missing (the reference holds {@code null}) the method logs an
     * error and returns instead of failing with a {@link NullPointerException}.
     *
     * @param map target instances keyed by resource group id
     * @param atomicReference holder of the backup result: first element is the backup path,
     *                        second element is forwarded as the backup directory
     * @param j start time of the routine in epoch milliseconds, used for logging the remaining time
     */
    public void broadcastToServer(Map<String, List<KylinInstance>> map, AtomicReference<Pair<String, String>> atomicReference, long j) {
        Pair<String, String> backupInfo = atomicReference.get();
        if (backupInfo == null) {
            log.error("Metadata backup result is not available, skip broadcasting to tenant nodes");
            return;
        }
        String backupZipPath = backupInfo.getFirst() + "/metadata.zip";
        String backupDir = backupInfo.getSecond();
        try {
            Pair<String, Long> stagedBackup = this.fileService.saveMetadataBackupInTmpPath(backupZipPath);
            // Captured in a local so the async tasks do not depend on later writes to the field.
            String tmpFilePath = stagedBackup.getFirst();
            this.tmpMetadataBackupFilePath = tmpFilePath;
            Long tmpFileSize = stagedBackup.getSecond();
            for (Map.Entry<String, List<KylinInstance>> entry : map.entrySet()) {
                List<KylinInstance> candidates = entry.getValue();
                if (CollectionUtils.isEmpty(candidates)) {
                    continue;
                }
                String groupId = entry.getKey();
                String targetServer = candidates.get(RandomUtil.nextInt(candidates.size())).getInstance();
                log.info("routineTask[broadcastMetadataBackup] execute to groupId [{}] server [{}]", groupId, targetServer);
                executeAsyncTask(
                        () -> broadcastToTenantNode(groupId, backupDir, tmpFilePath, tmpFileSize, targetServer),
                        "broadcastToTenantNode-GroupIs[" + groupId + "]", j);
            }
        } catch (IOException e) {
            log.error("backup file path [{}] broadcast to server has error. reason:", backupZipPath, e);
        }
    }

    /**
     * Asks every server known to the cluster manager to clean its sparder event log.
     *
     * <p>The broadcast is best effort: a failure on one node is logged and the remaining nodes are
     * still contacted. Previously the first failing node ended the whole broadcast. A failure to
     * obtain the server list is logged as well and never propagates to the caller.
     */
    private void broadcastCleanSparderEventLogToAllNodes() {
        try {
            for (ServerInfoResponse server : this.clusterManager.getServers()) {
                cleanSparderEventLogOnNode(server.getHost());
            }
        } catch (Exception e) {
            log.error("Broadcast cleaning sparder event log failed!", e);
        }
    }

    /** Sends the cleanup request to a single host and logs, rather than propagates, any failure. */
    private void cleanSparderEventLogOnNode(String host) {
        try {
            String url = String.format(Locale.ROOT, CLEAN_SPARDER_EVENT_LOG, host);
            log.info("Start broadcasting to clean the sparder event log of {}", url);
            HttpHeaders headers = new HttpHeaders();
            headers.add("Content-Type", "application/vnd.apache.kylin-v4-public+json");
            ResponseEntity<String> response = this.restTemplate.exchange(url, HttpMethod.DELETE,
                    new HttpEntity<>(headers), String.class);
            receive(response, "noticeToQueryNode");
        } catch (Exception e) {
            log.error("Broadcast cleaning sparder event log failed!", e);
        }
    }

    /**
     * Inspects the answer of a broadcast call and logs every sign of failure.
     *
     * <p>An HTTP status other than 200 is logged. The body (an empty string when absent) is then
     * parsed as a {@code RestResponse<Boolean>}, and a response code other than {@code "000"} is
     * logged as well. The method itself never rejects a response.
     *
     * @param responseEntity the HTTP response to inspect
     * @param str label of the operation, used as prefix of the log messages
     * @throws IOException if the body cannot be parsed
     */
    private void receive(ResponseEntity<String> responseEntity, String str) throws IOException {
        int httpStatus = responseEntity.getStatusCodeValue();
        if (httpStatus != 200) {
            log.error("{} failed, HttpStatus is {}", str, httpStatus);
        }

        String body = responseEntity.getBody();
        if (body == null) {
            body = "";
        }
        RestResponse<Boolean> parsed = JsonUtil.readValue(body, new TypeReference<RestResponse<Boolean>>() {
        });

        if (!StringUtils.equals(parsed.getCode(), "000")) {
            log.error("{} failed, response code is {}", str, parsed.getCode());
        }
    }

    /**
     * Notifies one tenant node that a metadata backup is ready to be fetched.
     *
     * <p>A JSON document describing the backup is posted to the
     * {@code /kylin/api/system/broadcast_metadata_backup} endpoint of the node and the answer is
     * checked by {@code receive}. An {@link IOException} is logged and not propagated.
     *
     * <p>NOTE(review): runtime exceptions from the HTTP call are not caught here. When this method
     * runs as an async task whose future is never queried, such a failure would go unnoticed -
     * confirm whether it should be logged.
     *
     * @param str id of the target resource group
     * @param str2 backup directory reported to the node
     * @param str3 temporary path of the staged backup file
     * @param j size of the staged backup file
     * @param str4 address of the node to notify
     */
    public void broadcastToTenantNode(String str, String str2, String str3, long j, String str4) {
        try {
            String url = String.format(Locale.ROOT, "http://%s/kylin/api/system/broadcast_metadata_backup", str4);

            Map<String, Object> payload = Maps.newHashMap();
            payload.put("resource_group_id", str);
            payload.put("tmp_file_path", str3);
            payload.put("tmp_file_size", j);
            payload.put("backup_dir", str2);
            payload.put("from_host", AddressUtil.getLocalInstance());

            HttpHeaders headers = new HttpHeaders();
            headers.add("Content-Type", "application/vnd.apache.kylin-v4-public+json");

            ResponseEntity<String> answer = this.restTemplate.exchange(url, HttpMethod.POST,
                    new HttpEntity<>(JsonUtil.writeValueAsBytes(payload), headers), String.class);
            receive(answer, "noticeToTenantNode");
        } catch (IOException ioFailure) {
            log.error(ioFailure.getMessage(), ioFailure);
        }
    }

    /**
     * Runs one routine step on the routine executor and waits for it within the remaining budget.
     *
     * <p>The submitted future is remembered in {@code CURRENT_FUTURE} before waiting. A failure of
     * the step itself is logged as a warning and does not stop the routine.
     *
     * @param runnable the step to run
     * @param str name of the step, used in log messages
     * @param j start time of the routine in epoch milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the step does not finish within the remaining time
     */
    public void executeTask(Runnable runnable, String str, long j) throws InterruptedException, TimeoutException {
        Future<?> pendingStep = this.executors.submit(runnable);
        long budgetMillis = getRemainingTime(j);
        log.info("execute task {} with remaining time: {} ms", str, budgetMillis);
        CURRENT_FUTURE.set(pendingStep);
        try {
            pendingStep.get(budgetMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException stepFailure) {
            log.warn("Routine task {} execution failed, reason:", str, stepFailure);
        }
    }

    /**
     * Runs the broadcast step via {@code executeTask} and then waits for, or cancels, the async
     * tasks it registered via {@code cancelTimeoutAsyncTask}.
     *
     * @param runnable the broadcast step
     * @param str name of the step, used in log messages
     * @param j start time of the routine in epoch milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the step does not finish within the remaining time
     */
    public void executeBroadcastMetadataBackup(Runnable runnable, String str, long j) throws InterruptedException, TimeoutException {
        executeTask(runnable, str, j);
        cancelTimeoutAsyncTask(j);
    }

    /**
     * Waits until all registered async tasks are done, cancelling those that run out of time.
     *
     * <p>Each round first cancels every task whose own budget (measured from its submission time)
     * is used up. The method returns when all tasks are done. When the budget of the whole routine
     * is used up, all unfinished tasks are cancelled and the method returns as well. Otherwise it
     * sleeps for ten seconds before the next round.
     *
     * @param j start time of the routine in epoch milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void cancelTimeoutAsyncTask(long j) throws InterruptedException {
        while (ASYNC_FUTURES.size() > 0) {
            for (Map.Entry<Future<?>, Long> tracked : ASYNC_FUTURES.entrySet()) {
                if (getRemainingTime(tracked.getValue()) <= 0) {
                    tracked.getKey().cancel(true);
                }
            }

            long finished = ASYNC_FUTURES.keySet().stream().filter(Future::isDone).count();
            if (finished == ASYNC_FUTURES.size()) {
                log.info("all running asyncTask[broadcastToServer] is done");
                return;
            }

            if (getRemainingTime(j) <= 0) {
                log.warn("cancel all running asyncTask, DoneAsyncTask count: [{}], AllAsyncTask count : [{}]", finished, ASYNC_FUTURES.size());
                for (Future<?> candidate : ASYNC_FUTURES.keySet()) {
                    if (!candidate.isDone()) {
                        candidate.cancel(true);
                    }
                }
                return;
            }

            TimeUnit.SECONDS.sleep(10L);
        }
    }

    /**
     * Submits a task to the async executor without waiting for it.
     *
     * <p>The returned future is registered in {@code ASYNC_FUTURES} together with the current
     * time so it can be monitored and cancelled later.
     *
     * @param runnable the task to run
     * @param str name of the task, used in the log message
     * @param j start time of the routine in epoch milliseconds, used to log the remaining time
     */
    public void executeAsyncTask(Runnable runnable, String str, long j) {
        Future<?> asyncHandle = this.asyncExecutors.submit(runnable);
        long remainingMillis = getRemainingTime(j);
        log.info("execute async task {} with remaining time: {} ms", str, remainingMillis);
        long submittedAt = System.currentTimeMillis();
        ASYNC_FUTURES.put(asyncHandle, submittedAt);
    }

    /**
     * Computes how much of the routine time budget is left.
     *
     * @param j reference start time in epoch milliseconds
     * @return {@code opsCronTimeout} minus the time elapsed since {@code j}; zero or negative once
     *         the budget is used up
     */
    private long getRemainingTime(long j) {
        long elapsedMillis = System.currentTimeMillis() - j;
        return this.opsCronTimeout - elapsedMillis;
    }

    /**
     * Asks the job master node to start the garbage cleanup and reports the outcome.
     *
     * <p>The request is forwarded to the {@code /kylin/api/system/do_cleanup_garbage} endpoint of
     * the job master. The first element of the result is {@code "000"} on success and
     * {@code "999"} on failure; the second element is a human readable message.
     *
     * <p>A failure caused by an exception is now logged and reported with code {@code "999"}.
     * Before, the code stayed {@code "000"} although the message said the trigger had failed.
     * The failure message built from the remote response now separates the fixed text from the
     * remote message with {@code ": "}, matching the message of the exception path.
     *
     * @param httpServletRequest the incoming request that is forwarded to the job master
     * @return pair of result code and message
     */
    public Pair<String, String> triggerAllCleanupGarbage(HttpServletRequest httpServletRequest) {
        String jobMaster = JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv()).getJobScheduler().getJobMaster();
        StringBuilder message = new StringBuilder();
        Pair<String, String> result = new Pair<>();
        result.setFirst("000");
        try {
            EnvelopeResponse response = generateTaskForRemoteHost(httpServletRequest,
                    "http://" + jobMaster + "/kylin/api/system/do_cleanup_garbage");
            // A missing response or code ends up in the catch block and is reported as a failure.
            if (response.getCode().equals("000")) {
                message.append(jobMaster).append(":").append("triggered successfully").append(";");
            }
            if (response.getCode().equals("999")) {
                result.setFirst("999");
                message.append(jobMaster).append(":").append("triggered failed: ").append(response.getMsg()).append(";");
            }
        } catch (Exception e) {
            log.warn("Failed to trigger garbage cleanup on job master {}", jobMaster, e);
            result.setFirst("999");
            message.append(jobMaster).append(":").append("triggered failed: ").append(e.getMessage()).append(";");
        }
        result.setSecond(message.toString());
        return result;
    }

    // Runs once when the class is initialized: registers the factory for routine jobs under the
    // name used by submitJob and creates the shared future registries.
    static {
        JobFactory.register("ROUTINE_JOB_FACTORY", new RoutineJob.RoutineJobFactory());
        CURRENT_FUTURE = new ThreadLocal<>();
        // Concurrent map: entries are added by executeAsyncTask and read/cancelled by
        // cancelTimeoutAsyncTask and doTask.
        ASYNC_FUTURES = Maps.newConcurrentMap();
    }
}
