package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTreeFactory;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.datasource.storage.StorageStore;
import org.apache.spark.sql.datasource.storage.StorageStoreFactory;
import org.apache.spark.sql.datasource.storage.StorageStoreUtils;
import org.apache.spark.sql.datasource.storage.WriteTaskStats;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ldap.transaction.compensating.support.DefaultTempEntryRenamingStrategy;
import scala.collection.JavaConversions;

/**
 * Legacy Spark application that builds the requested layouts of a dataflow, segment by segment.
 *
 * <p>For each segment listed in {@link NBatchConstants#P_SEGMENT_IDS} the job asks a {@code DFChooser} for
 * build sources: already-built layouts that can be reused, and/or the flat table. It then builds the layouts of
 * the spanning tree layer by layer, submitting every index to a {@link BuildLayoutWithUpdate} worker. After all
 * segments are processed it records segment source sizes and removes temporary flat-table / view-fact-table data.
 */
@Deprecated
public class DFBuildJob extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(DFBuildJob.class);
    protected static String TEMP_DIR_SUFFIX = DefaultTempEntryRenamingStrategy.DEFAULT_TEMP_SUFFIX;

    /** Segment id -> reference row count handed to {@link SanityChecker} when saving layouts. */
    public final HashMap<String, Long> seg2Count = new HashMap<>();
    protected NDataflowManager dfMgr;
    protected BuildLayoutWithUpdate buildLayoutWithUpdate;

    public static void main(String[] args) {
        new DFBuildJob().execute(args);
    }

    /**
     * Builds all requested layouts for all requested segments, then updates segment metadata and cleans up.
     * The layout-update worker is always shut down, even when the build fails.
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected void doExecute() throws Exception {
        this.buildLayoutWithUpdate = new BuildLayoutWithUpdate();
        try {
            String dataflowId = getParam(NBatchConstants.P_DATAFLOW_ID);
            Set<String> segmentIds = Sets.newHashSet(StringUtils.split(getParam(NBatchConstants.P_SEGMENT_IDS), ","));
            Set<Long> layoutIds = NSparkCubingUtil.str2Longs(getParam(NBatchConstants.P_LAYOUT_IDS));
            this.dfMgr = NDataflowManager.getInstance(this.config, this.project);
            List<String> persistedFlatTables = new ArrayList<>();
            List<String> persistedViewFactTables = new ArrayList<>();
            Path jobTmpShareDir = this.config.getJobTmpShareDir(this.project, this.jobId);
            if (this.config.isBuildCheckPartitionColEnabled()) {
                checkDateFormatIfExist(this.project, dataflowId);
            }
            IndexPlan indexPlan = this.dfMgr.getDataflow(dataflowId).getIndexPlan();
            Set<LayoutEntity> layouts = NSparkCubingUtil.toLayouts(indexPlan, layoutIds).stream()
                    .filter(Objects::nonNull)
                    .collect(Collectors.toSet());
            buildSnapshot();
            for (String segmentId : segmentIds) {
                if (needSkipSegment(segmentId)) {
                    continue;
                }
                // A new tree per segment: buildLayer() calls decideTheNextLayer() on it while building.
                NSpanningTree spanningTree = NSpanningTreeFactory.fromLayouts(layouts, dataflowId);
                NDataSegment segment = getSegment(segmentId);
                DFChooser chooser = new DFChooser(spanningTree, segment, this.jobId, this.ss, this.config, true);
                chooser.decideSources();
                NBuildSourceInfo flatTableSource = chooser.flatTableSource();
                Map<Long, NBuildSourceInfo> reuseSources = chooser.reuseSources();
                this.infos.clearCuboidsNumPerLayer(segmentId);
                if (!reuseSources.isEmpty()) {
                    // Long.compare avoids the overflow of subtracting two long counts and narrowing to int.
                    NBuildSourceInfo smallest = Collections.min(reuseSources.values(),
                            (left, right) -> Long.compare(left.getCount(), right.getCount()));
                    LayoutEntity smallestLayout = indexPlan.getLayoutEntity(smallest.getLayoutId());
                    this.seg2Count.put(segmentId, SanityChecker.getCount(smallest.getParentDS(), smallestLayout));
                    build(reuseSources.values(), segmentId, spanningTree);
                }
                if (flatTableSource != null) {
                    String flatTablePath = chooser.persistFlatTableIfNecessary();
                    if (flatTablePath.isEmpty()) {
                        logger.info("FlatTable not persisted, only compute row count");
                        updateColumnBytesInseg(dataflowId, new HashMap<>(), segment.getId(),
                                flatTableSource.getFlattableDS().count());
                    } else {
                        logger.info("FlatTable persisted, compute column size");
                        persistedFlatTables.add(flatTablePath);
                        computeColumnBytes(chooser, segment, dataflowId, flatTablePath);
                    }
                    String viewFactTablePath = flatTableSource.getViewFactTablePath();
                    if (!StringUtils.isBlank(viewFactTablePath)) {
                        persistedViewFactTables.add(viewFactTablePath);
                    }
                    // Only fall back to the flat table count when no reused layout provided one.
                    this.seg2Count.computeIfAbsent(segmentId, id -> flatTableSource.getParentDS().count());
                    build(Collections.singletonList(flatTableSource), segmentId, spanningTree);
                }
                this.infos.recordSpanningTree(segmentId, spanningTree);
            }
            updateSegmentSourceBytesSize(dataflowId, ResourceDetectUtils.getSegmentSourceSize(jobTmpShareDir));
            tailingCleanups(segmentIds, persistedFlatTables, persistedViewFactTables);
        } finally {
            this.buildLayoutWithUpdate.shutDown();
        }
    }

    /**
     * Builds the model's snapshots. When snapshot manual management is enabled, it only calculates
     * total rows instead.
     */
    public void buildSnapshot() throws IOException {
        String dataflowId = getParam(NBatchConstants.P_DATAFLOW_ID);
        SnapshotBuilder snapshotBuilder = new SnapshotBuilder();
        if (this.config.isSnapshotManualManagementEnabled()) {
            logger.info("Skip snapshot build in snapshot manual mode, dataflow: {}, only calculate total rows", dataflowId);
            snapshotBuilder.calculateTotalRows(this.ss, this.dfMgr.getDataflow(dataflowId).getModel(),
                    getIgnoredSnapshotTables());
        } else {
            snapshotBuilder.buildSnapshot(this.ss, this.dfMgr.getDataflow(dataflowId).getModel(),
                    getIgnoredSnapshotTables());
        }
    }

    /** Reads the persisted flat table, computes per-column byte sizes and stores them on the segment. */
    private void computeColumnBytes(DFChooser chooser, NDataSegment segment, String dataflowId, String flatTablePath) {
        this.ss.sparkContext().setJobDescription("Compute column bytes");
        try {
            Dataset<Row> flatTable = this.ss.read().parquet(flatTablePath);
            updateColumnBytesInseg(dataflowId, JavaConversions.mapAsJavaMap(chooser.computeColumnBytes(flatTable)),
                    segment.getId(), flatTable.count());
        } finally {
            // Reset even on failure so later Spark jobs are not mislabelled.
            this.ss.sparkContext().setJobDescription((String) null);
        }
    }

    /** Returns true (and logs why) when the segment is missing or lacks range, model or index plan. */
    private boolean needSkipSegment(String segmentId) {
        NDataSegment segment = getSegment(segmentId);
        boolean usable = segment != null
                && segment.getSegRange() != null
                && segment.getModel() != null
                && segment.getIndexPlan() != null;
        if (usable) {
            return false;
        }
        logger.info("Skip segment {}", segmentId);
        if (segment != null) {
            logger.info("Args is {} {} {}", segment.getSegRange(), segment.getModel(), segment.getIndexPlan());
        }
        return true;
    }

    /**
     * Stores the segment's source row count and its column byte sizes.
     *
     * @param dataflowId  dataflow owning the segment
     * @param columnBytes column name -> measured byte size; values are parsed via {@code toString()}
     * @param segmentId   segment to update
     * @param rowCount    source row count; byte sizes are scaled by {@code rowCount / capacitySampleRows}
     *                    (1.0 when {@code rowCount} is below the configured sample rows)
     */
    public void updateColumnBytesInseg(String dataflowId, Map<String, Object> columnBytes, String segmentId,
            long rowCount) {
        int sampleRows = this.config.getCapacitySampleRows();
        // Floating-point division: integer division would truncate e.g. a 1.5x ratio down to 1.
        double ratio = rowCount < sampleRows ? 1.0d : (double) rowCount / sampleRows;
        Map<String, Long> scaledBytes = Maps.newHashMap();
        for (Map.Entry<String, Object> entry : columnBytes.entrySet()) {
            long measured = Long.parseLong(entry.getValue().toString());
            scaledBytes.put(entry.getKey(), (long) (measured * ratio));
        }
        NDataflow dataflow = this.dfMgr.getDataflow(dataflowId);
        NDataSegment segment = dataflow.copy().getSegment(segmentId);
        segment.setSourceCount(rowCount);
        segment.getColumnSourceBytes().putAll(scaledBytes);
        NDataflowUpdate update = new NDataflowUpdate(dataflow.getUuid());
        update.setToUpdateSegs(segment);
        this.dfMgr.updateDataflow(update);
    }

    /**
     * Deletes persisted view fact tables and, unless flat-table persistence is enabled, persisted flat tables.
     * It then resets the segments' in-memory ready flags.
     */
    public void tailingCleanups(Set<String> segmentIds, List<String> flatTablePaths, List<String> viewFactTablePaths)
            throws IOException {
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        for (String viewPath : viewFactTablePaths) {
            fs.delete(new Path(viewPath), true);
            logger.debug("Delete persisted view fact table: {}.", viewPath);
        }
        boolean keepFlatTable = this.config.isPersistFlatTableEnabled();
        if (!keepFlatTable) {
            for (String flatPath : flatTablePaths) {
                fs.delete(new Path(flatPath), true);
                logger.debug("Delete persisted flat table: {}.", flatPath);
            }
        }
        resetSegmentMemOnly(segmentIds, !keepFlatTable);
    }

    /**
     * Records source byte sizes and the last build time for each known segment. It then rewrites the index
     * plan's layout-bucket mapping with the currently stored one.
     *
     * @param sourceBytes segment id -> source size; values are expected to be numeric (Long)
     */
    public void updateSegmentSourceBytesSize(String dataflowId, Map<String, Object> sourceBytes) {
        NDataflow dataflow = this.dfMgr.getDataflow(dataflowId);
        NDataflow copy = dataflow.copy();
        List<NDataSegment> toUpdate = Lists.newArrayList();
        for (Map.Entry<String, Object> entry : sourceBytes.entrySet()) {
            NDataSegment segment = copy.getSegment(entry.getKey());
            if (segment == null) {
                logger.info("Skip empty segment {} when updating segment source", entry.getKey());
                continue;
            }
            segment.setSourceBytesSize(((Number) entry.getValue()).longValue());
            segment.setLastBuildTime(System.currentTimeMillis());
            toUpdate.add(segment);
        }
        NDataflowUpdate update = new NDataflowUpdate(dataflow.getUuid());
        update.setToUpdateSegs(toUpdate.toArray(new NDataSegment[0]));
        this.dfMgr.updateDataflow(update);
        NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(this.config, this.project);
        indexPlanManager.updateIndexPlan(dataflowId, copyForWrite -> copyForWrite
                .setLayoutBucketNumMapping(indexPlanManager.getIndexPlan(dataflowId).getLayoutBucketNumMapping()));
    }

    /**
     * Returns the required cores: the maximum detected leaf task count divided by the configured task core
     * factor (integer division), or "1" when task-impact sizing is disabled.
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected String calculateRequiredCores() throws Exception {
        if (!this.config.getSparkEngineTaskImpactInstanceEnabled().booleanValue()) {
            return "1";
        }
        String maxLeafTasks = maxLeafTasksNums(this.config.getJobTmpShareDir(this.project, this.jobId));
        int requiredCores = ((int) Double.parseDouble(maxLeafTasks))
                / KylinConfig.getInstanceFromEnv().getSparkEngineTaskCoreFactor();
        logger.info("The maximum number of tasks required to run the job is {}, require cores: {}", maxLeafTasks,
                Integer.valueOf(requiredCores));
        return String.valueOf(requiredCores);
    }

    /** Selects the maximum value among the cubing-detect item files directly under {@code dir}. */
    private String maxLeafTasksNums(Path dir) throws IOException {
        return ResourceDetectUtils.selectMaxValueInFiles(HadoopUtil.getWorkingFileSystem().listStatus(dir,
                candidate -> candidate.toString().endsWith(ResourceDetectUtils.cubingDetectItemFileSuffix())));
    }

    /** Looks up a segment of the job's dataflow; may return null when the segment does not exist. */
    public NDataSegment getSegment(String segmentId) {
        return this.dfMgr.getDataflow(getParam(NBatchConstants.P_DATAFLOW_ID)).getSegment(segmentId);
    }

    /** Builds layer after layer, starting from {@code sources}, until a layer produces no further sources. */
    public void build(Collection<NBuildSourceInfo> sources, String segmentId, NSpanningTree spanningTree)
            throws IOException {
        List<NBuildSourceInfo> nextLayer = buildLayer(sources, segmentId, spanningTree);
        while (!nextLayer.isEmpty()) {
            nextLayer = buildLayer(nextLayer, segmentId, spanningTree);
        }
    }

    /**
     * Submits every to-be-built index of {@code sources} and then waits for the updates via
     * {@link BuildLayoutWithUpdate#updateLayout}. Returns the build sources for the next layer.
     */
    private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> sources, String segmentId,
            final NSpanningTree spanningTree) throws IOException {
        final NDataSegment segment = getSegment(segmentId);
        int cuboidsInLayer = 0;
        List<IndexEntity> submitted = new ArrayList<>();
        for (final NBuildSourceInfo source : sources) {
            Collection<IndexEntity> toBuild = source.getToBuildCuboids();
            HashSet<Long> childIds = toBuild.stream()
                    .map(IndexEntity::getId)
                    .collect(Collectors.toCollection(HashSet::new));
            this.infos.recordParent2Children(segment.getLayout(source.getLayoutId()), childIds);
            cuboidsInLayer += toBuild.size();
            Preconditions.checkState(!toBuild.isEmpty(), "To be built cuboids is empty.");
            final Dataset<Row> parentDS = source.getParentDS();
            Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
            for (final IndexEntity index : toBuild) {
                this.buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                    @Override
                    public long getIndexId() {
                        return index.getId();
                    }

                    @Override
                    public String getName() {
                        return "build-index-" + index.getId();
                    }

                    @Override
                    public List<NDataLayout> build() throws IOException {
                        return DFBuildJob.this.buildIndex(segment, index, parentDS, spanningTree,
                                source.getLayoutId());
                    }
                }, this.config);
                submitted.add(index);
            }
        }
        this.infos.recordCuboidsNumPerLayer(segmentId, cuboidsInLayer);
        this.buildLayoutWithUpdate.updateLayout(segment, this.config, this.project);
        // Re-read the segment: updateLayout() above may have changed its built layouts.
        spanningTree.decideTheNextLayer(submitted, getSegment(segmentId));
        return constructTheNextLayerBuildInfos(spanningTree, segment, submitted);
    }

    /**
     * For each index with children in the index plan, creates a build source that reads the index's first
     * layout from storage as the parent dataset.
     */
    protected List<NBuildSourceInfo> constructTheNextLayerBuildInfos(NSpanningTree spanningTree, NDataSegment segment,
            Collection<IndexEntity> builtIndexes) {
        List<NBuildSourceInfo> next = new ArrayList<>();
        for (IndexEntity index : builtIndexes) {
            Collection<IndexEntity> children = spanningTree.getChildrenByIndexPlan(index);
            if (children.isEmpty()) {
                continue;
            }
            LayoutEntity parentLayout = spanningTree.getLayouts(index).iterator().next();
            NBuildSourceInfo info = new NBuildSourceInfo();
            info.setSparkSession(this.ss);
            info.setLayoutId(parentLayout.getId());
            info.setParentStorageDF(StorageStoreUtils.toDF(segment, parentLayout, this.ss));
            info.setToBuildCuboids(children);
            next.add(info);
        }
        return next;
    }

    /**
     * Builds every not-yet-built layout of {@code index} from {@code parent}. Table indexes only project
     * dimensions; aggregate indexes go through {@code CuboidAggregator.agg}.
     *
     * @param parentLayoutId id of the parent layout, or {@code DFChooser.FLAT_TABLE_FLAG()} for the flat table
     * @return the data layouts that were saved
     */
    public List<NDataLayout> buildIndex(NDataSegment segment, IndexEntity index, Dataset<Row> parent,
            NSpanningTree spanningTree, long parentLayoutId) throws IOException {
        String parentName = parentLayoutId == DFChooser.FLAT_TABLE_FLAG() ? "flat table" : String.valueOf(parentLayoutId);
        logger.info("Build index:{}, in segment:{}", Long.valueOf(index.getId()), segment.getId());
        Set<Integer> dimensions = index.getEffectiveDimCols().keySet();
        Dataset<Row> indexDS;
        Function<LayoutEntity, Column[]> layoutColumns;
        if (IndexEntity.isTableIndex(index.getId())) {
            Preconditions.checkArgument(index.getMeasures().isEmpty());
            indexDS = parent.select(NSparkCubingUtil.getColumns(dimensions));
            layoutColumns = layout -> NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet());
        } else {
            indexDS = CuboidAggregator.agg(parent, dimensions, index.getEffectiveMeasures(), segment, spanningTree);
            layoutColumns = layout -> NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet(),
                    layout.getOrderedMeasures().keySet());
        }
        List<NDataLayout> saved = new ArrayList<>();
        try {
            for (LayoutEntity layout : spanningTree.getLayouts(index)) {
                if (segment.isAlreadyBuilt(layout.getId())) {
                    logger.info("Skip already built layout:{}, in index:{}", Long.valueOf(layout.getId()),
                            Long.valueOf(index.getId()));
                    continue;
                }
                logger.info("Build layout:{}, in index:{}", Long.valueOf(layout.getId()), Long.valueOf(index.getId()));
                this.ss.sparkContext().setJobDescription("build " + layout.getId() + " from parent " + parentName);
                Dataset<Row> layoutDS = indexDS.select(layoutColumns.apply(layout))
                        .sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()));
                saved.add(saveAndUpdateLayout(layoutDS, segment, layout));
                onLayoutFinished(layout.getId());
            }
        } finally {
            // Reset even on failure so later Spark jobs are not mislabelled.
            this.ss.sparkContext().setJobDescription((String) null);
        }
        logger.info("Finished Build index :{}, in segment:{}", Long.valueOf(index.getId()), segment.getId());
        return saved;
    }

    /**
     * Writes {@code dataset} as the given layout and returns a ready {@link NDataLayout} filled with the write
     * statistics. When the write reports no row count (-1), the rows are counted with {@code dataset.count()}.
     */
    public NDataLayout saveAndUpdateLayout(Dataset<Row> dataset, NDataSegment segment, LayoutEntity layoutEntity)
            throws IOException {
        this.ss.sparkContext().setLocalProperty("spark.scheduler.pool", "build");
        long layoutId = layoutEntity.getId();
        NDataLayout dataLayout = getDataLayout(segment, layoutId);
        String storagePath = NSparkCubingUtil.getStoragePath(segment, Long.valueOf(layoutId));
        StorageStore store = StorageStoreFactory.create(layoutEntity.getModel().getStorageType());
        long referenceCount = this.seg2Count.getOrDefault(segment.getId(), Long.valueOf(SanityChecker.SKIP_FLAG()));
        store.setStorageListener(new SanityChecker(referenceCount));
        WriteTaskStats stats = store.save(layoutEntity, new Path(storagePath), KapConfig.wrap(this.config), dataset);
        dataLayout.setBuildJobId(this.jobId);
        long numRows = stats.numRows();
        if (numRows == -1) {
            KylinBuildEnv.get().buildJobInfos().recordAbnormalLayouts(layoutEntity.getId(),
                    "Job metrics seems null, use count() to collect cuboid rows.");
            logger.warn("Can not get cuboid={} row cnt.", Long.valueOf(layoutEntity.getId()));
            // Do what the recorded message promises instead of persisting -1 rows into metadata.
            numRows = dataset.count();
        }
        dataLayout.setRows(numRows);
        dataLayout.setSourceRows(stats.sourceRows());
        dataLayout.setPartitionNum(stats.numBucket());
        dataLayout.setPartitionValues(stats.partitionValues());
        dataLayout.setFileCount(stats.numFiles());
        dataLayout.setByteSize(stats.numBytes());
        dataLayout.setReady(true);
        return dataLayout;
    }

    /** Creates a new, empty {@link NDataLayout} for the given segment and layout id. */
    protected NDataLayout getDataLayout(NDataSegment segment, long layoutId) {
        return NDataLayout.newDataLayout(segment.getDataflow(), segment.getId(), layoutId);
    }

    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected String generateInfo() {
        return LogJobInfoUtils.dfBuildJobInfo();
    }

    /**
     * Clears the dict-ready and fact-view-ready flags of each segment, and the flat-table-ready flag when
     * {@code resetFlatTable} is true. Segments that no longer exist are skipped; {@code needSkipSegment}
     * already tolerates such ids.
     */
    private void resetSegmentMemOnly(Set<String> segmentIds, boolean resetFlatTable) {
        if (segmentIds == null) {
            return;
        }
        NDataflowManager manager = NDataflowManager.getInstance(this.config, this.project);
        for (String segmentId : segmentIds) {
            NDataSegment segment = getSegment(segmentId);
            if (segment == null) {
                logger.info("Skip resetting missing segment {}", segmentId);
                continue;
            }
            NDataflow dataflow = segment.getDataflow();
            NDataSegment copy = dataflow.copy().getSegment(segment.getId());
            copy.setDictReady(false);
            if (resetFlatTable) {
                copy.setFlatTableReady(false);
            }
            copy.setFactViewReady(false);
            NDataflowUpdate update = new NDataflowUpdate(dataflow.getId());
            update.setToUpdateSegs(copy);
            manager.updateDataflow(update);
        }
    }
}
