package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.utils.BuildUtils;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.Metrics;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.aspectj.apache.bcel.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:WEB-INF/lib/kylin-spark-engine-4.0.4.jar:org/apache/kylin/engine/spark/job/CubeMergeJob.class */
/**
 * Spark application that merges the cuboid (layout) data of the segments replaced by a target
 * segment, saves the merged layouts under the target segment and records the target segment's
 * statistics in the cube metadata.
 *
 * <p>Job parameters read: {@code MetadataConstants.P_CUBE_ID} (cube uuid),
 * {@code MetadataConstants.P_SEGMENT_IDS} (id of the target segment),
 * {@code MetadataConstants.P_JOB_ID} and {@code "cubeName"}.
 */
public class CubeMergeJob extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);

    /** Bytes per kilobyte, used to convert the merged layouts' byte size into the segment's size in KB. */
    private static final long BYTES_PER_KB = 1024L;

    /** Runs the per-layout save jobs submitted by {@code mergeSegments}. */
    private BuildLayoutWithUpdate buildLayoutWithUpdate;
    /** Layout id -> helper that gathers that layout from every merging segment. */
    private Map<Long, CubeMergeAssist> mergeCuboidsAssist;
    /** Source segments that are merged into the target segment. */
    private List<CubeSegment> mergingSegments = Lists.newArrayList();
    /** {@link SegmentInfo} of each entry of {@code mergingSegments}, in the same order. */
    private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
    /** Layout id -> shard count returned by {@code BuildUtils.repartitionIfNeed}. */
    private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
    /** Layout id -> row count of the merged layout. */
    Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
    /** Layout id -> byte size of the merged layout ({@code LayoutEntity.getByteSize()}). */
    Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();

    /**
     * Resolves the segments to merge, merges their layouts and then writes the merged segment's
     * statistics back to the cube metadata.
     */
    @Override
    protected void doExecute() throws Exception {
        this.buildLayoutWithUpdate = new BuildLayoutWithUpdate(this.config);
        final String cubeId = getParam(MetadataConstants.P_CUBE_ID);
        final String segmentId = getParam(MetadataConstants.P_SEGMENT_IDS);

        CubeInstance cube = CubeManager.getInstance(this.config).getCubeByUuid(cubeId);
        this.mergingSegments = cube.getMergingSegments(cube.getSegmentById(segmentId));
        for (CubeSegment mergingSegment : this.mergingSegments) {
            this.mergingSegInfos.add(ManagerHub.getSegmentInfo(this.config, cubeId, mergingSegment.getUuid()));
        }

        mergeSegments(cubeId, segmentId);
        updateSegmentInfo(cubeId, segmentId);
    }

    /**
     * For every layout present in the merging segments, builds the merged (and, for aggregate
     * layouts, re-aggregated) dataset sorted by the layout's dimensions, and submits a job that
     * saves it under the target segment.
     *
     * @param cubeId    uuid of the cube
     * @param segmentId id of the target (merged) segment
     * @throws IOException declared for the metadata and build helpers this method calls
     */
    private void mergeSegments(String cubeId, String segmentId) throws IOException {
        final CubeInstance cube = CubeManager.getInstance(this.config).getCubeByUuid(cubeId);
        final SegmentInfo mergedSegInfo = ManagerHub.getSegmentInfo(this.config, cubeId,
                cube.getSegmentById(segmentId).getUuid());

        this.mergeCuboidsAssist = generateMergeAssist(this.mergingSegInfos, this.ss);
        for (final CubeMergeAssist assist : this.mergeCuboidsAssist.values()) {
            ForestSpanningTree spanningTree =
                    new ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()));
            Dataset<Row> merged = assist.merge(this.config, cube.getName());
            final LayoutEntity layout = assist.getLayout();
            Set<Integer> dimensionIds = layout.getOrderedDimensions().keySet();

            final Dataset<Row> sorted;
            if (layout.isTableIndex()) {
                // Table indexes hold raw rows: only sort, no aggregation.
                sorted = merged.sortWithinPartitions(NSparkCubingUtil.getColumns(dimensionIds));
            } else {
                sorted = CuboidAggregator
                        .agg(this.ss, merged, dimensionIds, layout.getOrderedMeasures(), spanningTree, false)
                        .sortWithinPartitions(NSparkCubingUtil.getColumns(dimensionIds));
            }

            this.buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                @Override
                public String getName() {
                    return "merge-cuboid-" + layout.getId();
                }

                @Override
                public LayoutEntity build() throws IOException {
                    return CubeMergeJob.this.saveAndUpdateCuboid(sorted, mergedSegInfo, layout, assist);
                }

                @Override
                public NBuildSourceInfo getBuildSourceInfo() {
                    return null;
                }
            }, this.config);
            // Invoked once per submitted layout, inside the loop (kept as in the original).
            this.buildLayoutWithUpdate.updateLayout(mergedSegInfo, this.config);
        }
    }

    /**
     * Groups the layouts of the given segments by layout id.
     *
     * <p>The first segment that contains a layout determines the assist's layout object and its
     * "new segment". NOTE(review): that is a source segment, not the merged target segment; confirm
     * this is what {@code CubeMergeAssist} expects.
     *
     * @param list         segments to merge; also recorded on every assist as the segments to merge
     * @param sparkSession session handed to every assist
     * @return a concurrent map from layout id to the assist collecting that layout's cuboids
     */
    public static Map<Long, CubeMergeAssist> generateMergeAssist(List<SegmentInfo> list, SparkSession sparkSession) {
        final Map<Long, CubeMergeAssist> assistByLayoutId = Maps.newConcurrentMap();
        for (SegmentInfo segmentInfo : list) {
            // Iterate a Java view of the Scala List: indexed apply(i) on an immutable List is O(i).
            for (LayoutEntity layout : JavaConversions.asJavaCollection(segmentInfo.layouts())) {
                final long layoutId = layout.getId();
                CubeMergeAssist assist = assistByLayoutId.get(layoutId);
                if (assist == null) {
                    assist = new CubeMergeAssist();
                    assist.addCuboid(layout);
                    assist.setSs(sparkSession);
                    assist.setLayout(layout);
                    assist.setNewSegment(segmentInfo);
                    assist.setToMergeSegments(list);
                    assistByLayoutId.put(layoutId, assist);
                } else {
                    assist.addCuboid(layout);
                }
            }
        }
        return assistByLayoutId;
    }

    /**
     * Saves one merged layout to a temporary path, records its row count, repartitions it into the
     * final parquet path if needed and records the resulting shard number, size and row count.
     *
     * <p>Originally private; the decompiler widened it to public, and it is kept public for compatibility.
     *
     * @param dataset         merged and sorted data of the layout
     * @param segmentInfo     target segment the layout is written under
     * @param layoutEntity    layout being saved; its rows, source rows and shard number are updated in place
     * @param cubeMergeAssist assist whose cuboids' source rows are summed into the layout's source rows
     * @return {@code layoutEntity}, updated
     * @throws IOException declared for the storage and build helpers this method calls
     */
    public LayoutEntity saveAndUpdateCuboid(Dataset<Row> dataset, SegmentInfo segmentInfo, LayoutEntity layoutEntity,
            CubeMergeAssist cubeMergeAssist) throws IOException {
        final long layoutId = layoutEntity.getId();
        long sourceRows = 0L;
        for (LayoutEntity cuboid : cubeMergeAssist.getCuboids()) {
            sourceRows += cuboid.getSourceRows();
        }

        final NSparkCubingEngine.NSparkCubingStorage storage = (NSparkCubingEngine.NSparkCubingStorage) StorageFactory
                .createEngineAdapter(layoutEntity, NSparkCubingEngine.NSparkCubingStorage.class);
        final String path = PathManager.getParquetStoragePath(this.config, getParam("cubeName"), segmentInfo.name(),
                segmentInfo.identifier(), String.valueOf(layoutId));
        final String tempPath = path + CubeBuildJob.TEMP_DIR_SUFFIX;

        this.ss.sparkContext().setJobDescription("merge layout " + layoutId);
        JobMetricsUtils.registerQueryExecutionListener(this.ss, tempPath);
        try {
            storage.saveTo(tempPath, dataset, this.ss);
            long rowCount = JobMetricsUtils.collectMetrics(tempPath).getMetrics(Metrics.CUBOID_ROWS_CNT());
            if (rowCount == -1) {
                this.infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
                logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows.");
                rowCount = dataset.count();
            }
            layoutEntity.setRows(rowCount);
            layoutEntity.setSourceRows(sourceRows);

            int shardNum = BuildUtils.repartitionIfNeed(layoutEntity, storage, path, tempPath, this.config, this.ss);
            layoutEntity.setShardNum(shardNum);
            this.cuboidShardNum.put(layoutId, (short) shardNum);
        } finally {
            // Always reset the description and drop the listener, even when the write fails.
            // The original registered a second listener here instead, which leaked it.
            this.ss.sparkContext().setJobDescription(null);
            JobMetricsUtils.unRegisterQueryExecutionListener(this.ss, tempPath);
        }

        BuildUtils.fillCuboidInfo(layoutEntity, path);
        this.cuboidIdToPreciseSize.put(layoutId, layoutEntity.getByteSize());
        this.cuboidIdToPreciseRows.put(layoutId, layoutEntity.getRows());
        return layoutEntity;
    }

    /**
     * Writes the merged segment's statistics into a write copy of the cube and persists it.
     *
     * <p>Size is the sum of {@code getByteSize()} over every assist's layout. Each of those objects
     * was passed to {@code saveAndUpdateCuboid} and updated there. The size is converted to KB.
     * Input records and input size are the sums over the merging segments.
     *
     * @param cubeId    uuid of the cube
     * @param segmentId id of the target (merged) segment
     * @throws IOException declared for the cube metadata update
     */
    private void updateSegmentInfo(String cubeId, String segmentId) throws IOException {
        CubeManager cubeManager = CubeManager.getInstance(this.config);
        CubeInstance cubeCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
        CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
        CubeSegment mergedSegment = cubeCopy.getSegmentById(segmentId);

        long totalLayoutBytes = 0L;
        for (CubeMergeAssist assist : this.mergeCuboidsAssist.values()) {
            totalLayoutBytes += assist.getLayout().getByteSize();
        }
        long inputRecords = 0L;
        long inputRecordsSize = 0L;
        for (CubeSegment source : this.mergingSegments) {
            inputRecords += source.getInputRecords();
            inputRecordsSize += source.getInputRecordsSize();
        }

        mergedSegment.setSizeKB(totalLayoutBytes / BYTES_PER_KB);
        mergedSegment.setInputRecords(inputRecords);
        mergedSegment.setInputRecordsSize(inputRecordsSize);
        mergedSegment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
        mergedSegment.setCuboidShardNums(this.cuboidShardNum);
        Map<String, String> additionalInfo = mergedSegment.getAdditionalInfo();
        additionalInfo.put("storageType", "4");
        mergedSegment.setAdditionalInfo(additionalInfo);
        mergedSegment.setCuboidStaticsRowsBytes(this.cuboidIdToPreciseRows);
        mergedSegment.setCuboidStaticsSizeBytes(this.cuboidIdToPreciseSize);

        cubeUpdate.setToUpdateSegs(new CubeSegment[] {mergedSegment});
        cubeManager.updateCube(cubeUpdate, true);
    }

    @Override
    protected String generateInfo() {
        return LogJobInfoUtils.dfMergeJobInfo();
    }

    public static void main(String[] strArr) {
        new CubeMergeJob().execute(strArr);
    }
}
