package org.apache.kylin.tool;

import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ExecutableApplication;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionBuilder;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.ObjectUtils;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Diagnostic tool that copies streaming Spark job artifacts from the working file system into a local directory:
 * driver logs, executor logs and, for build jobs, the model checkpoint.
 *
 * <p>Two modes are selected by the command-line options:
 * <ul>
 *   <li>{@code -project} and {@code -job} both set: dump the last {@code JOB_LOG_COUNT} driver log directories of
 *       that job, the matching executor logs, and the model checkpoint unless the job id contains {@code _merge}.</li>
 *   <li>{@code -startTime} and {@code -endTime} both set: for every streaming job of every project, dump the driver
 *       log directories containing a log inside the range (or the newest directory if none matches), the matching
 *       executor logs and the checkpoints of build jobs. The range may span at most {@code MAX_DAY} days; days are
 *       computed with {@code DAY} = 86400000, so the bounds are presumably epoch milliseconds.</li>
 * </ul>
 * Everything is written beneath {@code <dir>/streaming_spark_logs}.
 */
public class StreamingSparkLogTool extends ExecutableApplication {

    @Generated
    private static final Logger log = LoggerFactory.getLogger("diag");
    private static final Option OPTION_STREAMING_DIR = OptionBuilder.getInstance().hasArg().withArgName("DESTINATION_DIR").withDescription("Specify the file to save yarn application id").isRequired(true).create("dir");
    private static final Option OPTION_STREAMING_JOB = OptionBuilder.getInstance().hasArg().withArgName("JOB_ID").withDescription("Specify the job").isRequired(false).create("job");
    private static final Option OPTION_STREAMING_PROJECT = OptionBuilder.getInstance().hasArg().withArgName("OPTION_PROJECT").withDescription("Specify project").isRequired(false).create("project");
    private static final Option OPTION_STREAMING_START_TIME = OptionBuilder.getInstance().withArgName("startTime").hasArg().isRequired(false).withDescription("specify the start of time range to extract logs. ").create("startTime");
    private static final Option OPTION_STREAMING_END_TIME = OptionBuilder.getInstance().withArgName("endTime").hasArg().isRequired(false).withDescription("specify the end of time range to extract logs. ").create("endTime");
    /** Milliseconds per day; used to turn the requested time range into a day count. */
    private static final long DAY = 86400000;
    /** Largest accepted time range, in days. */
    private static final long MAX_DAY = 31;
    /** Number of driver log directories kept per job when no time range is given. */
    private static final int JOB_LOG_COUNT = 3;
    private static final String STREAMING_LOG_ROOT_DIR = "streaming_spark_logs";
    private static final String STREAMING_SPARK_DRIVER_DIR = "spark_driver";
    private static final String STREAMING_SPARK_EXECUTOR_DIR = "spark_executor";
    private static final String STREAMING_SPARK_CHECKPOINT_DIR = "spark_checkpoint";
    private final Options options;
    private final KylinConfig kylinConfig;

    /** Creates a tool bound to the Kylin configuration of the current environment. */
    public StreamingSparkLogTool() {
        this(KylinConfig.getInstanceFromEnv());
    }

    /**
     * Creates a tool bound to the given configuration.
     *
     * @param kylinConfig configuration used to resolve HDFS paths and metadata managers
     */
    public StreamingSparkLogTool(KylinConfig kylinConfig) {
        this.kylinConfig = kylinConfig;
        this.options = new Options();
        initOptions();
    }

    /**
     * Returns the last {@code count} elements of {@code stream} in encounter order.
     *
     * @param stream source elements (must not contain {@code null})
     * @param count  maximum number of elements to keep; a non-positive value yields an empty list
     * @return a new mutable list with at most {@code count} elements
     */
    private static <T> List<T> lastN(Stream<T> stream, int count) {
        if (count <= 0) {
            return new ArrayList<>();
        }
        ArrayDeque<T> window = new ArrayDeque<>(count);
        stream.forEachOrdered(item -> {
            if (window.size() == count) {
                window.pollFirst();
            }
            window.addLast(item);
        });
        return new ArrayList<>(window);
    }

    private void initOptions() {
        this.options.addOption(OPTION_STREAMING_JOB);
        this.options.addOption(OPTION_STREAMING_PROJECT);
        this.options.addOption(OPTION_STREAMING_DIR);
        this.options.addOption(OPTION_STREAMING_END_TIME);
        this.options.addOption(OPTION_STREAMING_START_TIME);
    }

    @Override // org.apache.kylin.common.util.ExecutableApplication
    protected Options getOptions() {
        return this.options;
    }

