package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
import org.apache.kylin.engine.spark.builder.SnapshotPartitionBuilder;
import org.apache.kylin.engine.spark.job.exec.SnapshotExec;
import org.apache.kylin.engine.spark.utils.FileNames;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.guava30.shaded.common.base.Throwables;
import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/SnapshotBuildJob.class */
public class SnapshotBuildJob extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(SnapshotBuildJob.class);

    private static Set<String> toPartitions(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        return ImmutableSet.builder().addAll(Arrays.asList(StringSplitter.split(str, ","))).build();
    }

    public static void main(String[] strArr) {
        new SnapshotBuildJob().execute(strArr);
    }

    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected void doExecute() throws Exception {
        SnapshotExec snapshotExec = new SnapshotExec(StringUtils.replace(this.infos.getJobStepId(), SparkApplication.JOB_NAME_PREFIX, ""));
        StageType.SNAPSHOT_BUILD.createStage(this, null, null, snapshotExec);
        snapshotExec.buildSnapshot();
    }

    public void buildSnapshot() throws IOException {
        String param = getParam("table");
        String param2 = getParam("selectedPartitionCol");
        TableDesc tableDesc = NTableMetadataManager.getInstance(this.config, this.project).getTableDesc(param);
        boolean equals = "true".equals(getParam("incrementalBuild"));
        String param3 = getParam("selectedPartition");
        Set<String> set = null;
        if (param3 != null) {
            set = JsonUtil.readValueAsSet(param3);
        }
        if (param2 == null) {
            new SnapshotBuilder().buildSnapshot(this.ss, Sets.newHashSet(new TableDesc[]{tableDesc}));
            return;
        }
        initialize(tableDesc, param2, equals, set);
        TableDesc tableDesc2 = NTableMetadataManager.getInstance(this.config, this.project).getTableDesc(param);
        if (set == null) {
            set = tableDesc2.getNotReadyPartitions();
        }
        logger.info("{} need build partitions: {}", tableDesc2.getIdentity(), set);
        new SnapshotPartitionBuilder().buildSnapshot(this.ss, tableDesc2, param2, set);
        if (equals) {
            moveIncrementalPartitions(tableDesc2.getLastSnapshotPath(), tableDesc2.getTempSnapshotPath());
        }
    }

    private void initialize(TableDesc tableDesc, String str, boolean z, Set<String> set) {
        if (tableDesc.getTempSnapshotPath() != null) {
            logger.info("snapshot partition has been initialed, so skip.");
            return;
        }
        Set<String> tablePartitions = getTablePartitions(tableDesc, str);
        Set keySet = tableDesc.getSnapshotPartitions().keySet();
        String str2 = FileNames.snapshotFile(tableDesc) + "/" + RandomUtil.randomUUID();
        UnitOfWork.doInTransactionWithRetry(() -> {
            NTableMetadataManager nTableMetadataManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project);
            TableDesc copyForWrite = nTableMetadataManager.copyForWrite(tableDesc);
            if (!z) {
                copyForWrite.resetSnapshotPartitions(tablePartitions);
                copyForWrite.setSnapshotTotalRows(0L);
                TableExtDesc copyForWrite2 = nTableMetadataManager.copyForWrite(nTableMetadataManager.getOrCreateTableExt(tableDesc));
                copyForWrite2.setTotalRows(0L);
                nTableMetadataManager.saveTableExt(copyForWrite2);
            } else if (set == null) {
                copyForWrite.addSnapshotPartitions(Sets.difference(tablePartitions, keySet));
            } else {
                copyForWrite.addSnapshotPartitions(set);
            }
            copyForWrite.setTempSnapshotPath(str2);
            nTableMetadataManager.updateTableDesc(copyForWrite);
            return null;
        }, this.project);
    }

    protected Set<String> getTablePartitions(TableDesc tableDesc, String str) {
        if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
            return toPartitions(getParam("partitions"));
        }
        if (tableDesc.isRangePartition() && tableDesc.getPartitionColumn().equalsIgnoreCase(str)) {
            logger.info("The【{}】column is range partition table,so return partition column.", tableDesc.getName());
            return toPartitions(tableDesc.getPartitionColumn());
        }
        Set<String> tablePartitions = SourceFactory.getSource(tableDesc).getSourceMetadataExplorer().getTablePartitions(tableDesc.getDatabase(), tableDesc.getName(), tableDesc.getProject(), str);
        logger.info("{} current partitions: {}", tableDesc.getIdentity(), tablePartitions);
        return tablePartitions;
    }

    private void moveIncrementalPartitions(String str, String str2) {
        String snapshotDir = getSnapshotDir(str);
        Path path = new Path(getSnapshotDir(str2));
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        try {
            if (workingFileSystem.exists(path)) {
                for (FileStatus fileStatus : workingFileSystem.listStatus(path)) {
                    Path path2 = new Path(snapshotDir + "/" + fileStatus.getPath().getName());
                    if (workingFileSystem.exists(path2)) {
                        logger.info(String.format(Locale.ROOT, "delete non-effective partition %s ", path2));
                        workingFileSystem.delete(path2, true);
                    }
                    if (StringUtils.equalsIgnoreCase(workingFileSystem.getScheme(), "s3a") && workingFileSystem.isDirectory(fileStatus.getPath())) {
                        workingFileSystem.mkdirs(path2);
                        renameS3A(workingFileSystem, fileStatus, path2);
                    } else {
                        workingFileSystem.rename(fileStatus.getPath(), new Path(snapshotDir));
                    }
                }
                workingFileSystem.delete(path, true);
            }
        } catch (Exception e) {
            logger.error(String.format(Locale.ROOT, "from %s to %s move file fail:", str2, str), e);
            Throwables.propagate(e);
        }
    }

    private void renameS3A(FileSystem fileSystem, FileStatus fileStatus, Path path) throws IOException {
        for (FileStatus fileStatus2 : fileSystem.listStatus(fileStatus.getPath())) {
            if (fileSystem.exists(fileStatus2.getPath())) {
                if (fileStatus2.isFile()) {
                    fileSystem.rename(fileStatus2.getPath(), path);
                }
                if (fileStatus2.isDirectory()) {
                    String str = path + "/" + fileStatus2.getPath().getName();
                    Path path2 = new Path(str);
                    if (fileSystem.exists(path2)) {
                        logger.info(String.format(Locale.ROOT, "delete non-effective partition %s ", str));
                        fileSystem.delete(path2, true);
                    }
                    fileSystem.mkdirs(path2);
                    renameS3A(fileSystem, fileStatus2, path2);
                }
            }
        }
    }

    private String getSnapshotDir(String str) {
        return KapConfig.wrap(KylinConfig.getInstanceFromEnv()).getMetadataWorkingDirectory() + "/" + str;
    }

    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected Map<String, String> getSparkConfigOverride(KylinConfig kylinConfig) {
        Map<? extends String, ? extends String> snapshotBuildingConfigOverride = kylinConfig.getSnapshotBuildingConfigOverride();
        Map<String, String> sparkConfigOverride = kylinConfig.getSparkConfigOverride();
        sparkConfigOverride.putAll(snapshotBuildingConfigOverride);
        return sparkConfigOverride;
    }

    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected void chooseContentSize(SparkConfHelper sparkConfHelper) {
    }
}
