package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.datasource.storage.StorageStore;
import org.apache.spark.sql.datasource.storage.StorageStoreFactory;
import org.apache.spark.sql.datasource.storage.WriteTaskStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark application that merges several segments of a dataflow into one target segment.
 *
 * <p>{@link #doExecute()} reads the {@code dataflowId}, {@code segmentIds} and {@code layoutIds}
 * job parameters and then runs three steps in order:
 * <ol>
 *   <li>sum the per-column source byte sizes of the merging segments into the target segment;</li>
 *   <li>merge the persisted flat tables (best effort, failures are only logged);</li>
 *   <li>merge every layout and record the resulting {@link NDataLayout}s.</li>
 * </ol>
 *
 * @deprecated flagged {@code @Deprecated} in the original source; the intended replacement is
 *             not visible from this file — TODO confirm.
 */
@Deprecated
public class DFMergeJob extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(DFMergeJob.class);
    protected BuildLayoutWithUpdate buildLayoutWithUpdate;

    /**
     * Groups the effective layouts of all merging segments by layout id.
     *
     * <p>The first time a layout id is seen a new {@link DFLayoutMergeAssist} is created and
     * configured with the Spark session, the target segment, the layout entity and the full list
     * of merging segments; every further data layout with the same id is appended to it.
     *
     * @param list         segments that are being merged
     * @param sparkSession Spark session handed to every assist
     * @param nDataSegment target segment of the merge
     * @return one merge assist per layout id
     */
    public static Map<Long, DFLayoutMergeAssist> generateMergeAssist(List<NDataSegment> list, SparkSession sparkSession, NDataSegment nDataSegment) {
        ConcurrentMap<Long, DFLayoutMergeAssist> assistsByLayoutId = Maps.newConcurrentMap();
        for (NDataSegment mergingSegment : list) {
            for (NDataLayout dataLayout : mergingSegment.getSegDetails().getEffectiveLayouts()) {
                long layoutId = dataLayout.getLayoutId();
                DFLayoutMergeAssist existing = assistsByLayoutId.get(layoutId);
                if (existing != null) {
                    existing.addCuboid(dataLayout);
                    continue;
                }
                DFLayoutMergeAssist created = new DFLayoutMergeAssist();
                created.addCuboid(dataLayout);
                created.setSs(sparkSession);
                created.setNewSegment(nDataSegment);
                created.setLayout(dataLayout.getLayout());
                created.setToMergeSegments(list);
                assistsByLayoutId.put(layoutId, created);
            }
        }
        return assistsByLayoutId;
    }

    /**
     * Command line entry point: creates a job instance and executes it with the given arguments.
     *
     * @param strArr raw application arguments, passed through to {@code execute}
     */
    public static void main(String[] strArr) {
        new DFMergeJob().execute(strArr);
    }

    /**
     * Runs the merge: column sizes first, then the flat table (best effort), then the layouts.
     *
     * @throws Exception if merging column sizes or layouts fails; a flat table merge failure is
     *                   logged as a warning and does not abort the job
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected void doExecute() throws Exception {
        this.buildLayoutWithUpdate = new BuildLayoutWithUpdate();
        String dataflowId = getParam("dataflowId");
        String segmentId = getParam("segmentIds");
        Set<Long> layoutIds = NSparkCubingUtil.str2Longs(getParam("layoutIds"));
        mergeColumnSize(dataflowId, segmentId);
        try {
            mergeFlatTable(dataflowId, segmentId);
        } catch (Exception e) {
            // Deliberately best effort: the layouts are still merged when this step fails.
            logger.warn("Merge flat table failed.", e);
        }
        mergeSegments(dataflowId, segmentId, layoutIds);
    }

    /**
     * Records the merging segments in the job infos, sums their column source bytes into a copy
     * of the target segment and persists that copy through a dataflow update.
     *
     * @param str  dataflow id
     * @param str2 id of the target (merged) segment
     */
    private void mergeColumnSize(String str, String str2) {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, this.project);
        NDataflow dataflow = dataflowManager.getDataflow(str);
        List<NDataSegment> mergingSegments = dataflow.getMergingSegments(dataflow.getSegment(str2));
        Collections.sort(mergingSegments);
        this.infos.clearMergingSegments();
        this.infos.recordMergingSegments(mergingSegments);
        // Work on a copy of the dataflow so the instance fetched above is not mutated.
        NDataSegment targetSegment = dataflow.copy().getSegment(str2);
        mergeColumnSizeForNewSegment(targetSegment, mergingSegments);
        NDataflowUpdate update = new NDataflowUpdate(str);
        update.setToUpdateSegs(new NDataSegment[]{targetSegment});
        dataflowManager.updateDataflow(update);
    }

    /**
     * Sets the column source bytes of {@code nDataSegment} to the per-column sum over
     * {@code list}. A segment whose own map is empty contributes the value returned by
     * {@code SourceUsageManager.calcAvgColumnSourceBytes} instead.
     *
     * @param nDataSegment segment that receives the summed sizes
     * @param list         segments whose sizes are summed
     */
    private void mergeColumnSizeForNewSegment(NDataSegment nDataSegment, List<NDataSegment> list) {
        SourceUsageManager sourceUsageManager = SourceUsageManager.getInstance(this.config);
        Map<String, Long> totalBytesByColumn = Maps.newHashMap();
        for (NDataSegment mergingSegment : list) {
            Map<String, Long> segmentBytes = MapUtils.isEmpty(mergingSegment.getColumnSourceBytes())
                    ? sourceUsageManager.calcAvgColumnSourceBytes(mergingSegment)
                    : mergingSegment.getColumnSourceBytes();
            mergeByteSizeMap(totalBytesByColumn, segmentBytes);
        }
        nDataSegment.setColumnSourceBytes(totalBytesByColumn);
    }

    /**
     * Adds every value of {@code map2} onto the value stored under the same key in {@code map};
     * keys missing from {@code map} start at zero.
     *
     * @param map  accumulator, modified in place
     * @param map2 sizes to add
     */
    private void mergeByteSizeMap(Map<String, Long> map, Map<String, Long> map2) {
        for (Map.Entry<String, Long> entry : map2.entrySet()) {
            map.merge(entry.getKey(), entry.getValue(), Long::sum);
        }
    }

    /**
     * Returns the segments that are merged into {@code nDataSegment}. Protected so subclasses
     * can supply a different selection.
     *
     * @param nDataflow    dataflow that owns the segments
     * @param nDataSegment target segment of the merge
     * @return the result of {@code nDataflow.getMergingSegments(nDataSegment)}
     */
    protected List<NDataSegment> getMergingSegments(NDataflow nDataflow, NDataSegment nDataSegment) {
        return nDataflow.getMergingSegments(nDataSegment);
    }

    /**
     * Merges every layout of the merging segments and submits one build task per layout to
     * {@link #buildLayoutWithUpdate}; afterwards asks it to update the layouts of the target
     * segment.
     *
     * <p>Table index layouts are only sorted within partitions by their ordered dimensions;
     * all other layouts are aggregated through {@code CuboidAggregator.agg} first.
     *
     * @param str  dataflow id
     * @param str2 id of the target (merged) segment
     * @param set  layout ids parsed from the job parameters; not read by this implementation
     * @throws IOException declared for overriding implementations and the layout update
     */
    protected void mergeSegments(String str, String str2, Set<Long> set) throws IOException {
        NDataflow dataflow = NDataflowManager.getInstance(this.config, this.project).getDataflow(str);
        final NDataSegment mergedSegment = dataflow.getSegment(str2);
        Map<Long, DFLayoutMergeAssist> assists = generateMergeAssist(getMergingSegments(dataflow, mergedSegment), this.ss, mergedSegment);
        for (final DFLayoutMergeAssist assist : assists.values()) {
            Dataset<Row> merged = assist.merge();
            final LayoutEntity layout = assist.getLayout();
            Dataset<Row> toSort = IndexEntity.isTableIndex(layout.getIndex().getId())
                    ? merged
                    : CuboidAggregator.agg(merged, layout.getOrderedDimensions().keySet(), layout.getOrderedMeasures(), mergedSegment, null);
            // The explicit array is kept because the signature of getColumns is not visible here.
            final Dataset<Row> sorted = toSort.sortWithinPartitions(NSparkCubingUtil.getColumns(new Set[]{layout.getOrderedDimensions().keySet()}));
            this.buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                @Override // org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate.JobEntity
                public long getIndexId() {
                    return layout.getIndexId();
                }

                @Override // org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate.JobEntity
                public String getName() {
                    return "merge-layout-" + layout.getId();
                }

                @Override // org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate.JobEntity
                public List<NDataLayout> build() throws IOException {
                    return Lists.newArrayList(new NDataLayout[]{DFMergeJob.this.saveAndUpdateCuboid(sorted, mergedSegment, layout, assist)});
                }
            }, this.config);
        }
        this.buildLayoutWithUpdate.updateLayout(mergedSegment, this.config, this.project);
    }

    /**
     * Writes the merged data of one layout to storage and describes the result as a new
     * {@link NDataLayout}.
     *
     * <p>The source row count is the sum over all data layouts held by the assist. The Spark job
     * description is set for the duration of the write and cleared afterwards, also when the
     * write fails. When the write statistics report {@code -1} rows the layout is recorded as
     * abnormal and {@code -1} is stored as its row count; no recount is performed here.
     *
     * @param dataset             merged and sorted rows of the layout
     * @param nDataSegment        target segment the layout belongs to
     * @param layoutEntity        layout being written
     * @param dFLayoutMergeAssist assist holding the source data layouts
     * @return the new data layout populated from the write statistics
     * @throws IOException declared for overriding implementations and the storage layer
     */
    protected NDataLayout saveAndUpdateCuboid(Dataset<Row> dataset, NDataSegment nDataSegment, LayoutEntity layoutEntity, DFLayoutMergeAssist dFLayoutMergeAssist) throws IOException {
        this.ss.sparkContext().setLocalProperty("spark.scheduler.pool", "merge");
        long layoutId = layoutEntity.getId();
        long sourceRows = 0L;
        for (NDataLayout cuboid : dFLayoutMergeAssist.getCuboids()) {
            sourceRows += cuboid.getSourceRows();
        }
        NDataLayout mergedLayout = NDataLayout.newDataLayout(nDataSegment.getDataflow(), nDataSegment.getId(), layoutId);
        String storagePath = NSparkCubingUtil.getStoragePath(nDataSegment, Long.valueOf(layoutId));
        StorageStore store = StorageStoreFactory.create(layoutEntity.getModel().getStorageType());
        WriteTaskStats stats;
        this.ss.sparkContext().setJobDescription("Merge layout " + layoutId);
        try {
            stats = store.save(layoutEntity, new Path(storagePath), KapConfig.wrap(this.config), dataset);
        } finally {
            // Always clear the description so a failed write does not leave a stale label behind.
            this.ss.sparkContext().setJobDescription(null);
        }
        mergedLayout.setBuildJobId(this.jobId);
        long rowCount = stats.numRows();
        if (rowCount == -1) {
            KylinBuildEnv.get().buildJobInfos().recordAbnormalLayouts(layoutEntity.getId(), "Job metrics seems null, use count() to collect cuboid rows.");
            logger.info("Can not get cuboid row cnt.");
        }
        mergedLayout.setRows(rowCount);
        mergedLayout.setSourceRows(sourceRows);
        mergedLayout.setPartitionNum(stats.numBucket());
        mergedLayout.setPartitionValues(stats.partitionValues());
        mergedLayout.setFileCount(stats.numFiles());
        mergedLayout.setByteSize(stats.numBytes());
        return mergedLayout;
    }

    /**
     * Returns the ids of the segments in {@code list} that satisfy {@code predicate}.
     *
     * @param predicate selection criterion
     * @param list      candidate segments
     * @return ids of the matching segments, in encounter order
     */
    private List<String> predicatedSegments(Predicate<NDataSegment> predicate, List<NDataSegment> list) {
        return list.stream().filter(predicate).map(NDataSegment::getId).collect(Collectors.toList());
    }

    /**
     * Resolves the flat table directory of every segment, or returns an empty list when at least
     * one segment is not flagged flat-table-ready or its directory cannot be confirmed to exist.
     *
     * @param str  dataflow id
     * @param list segments whose flat tables should be merged
     * @return one path per segment, or an empty list if any flat table is unusable
     */
    private List<Path> getSegmentFlatTables(String str, List<NDataSegment> list) {
        List<String> notReady = predicatedSegments(segment -> !segment.isFlatTableReady(), list);
        if (CollectionUtils.isNotEmpty(notReady)) {
            logger.warn("[UNEXPECTED_THINGS_HAPPENED] Plan to merge segments' flat table, but found that some's flat table were not ready like [{}]", String.join(",", notReady));
            return Lists.newArrayList();
        }
        FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
        List<String> missing = predicatedSegments(segment -> isFlatTableMissing(workingFileSystem, str, segment), list);
        if (CollectionUtils.isNotEmpty(missing)) {
            logger.warn("[UNEXPECTED_THINGS_HAPPENED] Plan to merge segments' flat table, but found that some's flat table were not exists like [{}]", String.join(",", missing));
            return Lists.newArrayList();
        }
        return list.stream()
                .map(segment -> this.config.getFlatTableDir(this.project, str, segment.getId()))
                .collect(Collectors.toList());
    }

    /**
     * Tells whether the flat table directory of {@code segment} is absent. An {@link IOException}
     * during the check is logged and treated as "missing".
     */
    private boolean isFlatTableMissing(FileSystem fileSystem, String dataflowId, NDataSegment segment) {
        try {
            return !fileSystem.exists(this.config.getFlatTableDir(this.project, dataflowId, segment.getId()));
        } catch (IOException e) {
            logger.warn("[UNEXPECTED_THINGS_HAPPENED] When checking segment's flat table exists, segment id: {}", segment.getId(), e);
            return true;
        }
    }

    /**
     * Unions the persisted flat tables of the merging segments, writes the result as the flat
     * table of the target segment and marks that segment flat-table-ready.
     *
     * <p>Nothing happens when flat table persisting is disabled, when there are no merging
     * segments, when any source flat table is unusable, or when the sorted field names of the
     * source tables differ.
     *
     * @param str  dataflow id
     * @param str2 id of the target (merged) segment
     */
    private void mergeFlatTable(String str, String str2) {
        if (!this.config.isPersistFlatTableEnabled()) {
            logger.info("project {} flat table persisting is not enabled.", this.project);
            return;
        }
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, this.project);
        NDataflow dataflow = dataflowManager.getDataflow(str);
        List<NDataSegment> mergingSegments = getMergingSegments(dataflow, dataflow.getSegment(str2));
        if (mergingSegments.isEmpty()) {
            return;
        }
        Collections.sort(mergingSegments);
        List<Path> flatTablePaths = getSegmentFlatTables(str, mergingSegments);
        if (CollectionUtils.isEmpty(flatTablePaths)) {
            return;
        }
        Dataset<Row> merged = null;
        // Sorted field names of the first flat table; every other table must match them.
        String[] referenceFields = null;
        for (Path path : flatTablePaths) {
            Dataset<Row> flatTable = this.ss.read().parquet(path.toString());
            String[] fields = flatTable.schema().fieldNames();
            Arrays.sort(fields);
            if (referenceFields == null) {
                referenceFields = fields;
            } else if (!Arrays.equals(referenceFields, fields)) {
                logger.info("Schema: {} in path: {} is conflict with others: {}. Skip merge flat table.", fields, path, referenceFields);
                return;
            }
            // NOTE(review): only the sorted names are compared while union() matches columns by
            // position — presumably all flat tables share the same column order; confirm.
            merged = Objects.isNull(merged) ? flatTable : merged.union(flatTable);
        }
        if (Objects.isNull(merged)) {
            return;
        }
        Path mergedFlatTableDir = this.config.getFlatTableDir(this.project, str, str2);
        this.ss.sparkContext().setLocalProperty("spark.scheduler.pool", "merge");
        this.ss.sparkContext().setJobDescription("Persist flat table.");
        try {
            merged.write().mode(SaveMode.Overwrite).parquet(mergedFlatTableDir.toString());
        } finally {
            // Mirror saveAndUpdateCuboid: do not leave this description on later jobs.
            this.ss.sparkContext().setJobDescription(null);
        }
        logger.info("Persist merged flat tables to path {} with schema [{}], new segment id: {}, dataFlowId: {}", mergedFlatTableDir, referenceFields, str2, str);
        NDataSegment targetSegment = dataflow.copy().getSegment(str2);
        targetSegment.setFlatTableReady(true);
        NDataflowUpdate update = new NDataflowUpdate(str);
        update.setToUpdateSegs(new NDataSegment[]{targetSegment});
        dataflowManager.updateDataflow(update);
    }

    /**
     * Returns the job information text produced by {@code LogJobInfoUtils.dfMergeJobInfo()}.
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected String generateInfo() {
        return LogJobInfoUtils.dfMergeJobInfo();
    }
}