    /**
     * Dispatches to single-job mode when both project and job are given, otherwise to time-range mode when both
     * bounds are given.
     *
     * @throws NumberFormatException if a time bound is not a valid {@code long}
     */
    @Override // org.apache.kylin.common.util.ExecutableApplication
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String exportDir = optionsHelper.getOptionValue(OPTION_STREAMING_DIR);
        String jobId = optionsHelper.getOptionValue(OPTION_STREAMING_JOB);
        String project = optionsHelper.getOptionValue(OPTION_STREAMING_PROJECT);
        String startTime = optionsHelper.getOptionValue(OPTION_STREAMING_START_TIME);
        String endTime = optionsHelper.getOptionValue(OPTION_STREAMING_END_TIME);

        if (StringUtils.isNotEmpty(project) && StringUtils.isNotEmpty(jobId)) {
            dumpSingleJob(project, jobId, exportDir);
            return;
        }
        if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
            dumpTimeRange(exportDir, startTime, endTime);
            return;
        }
        log.warn("Nothing to dump: specify both -project and -job, or both -startTime and -endTime");
    }

    /** Dumps driver/executor logs of one job and, for non-merge jobs, the checkpoint of its model. */
    private void dumpSingleJob(String project, String jobId, String exportDir) {
        log.info("start dump streaming spark driver/executor/checkpoint job log, project: {}, jobId: {}", project, jobId);
        dumpExecutorLog(dumpJobDriverLog(project, jobId, exportDir, null, null), exportDir);
        if (jobId.contains("_merge")) {
            log.warn("Only build job have checkpoint, current job: {}", jobId);
            return;
        }
        // Job ids are "<modelId>_<type>"; the checkpoint directory is keyed by the model id.
        dumpCheckPoint(project, StringUtils.split(jobId, "_")[0], exportDir);
    }

    /** Validates the time range, then dumps logs and checkpoints of every streaming job within it. */
    private void dumpTimeRange(String exportDir, String startTime, String endTime) {
        long start = Long.parseLong(startTime);
        long end = Long.parseLong(endTime);
        if (start > end) {
            log.error("time range is invalid, startTime: {} is after endTime: {}", start, end);
            return;
        }
        long days = (end - start) / DAY;
        if (days > MAX_DAY) {
            log.error("time range is too large, startTime: {}, endTime: {}, days: {}", start, end, days);
            return;
        }
        log.info("start dump streaming spark driver/executor/checkpoint full log, startTime: {}, endTime: {}", startTime, endTime);
        Map<String, Map<String, Set<String>>> driverDirs = dumpAllDriverLog(exportDir, startTime, endTime);
        if (driverDirs.isEmpty()) {
            return;
        }
        dumpExecutorLog(driverDirs, exportDir);
        dumpAllCheckPoint(exportDir, driverDirs);
    }

    /**
     * Copies the driver logs of one job to {@code <exportDir>/streaming_spark_logs/spark_driver/<project>/<jobId>}.
     *
     * <p>Without a time range, the last {@link #JOB_LOG_COUNT} log directories (by name order) are copied. With a
     * range, every directory containing at least one in-range file is copied in full; if none matches, the directory
     * with the greatest name is copied instead. Copy failures are logged and skipped.
     *
     * @return {@code {project -> {jobId -> selected directory names}}}, or an empty map if there is nothing to copy
     */
    private Map<String, Map<String, Set<String>>> dumpJobDriverLog(String project, String jobId, String exportDir,
            String startTime, String endTime) {
        Map<String, Map<String, Set<String>>> result = new HashMap<>();
        String driverLogDir = this.kylinConfig.getStreamingJobTmpOutputStorePath(project, jobId);
        NExecutableManager executableManager = NExecutableManager.getInstance(this.kylinConfig, project);
        if (!executableManager.isHdfsPathExists(driverLogDir)) {
            log.warn("The job driver log file on HDFS has not been generated yet, jobId: {}, filePath: {}", jobId, driverLogDir);
            return result;
        }
        List<String> filePaths = executableManager.getFilePathsFromHDFSDir(driverLogDir, true);
        if (CollectionUtils.isEmpty(filePaths)) {
            log.warn("There is no file in the current job HDFS directory: {}", driverLogDir);
            return result;
        }

        boolean dumpAll = StringUtils.isEmpty(startTime) && StringUtils.isEmpty(endTime);
        // Parse the bounds once instead of once per file.
        long start = dumpAll ? Long.MIN_VALUE : Long.parseLong(startTime);
        long end = dumpAll ? Long.MAX_VALUE : Long.parseLong(endTime);

        List<Path> logFiles = new ArrayList<>();
        Set<String> allDirs = new HashSet<>();
        Set<String> inRangeDirs = new HashSet<>();
        for (String filePath : filePaths) {
            if (StringUtils.isEmpty(filePath)) {
                continue;
            }
            Path logFile = new Path(filePath);
            String dirName = logFile.getParent().getName();
            logFiles.add(logFile);
            allDirs.add(dirName);
            if (dumpAll || isWithinTimeRange(logFile, start, end)) {
                inRangeDirs.add(dirName);
            }
        }
        if (logFiles.isEmpty()) {
            return result;
        }

        Set<String> selectedDirs;
        if (dumpAll) {
            selectedDirs = new HashSet<>(lastN(inRangeDirs.stream().sorted(), JOB_LOG_COUNT));
        } else if (inRangeDirs.isEmpty()) {
            // Nothing in range: fall back to the directory with the greatest name.
            selectedDirs = new HashSet<>(Collections.singleton(Collections.max(allDirs)));
        } else {
            selectedDirs = inRangeDirs;
        }

        File localJobDir = new File(exportDir, String.format(Locale.ROOT, "/%s/%s/%s/%s", STREAMING_LOG_ROOT_DIR, STREAMING_SPARK_DRIVER_DIR, project, jobId));
        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
        for (Path logFile : logFiles) {
            String dirName = logFile.getParent().getName();
            if (!selectedDirs.contains(dirName)) {
                continue;
            }
            File localDir = new File(localJobDir, dirName);
            try {
                FileUtils.forceMkdir(localDir);
                fileSystem.copyToLocalFile(logFile, new Path(localDir.getAbsolutePath()));
            } catch (IOException e) {
                log.error("dump streaming driver log failed. ", e);
            }
        }

        Map<String, Set<String>> dirsByJob = new HashMap<>();
        dirsByJob.put(jobId, selectedDirs);
        result.put(project, dirsByJob);
        return result;
    }

    /**
     * Tells whether the timestamp embedded in a driver log file name lies in {@code [start, end]}.
     *
     * <p>The second {@code '.'}-separated token of the file name is read as the timestamp. A name without a
     * numeric second token is logged and treated as out of range instead of aborting the dump.
     */
    private static boolean isWithinTimeRange(Path logFile, long start, long end) {
        // StringUtils.split takes separator *characters*, not a regex.
        String[] tokens = StringUtils.split(logFile.getName(), ".");
        if (tokens != null && tokens.length >= 2) {
            try {
                long timestamp = Long.parseLong(tokens[1]);
                return timestamp >= start && timestamp <= end;
            } catch (NumberFormatException ignored) {
                // fall through to the warning below
            }
        }
        log.warn("Cannot read a timestamp from driver log file name, treat it as out of range: {}", logFile);
        return false;
    }

    /**
     * Dumps the driver logs of every streaming job of every project for the given time range.
     *
     * @return {@code {project -> {jobId -> dumped directory names}}}; projects with nothing dumped are omitted
     */
    private Map<String, Map<String, Set<String>>> dumpAllDriverLog(String exportDir, String startTime, String endTime) {
        Map<String, Map<String, Set<String>>> result = new HashMap<>();
        NProjectManager.getInstance(this.kylinConfig).listAllProjects().forEach(projectInstance -> {
            String project = projectInstance.getName();
            Map<String, Set<String>> dirsByJob = new HashMap<>();
            StreamingJobManager.getInstance(this.kylinConfig, project).listAllStreamingJobMeta().stream()
                    .map(jobMeta -> jobMeta.getId())
                    .forEach(jobId -> {
                        Map<String, Map<String, Set<String>>> dumped = dumpJobDriverLog(project, jobId, exportDir, startTime, endTime);
                        for (Map<String, Set<String>> byJob : dumped.values()) {
                            for (Set<String> dirs : byJob.values()) {
                                dirsByJob.put(jobId, dirs);
                            }
                        }
                    });
            if (!dirsByJob.isEmpty()) {
                result.put(project, dirsByJob);
            }
        });
        return result;
    }

    /**
     * Dumps the checkpoint of every model whose build job ({@code <modelId>_build}) had driver logs dumped.
     *
     * @param driverDirs result of {@link #dumpAllDriverLog}
     */
    private void dumpAllCheckPoint(String exportDir, Map<String, Map<String, Set<String>>> driverDirs) {
        NProjectManager.getInstance(this.kylinConfig).listAllProjects().stream()
                .map(projectInstance -> projectInstance.getName())
                .filter(driverDirs::containsKey)
                .forEach(project -> {
                    Set<String> dumpedJobIds = driverDirs.get(project).keySet();
                    StreamingJobManager.getInstance(this.kylinConfig, project).listAllStreamingJobMeta().stream()
                            .map(jobMeta -> jobMeta.getModelId())
                            .distinct()
                            .filter(modelId -> dumpedJobIds.contains(modelId.concat("_build")))
                            .forEach(modelId -> dumpCheckPoint(project, modelId, exportDir));
                });
    }

    /**
     * Copies {@code <hdfsWorkingDir>streaming/checkpoint/<modelId>} to
     * {@code <exportDir>/streaming_spark_logs/spark_checkpoint/<project>}. Missing or empty sources are logged and
     * skipped; copy failures are logged.
     */
    private void dumpCheckPoint(String project, String modelId, String exportDir) {
        String checkpointDir = String.format(Locale.ROOT, "%s%s%s", this.kylinConfig.getHdfsWorkingDirectoryWithoutScheme(), "streaming/checkpoint/", modelId);
        NExecutableManager executableManager = NExecutableManager.getInstance(this.kylinConfig, project);
        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
        if (!executableManager.isHdfsPathExists(checkpointDir)) {
            log.warn("The job checkpoint file on HDFS has not been generated yet, modelId: {}, filePath: {}", modelId, checkpointDir);
            return;
        }
        if (CollectionUtils.isEmpty(executableManager.getFilePathsFromHDFSDir(checkpointDir, true))) {
            log.warn("There is no file in the current job HDFS directory: {}", checkpointDir);
            return;
        }
        File localDir = new File(exportDir, String.format(Locale.ROOT, "/%s/%s/%s", STREAMING_LOG_ROOT_DIR, STREAMING_SPARK_CHECKPOINT_DIR, project));
        try {
            FileUtils.forceMkdir(localDir);
            fileSystem.copyToLocalFile(new Path(checkpointDir), new Path(localDir.getAbsolutePath()));
        } catch (IOException e) {
            log.error("dump streaming checkpoint failed. ", e);
        }
    }

    /**
     * Dumps executor logs for every {@code (project, jobId, driver directories)} entry of {@code driverDirs}.
     * A {@code null} or empty map is a no-op.
     */
    private void dumpExecutorLog(Map<String, Map<String, Set<String>>> driverDirs, String exportDir) {
        if (ObjectUtils.isEmpty(driverDirs)) {
            return;
        }
        for (Map.Entry<String, Map<String, Set<String>>> projectEntry : driverDirs.entrySet()) {
            String project = projectEntry.getKey();
            for (Map.Entry<String, Set<String>> jobEntry : projectEntry.getValue().entrySet()) {
                dumpSingleExecutorLog(project, jobEntry.getKey(), exportDir, jobEntry.getValue());
            }
        }
    }

    /**
     * Copies executor logs found under {@code <hdfsWorkingDir>streaming/spark_logs/<project>} whose grandparent
     * directory is {@code jobId} (any job when {@code jobId} is empty) and whose parent directory is in
     * {@code dirNames}. Files go to {@code <exportDir>/streaming_spark_logs/spark_executor/<project>/<job>/<dir>};
     * copy failures are logged and skipped.
     */
    private void dumpSingleExecutorLog(String project, String jobId, String exportDir, Set<String> dirNames) {
        String executorLogDir = String.format(Locale.ROOT, "%s%s%s", this.kylinConfig.getHdfsWorkingDirectoryWithoutScheme(), "streaming/spark_logs/", project);
        NExecutableManager executableManager = NExecutableManager.getInstance(this.kylinConfig, project);
        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
        if (!executableManager.isHdfsPathExists(executorLogDir)) {
            log.warn("The job executor log file on HDFS has not been generated yet, jobId: {}, filePath: {}", jobId, executorLogDir);
            return;
        }
        List<String> filePaths = executableManager.getFilePathsFromHDFSDir(executorLogDir, true);
        if (CollectionUtils.isEmpty(filePaths)) {
            log.warn("There is no file in the current job HDFS directory: {}", executorLogDir);
            return;
        }
        filePaths.stream()
                .filter(StringUtils::isNotEmpty)
                .map(Path::new)
                .filter(logFile -> StringUtils.isEmpty(jobId) || StringUtils.equals(logFile.getParent().getParent().getName(), jobId))
                .filter(logFile -> dirNames.contains(logFile.getParent().getName()))
                .forEach(logFile -> {
                    File localDir = new File(exportDir, String.format(Locale.ROOT, "/%s/%s/%s/%s/%s", STREAMING_LOG_ROOT_DIR, STREAMING_SPARK_EXECUTOR_DIR, project, logFile.getParent().getParent().getName(), logFile.getParent().getName()));
                    try {
                        FileUtils.forceMkdir(localDir);
                        fileSystem.copyToLocalFile(logFile, new Path(localDir.getAbsolutePath()));
                    } catch (IOException e) {
                        log.error("dump streaming executor log failed. ", e);
                    }
                });
    }
}
