package org.apache.kylin.rest.scheduler;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kyligence.kap.guava20.shaded.common.collect.Maps;
import io.kyligence.kap.guava20.shaded.common.collect.Sets;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.NExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;

/* loaded from: input_file:org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.class */
public class BuildSnapshotRunnable extends AbstractSchedulerRunnable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BuildSnapshotRunnable.class);
    private static final String BUILD_SNAPSHOT_ERROR_MESSAGE = "Project[%s] Snapshot[%s] buildSnapshot failed";

    @Override // org.apache.kylin.rest.scheduler.AbstractSchedulerRunnable
    public void execute() {
        buildSnapshot();
    }

    public void buildSnapshot() {
        if (Boolean.TRUE.equals(this.needRefresh) || Boolean.TRUE.equals(checkSnapshotJobFile())) {
            try {
                String format = String.format(Locale.ROOT, "http://%s/kylin/api/snapshots/auto_refresh", this.config.getServerAddress());
                Map<Object, Object> createRequestAndCheckRunningJob = createRequestAndCheckRunningJob();
                log.debug("buildSnapshot request: {}", createRequestAndCheckRunningJob);
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.add("Content-Type", "application/vnd.apache.kylin-v4-public+json");
                ResponseEntity exchange = this.restTemplate.exchange(format, HttpMethod.PUT, new HttpEntity(JsonUtil.writeValueAsBytes(createRequestAndCheckRunningJob), httpHeaders), String.class, new Object[0]);
                String str = (String) Optional.ofNullable(exchange.getBody()).orElse("");
                if (exchange.getStatusCodeValue() != 200) {
                    throw new KylinRuntimeException(String.format(Locale.ROOT, BUILD_SNAPSHOT_ERROR_MESSAGE, this.project, this.tableIdentity));
                }
                RestResponse restResponse = (RestResponse) JsonUtil.readValue(str, new TypeReference<RestResponse<JobInfoResponse>>() { // from class: org.apache.kylin.rest.scheduler.BuildSnapshotRunnable.1
                });
                if (!StringUtils.equals(restResponse.getCode(), "000")) {
                    throw new KylinRuntimeException(String.format(Locale.ROOT, BUILD_SNAPSHOT_ERROR_MESSAGE, this.project, this.tableIdentity));
                }
                saveSnapshotJobFile(false, "", ((JobInfoResponse) restResponse.getData()).getJobs().get(0).getJobId());
                log.info("Project[{}}] Snapshot[{}] buildSnapshot API success, response [{}]", new Object[]{this.project, this.tableIdentity, restResponse.getData()});
            } catch (Exception e) {
                saveSnapshotJobFile(true, e.getMessage(), "");
                log.error("Project[{}] Snapshot[{}] buildSnapshot failed", this.project, this.tableIdentity);
                throw new KylinRuntimeException(e.getMessage(), e);
            }
        }
    }

    public Boolean checkSnapshotJobFile() {
        Map<String, String> readSnapshotJobFile = readSnapshotJobFile();
        if (Boolean.parseBoolean(readSnapshotJobFile.getOrDefault("build_error", String.valueOf(true)))) {
            return true;
        }
        String str = readSnapshotJobFile.get("job_id");
        if (StringUtils.isBlank(str)) {
            return true;
        }
        return Boolean.valueOf(!checkAutoRefreshJobSuccessOrRunning(str).booleanValue());
    }

    public Boolean checkAutoRefreshJobSuccessOrRunning(String str) {
        try {
            AbstractExecutable job = NExecutableManager.getInstance(this.config, this.project).getJob(str);
            if (null == job) {
                return false;
            }
            return Boolean.valueOf(job.getStatus().equals(ExecutableState.SUCCEED) || job.getStatus().isRunning());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return false;
        }
    }

    public Map<String, String> readSnapshotJobFile() {
        HashMap newHashMap = Maps.newHashMap();
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        Path path = new Path(this.config.getSnapshotAutoRefreshDir(this.project) + "snapshot_job/" + this.tableIdentity);
        try {
            if (workingFileSystem.exists(path)) {
                FSDataInputStream open = workingFileSystem.open(path);
                Throwable th = null;
                try {
                    try {
                        newHashMap.putAll((Map) JsonUtil.readValue(open, new TypeReference<Map<String, String>>() { // from class: org.apache.kylin.rest.scheduler.BuildSnapshotRunnable.2
                        }));
                        if (open != null) {
                            if (0 != 0) {
                                try {
                                    open.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                open.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            }
        } catch (IOException e) {
            log.error("read SnapshotSourceTableStats path[{}] to SourceTableStats has error", path, e);
        }
        return newHashMap;
    }

    public void saveSnapshotJobFile(Boolean bool, String str, String str2) {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("build_error", bool);
        newHashMap.put("error_message", str);
        newHashMap.put("job_id", str2);
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        Path path = new Path(this.config.getSnapshotAutoRefreshDir(this.project) + "snapshot_job/" + this.tableIdentity);
        try {
            FSDataOutputStream create = workingFileSystem.create(path, true);
            Throwable th = null;
            try {
                try {
                    create.write(JsonUtil.writeValueAsBytes(newHashMap));
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (IOException e) {
            log.error("overwrite SourceTableStats to path[{}] failed!", path, e);
        }
    }

    public Map<Object, Object> createRequestAndCheckRunningJob() {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("need_refresh", this.needRefresh);
        newHashMap.put("project", this.project);
        newHashMap.put("tables", Sets.newHashSet(new String[]{this.tableIdentity}));
        List<NSparkSnapshotJob> runningSnapshotJobs = getRunningSnapshotJobs();
        if (StringUtils.isNotBlank(this.partitionColumn)) {
            if (checkNeedBuildPartitionAndSetTableOption(newHashMap, runningSnapshotJobs)) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, "Project[%s] Snapshot[%s] buildSnapshot failed, because none partitions need build", this.project, this.tableIdentity));
            }
        } else if (CollectionUtils.isNotEmpty(runningSnapshotJobs)) {
            log.info("buildSnapshot: {} has running snapshot job", this.tableIdentity);
            throw new KylinRuntimeException(String.format(Locale.ROOT, "Project[%s] Snapshot[%s] buildSnapshot failed, because has running snapshot job", this.project, this.tableIdentity));
        }
        return newHashMap;
    }

    private List<NSparkSnapshotJob> getRunningSnapshotJobs() {
        Stream filter = NExecutableManager.getInstance(this.config, this.project).listExecByJobTypeAndStatus((v0) -> {
            return v0.isRunning();
        }, new JobTypeEnum[]{JobTypeEnum.SNAPSHOT_BUILD, JobTypeEnum.SNAPSHOT_REFRESH}).stream().filter(abstractExecutable -> {
            return StringUtils.equalsIgnoreCase(abstractExecutable.getParam("table"), this.tableIdentity);
        });
        Class<NSparkSnapshotJob> cls = NSparkSnapshotJob.class;
        NSparkSnapshotJob.class.getClass();
        Stream filter2 = filter.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<NSparkSnapshotJob> cls2 = NSparkSnapshotJob.class;
        NSparkSnapshotJob.class.getClass();
        return (List) filter2.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList());
    }

    public boolean checkNeedBuildPartitionAndSetTableOption(Map<Object, Object> map, List<NSparkSnapshotJob> list) {
        if (CollectionUtils.isEmpty(this.needRefreshPartitionsValue)) {
            log.info("buildSnapshot: needRefreshPartitionsValue is empty");
            return true;
        }
        log.info("will build snapshot partitions value : {}", this.needRefreshPartitionsValue);
        Sets.SetView<String> needBuildPartitionsValue = getNeedBuildPartitionsValue(list);
        if (CollectionUtils.isEmpty(needBuildPartitionsValue)) {
            log.info("buildSnapshot: needBuildPartitionsValue is empty");
            return true;
        }
        HashMap newHashMap = Maps.newHashMap();
        HashMap newHashMap2 = Maps.newHashMap();
        newHashMap2.put("partition_col", this.partitionColumn);
        newHashMap2.put("incremental_build", true);
        newHashMap2.put("partitions_to_build", needBuildPartitionsValue);
        newHashMap.put(this.tableIdentity, newHashMap2);
        map.put("options", newHashMap);
        return false;
    }

    private Sets.SetView<String> getNeedBuildPartitionsValue(List<NSparkSnapshotJob> list) {
        return Sets.difference(this.needRefreshPartitionsValue, (Set) list.stream().map(nSparkSnapshotJob -> {
            return nSparkSnapshotJob.getParam("selectedPartition");
        }).map(str -> {
            try {
                return JsonUtil.readValueAsSet(str);
            } catch (IOException e) {
                return Sets.newHashSet();
            }
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet()));
    }
}
