package org.apache.kylin.engine.spark;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.3.2.jar:org/apache/kylin/engine/spark/SparkCubingByLayer.class */
/**
 * Spark application that builds a cube segment layer by layer.
 *
 * <p>The base cuboid (layer 0) is encoded from a Hive intermediate table and aggregated by row key.
 * Every following layer is derived from the previous one: each parent row is spanned to its child
 * cuboids and the result is re-aggregated by row key. Each layer is persisted and written to HDFS.
 * The parent layer is unpersisted once its child layer has been written.
 */
public class SparkCubingByLayer extends AbstractApplication implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger((Class<?>) SparkCubingByLayer.class);
    public static final Option OPTION_CUBE_NAME;
    public static final Option OPTION_SEGMENT_ID;
    public static final Option OPTION_META_URL;
    public static final Option OPTION_OUTPUT_PATH;
    public static final Option OPTION_INPUT_TABLE;

    /** Result of {@link CuboidFlatMap} for a cuboid that has no spanning child cuboid. Never mutated. */
    private static final List<Tuple2<ByteArray, Object[]>> EMPTY_CHILDREN = new ArrayList<>(0);

    private final Options options = new Options();

    /**
     * Merges two measure arrays that share a row key by running every measure's aggregator.
     *
     * <p>Cube metadata is loaded lazily, on first use, from the Kylin config stored at {@code metaUrl}.
     */
    public static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
        protected String cubeName;
        protected String metaUrl;
        protected CubeDesc cubeDesc;
        protected int measureNum;
        protected MeasureAggregators aggregators;
        // transient, so every deserialized copy starts uninitialized and loads its own metadata
        protected volatile transient boolean initialized = false;
        protected SerializableConfiguration conf;

        /**
         * @param cubeName cube whose measures are aggregated
         * @param metaUrl  location of the Kylin metadata to load on first use
         * @param conf     Hadoop configuration used to read that metadata
         */
        public BaseCuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        /** Loads the cube descriptor and builds one aggregator per measure. */
        public void init() {
            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            this.cubeDesc = CubeManager.getInstance(kylinConfig).getCube(this.cubeName).getDescriptor();
            this.aggregators = new MeasureAggregators(this.cubeDesc.getMeasures());
            this.measureNum = this.cubeDesc.getMeasures().size();
        }

        /**
         * Calls {@link #init()} the first time this instance is used. Double-checked under a
         * class-wide lock, so concurrent first calls load metadata only once per instance.
         */
        protected void ensureInitialized() {
            if (!this.initialized) {
                synchronized (SparkCubingByLayer.class) {
                    if (!this.initialized) {
                        init();
                        this.initialized = true;
                    }
                }
            }
        }

        /** Returns a new array holding the aggregation of {@code left} and {@code right}. */
        @Override
        public Object[] call(Object[] left, Object[] right) throws Exception {
            ensureInitialized();
            Object[] merged = new Object[this.measureNum];
            this.aggregators.aggregate(left, right, merged);
            return merged;
        }
    }

    /**
     * Maps a row of one layer to its rows in the next layer.
     *
     * <p>The parent row key is split into its cuboid id and per-dimension buffers. One child row key
     * is emitted per spanning child cuboid. Every child row shares the parent's measure array.
     */
    public static class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private CubeSegment cubeSegment;
        private CubeDesc cubeDesc;
        private CuboidScheduler cuboidScheduler;
        private NDCuboidBuilder ndCuboidBuilder;
        private RowKeySplitter rowKeySplitter;
        // transient, so every deserialized copy starts uninitialized and loads its own metadata
        private volatile transient boolean initialized = false;
        private SerializableConfiguration conf;

        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.segmentId = segmentId;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        /** Loads the segment metadata and builds the row-key splitter and child-key builder. */
        public void init() {
            CubeInstance cube = CubeManager.getInstance(AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl)).getCube(this.cubeName);
            this.cubeSegment = cube.getSegmentById(this.segmentId);
            this.cubeDesc = cube.getDescriptor();
            this.cuboidScheduler = this.cubeSegment.getCuboidScheduler();
            this.ndCuboidBuilder = new NDCuboidBuilder(this.cubeSegment, new RowKeyEncoderProvider(this.cubeSegment));
            this.rowKeySplitter = new RowKeySplitter(this.cubeSegment);
        }

        /** Calls {@link #init()} once per instance (double-checked under a class-wide lock). */
        private void ensureInitialized() {
            if (!this.initialized) {
                synchronized (SparkCubingByLayer.class) {
                    if (!this.initialized) {
                        init();
                        this.initialized = true;
                    }
                }
            }
        }

        public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> parent) throws Exception {
            ensureInitialized();
            // split() fills rowKeySplitter's split buffers, which buildKey reads below
            long parentCuboidId = this.rowKeySplitter.split(parent._1().array());
            Cuboid parentCuboid = Cuboid.findForMandatory(this.cubeDesc, parentCuboidId);
            List<Long> childCuboidIds = this.cuboidScheduler.getSpanningCuboid(parentCuboidId);
            if (childCuboidIds == null || childCuboidIds.isEmpty()) {
                return EMPTY_CHILDREN.iterator();
            }
            List<Tuple2<ByteArray, Object[]>> children = new ArrayList<>(childCuboidIds.size());
            for (Long childCuboidId : childCuboidIds) {
                Cuboid childCuboid = Cuboid.findForMandatory(this.cubeDesc, childCuboidId.longValue());
                Pair<Integer, ByteArray> childKey = this.ndCuboidBuilder.buildKey(parentCuboid, childCuboid, this.rowKeySplitter.getSplitBuffers());
                // buildKey reports the key length in getFirst(); copy exactly that many bytes out
                int keyLength = childKey.getFirst().intValue();
                byte[] keyBytes = new byte[keyLength];
                System.arraycopy(childKey.getSecond().array(), 0, keyBytes, 0, keyLength);
                children.add(new Tuple2<>(new ByteArray(keyBytes), parent._2()));
            }
            return children.iterator();
        }
    }

    /**
     * Reducer for the layers above the base cuboid. It passes a per-measure {@code needAggr} mask to
     * the aggregators. In {@link #execute} the mask is false for measures whose type reports
     * {@code onlyAggrInBaseCuboid()}.
     */
    public static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
        private boolean[] needAggr;

        public CuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAggr) {
            super(cubeName, metaUrl, conf);
            this.needAggr = needAggr;
        }

        @Override
        public Object[] call(Object[] left, Object[] right) throws Exception {
            ensureInitialized();
            Object[] merged = new Object[this.measureNum];
            this.aggregators.aggregate(left, right, merged, this.needAggr);
            return merged;
        }
    }

    /**
     * Encodes one row of the Hive intermediate table into a (base-cuboid row key, measure values)
     * pair. Column values are converted with {@code toString()}; SQL nulls stay {@code null}.
     */
    public static class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
        // transient, so every deserialized copy starts uninitialized and builds its own encoder
        private volatile transient boolean initialized = false;
        private BaseCuboidBuilder baseCuboidBuilder = null;
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private SerializableConfiguration conf;

        public EncodeBaseCuboid(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.segmentId = segmentId;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
            if (!this.initialized) {
                synchronized (SparkCubingByLayer.class) {
                    if (!this.initialized) {
                        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
                        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(this.cubeName);
                        CubeDesc cubeDesc = cube.getDescriptor();
                        CubeSegment segment = cube.getSegmentById(this.segmentId);
                        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(segment), cubeDesc);
                        AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(segment, Cuboid.findForMandatory(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)));
                        this.baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, segment, flatDesc, rowKeyEncoder, MeasureIngester.create(cubeDesc.getMeasures()), segment.buildDictionaryMap());
                        this.initialized = true;
                    }
                }
            }
            String[] columns = rowToArray(row);
            this.baseCuboidBuilder.resetAggrs();
            byte[] rowKey = this.baseCuboidBuilder.buildKey(columns);
            return new Tuple2<>(new ByteArray(rowKey), this.baseCuboidBuilder.buildValueObjects(columns));
        }

        /** Converts every column of {@code row} to its string form, keeping nulls as {@code null}. */
        private String[] rowToArray(Row row) {
            String[] columns = new String[row.size()];
            for (int i = 0; i < columns.length; i++) {
                Object value = row.get(i);
                columns[i] = value == null ? null : value.toString();
            }
            return columns;
        }
    }

    /**
     * Serializes a cuboid row into the (row key, encoded measures) {@code Text} pair that is written
     * to HDFS. This is a static class so the Spark closure does not capture the application instance.
     */
    private static class EncodeCuboidToText implements PairFunction<Tuple2<ByteArray, Object[]>, Text, Text> {
        private final String cubeName;
        private final String metaUrl;
        private final SerializableConfiguration conf;
        // transient, so every deserialized copy starts uninitialized and builds its own codec
        private volatile transient boolean initialized = false;
        private transient BufferedMeasureCodec codec;

        EncodeCuboidToText(String cubeName, String metaUrl, SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        public Tuple2<Text, Text> call(Tuple2<ByteArray, Object[]> cuboidRow) throws Exception {
            if (!this.initialized) {
                synchronized (SparkCubingByLayer.class) {
                    if (!this.initialized) {
                        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
                        this.codec = new BufferedMeasureCodec(CubeDescManager.getInstance(kylinConfig).getCubeDesc(this.cubeName).getMeasures());
                        this.initialized = true;
                    }
                }
            }
            ByteBuffer encoded = this.codec.encode(cuboidRow._2());
            // only bytes [0, position) were written by the codec
            byte[] valueBytes = new byte[encoded.position()];
            System.arraycopy(encoded.array(), 0, valueBytes, 0, encoded.position());
            return new Tuple2<>(new Text(cuboidRow._1().array()), new Text(valueBytes));
        }
    }

    /** Extracts the COUNT measure (at a fixed index) from a measure array, as a {@code Long}. */
    private static class CountMeasureValue implements Function<Object[], Long> {
        private final int countMeasureIndex;

        CountMeasureValue(int countMeasureIndex) {
            this.countMeasureIndex = countMeasureIndex;
        }

        public Long call(Object[] measures) throws Exception {
            return (Long) measures[this.countMeasureIndex];
        }
    }

    /** Adds two {@code Long} values. */
    private static class LongSum implements Function2<Long, Long, Long> {
        public Long call(Long left, Long right) throws Exception {
            return Long.valueOf(left.longValue() + right.longValue());
        }
    }

    /** Registers the command-line options: hive table, cube name, segment id, meta url and output path. */
    public SparkCubingByLayer() {
        this.options.addOption(OPTION_INPUT_TABLE);
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_META_URL);
        this.options.addOption(OPTION_OUTPUT_PATH);
    }

    @Override
    protected Options getOptions() {
        return this.options;
    }

    /**
     * Builds every layer of the segment and writes each one to HDFS under the output path. Finally,
     * it deletes the job's HDFS metadata copy. The Spark context is stopped when the build finishes
     * or fails.
     */
    @Override
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);

        Class<?>[] kryoClasses = {Text.class, Class.forName("scala.reflect.ClassTag$$anon$1"), Class.class};
        SparkConf sparkConf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
        sparkConf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClasses);

        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
            SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
            KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
            CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
            CubeDesc cubeDesc = cubeInstance.getDescriptor();
            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);

            Configuration jobConf = new Configuration(sc.hadoopConfiguration());
            jobConf.set("dfs.replication", "2");
            Job job = Job.getInstance(jobConf);
            logger.info("RDD Output path: {}", outputPath);
            setHadoopConf(job, cubeSegment, metaUrl);

            // Index of the first COUNT measure. It equals the measure count if there is none.
            int countMeasureIndex = 0;
            for (MeasureDesc measure : cubeDesc.getMeasures()) {
                if (measure.getFunction().isCount()) {
                    break;
                }
                countMeasureIndex++;
            }

            CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
            List<MeasureDesc> measures = cubeDesc.getMeasures();
            boolean[] needAggr = new boolean[measures.size()];
            boolean allNormalMeasure = true;
            for (int i = 0; i < needAggr.length; i++) {
                needAggr[i] = !measures.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
                allNormalMeasure = allNormalMeasure && needAggr[i];
            }
            logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);

            StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
            JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = new HiveContext(sc.sc()).table(hiveTable).javaRDD()
                    .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
            Long totalCount = envConfig.isSparkSanityCheckEnabled() ? Long.valueOf(encodedBaseRDD.count()) : Long.valueOf(0L);

            BaseCuboidReducerFunction2 baseCuboidReducer = new BaseCuboidReducerFunction2(cubeName, metaUrl, sConf);
            // The masked reducer is only needed when some measure is restricted to the base cuboid.
            BaseCuboidReducerFunction2 layerReducer = allNormalMeasure
                    ? baseCuboidReducer
                    : new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);

            int buildLevel = cubeSegment.getCuboidScheduler().getBuildLevel();
            JavaPairRDD<ByteArray, Object[]> parentRDD = encodedBaseRDD
                    .reduceByKey(baseCuboidReducer, estimateRDDPartitionNum(0, cubeStatsReader, envConfig))
                    .persist(storageLevel);
            saveToHDFS(parentRDD, metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
            for (int level = 1; level <= buildLevel; level++) {
                JavaPairRDD<ByteArray, Object[]> childRDD = parentRDD
                        .flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
                        .reduceByKey(layerReducer, estimateRDDPartitionNum(level, cubeStatsReader, envConfig))
                        .persist(storageLevel);
                if (envConfig.isSparkSanityCheckEnabled()) {
                    sanityCheck(childRDD, totalCount, level, cubeStatsReader, countMeasureIndex);
                }
                saveToHDFS(childRDD, metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
                // the child layer is persisted and written, so the parent is no longer needed
                parentRDD.unpersist();
                parentRDD = childRDD;
            }
            parentRDD.unpersist();
            logger.info("Finished on calculating all level cuboids.");
            deleteHDFSMeta(metaUrl);
        }
    }

    /** Hook for subclasses to configure the output job; sets {@code Text}/{@code Text} key and value classes. */
    protected void setHadoopConf(Job job, CubeSegment cubeSegment, String metaUrl) throws Exception {
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    }

    /**
     * Estimates the number of partitions for a layer. The estimate is the layer's estimated size
     * divided by the configured partition cut (MB), clamped to the configured min/max partition counts.
     */
    protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
        int bySize = (int) (statsReader.estimateLayerSize(level) / kylinConfig.getSparkRDDPartitionCutMB());
        int partition = Math.min(kylinConfig.getSparkMaxPartition(), Math.max(kylinConfig.getSparkMinPartition(), bySize));
        logger.info("Partition for spark cubing: {}", Integer.valueOf(partition));
        return partition;
    }

    /** Hook for subclasses to transform a layer before it is written; returns the RDD unchanged. */
    protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config, CubeSegment segment, int level) {
        return rdd;
    }

    /**
     * Writes one layer to its level-specific output path, using the segment's batch-cubing output
     * format. Row keys are written as-is; measures are serialized with the cube's measure codec.
     */
    protected void saveToHDFS(JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName, CubeSegment cubeSegment, String hdfsBaseLocation, int level, Job job, KylinConfig kylinConfig) throws Exception {
        String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
        MRUtil.getBatchCubingOutputSide2(cubeSegment).getOuputFormat().configureJobOutput(job, cuboidOutputPath, cubeSegment, cubeSegment.getCuboidScheduler(), level);
        prepareOutput(rdd, kylinConfig, cubeSegment, level)
                .mapToPair(new EncodeCuboidToText(cubeName, metaUrl, sConf))
                .saveAsNewAPIHadoopDataset(job.getConfiguration());
        logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
    }

    /**
     * Verifies a layer. Summed over every cuboid in the layer, the COUNT measure must equal
     * {@code totalCount} times the number of cuboids in the layer.
     *
     * @throws IllegalStateException if the sums disagree
     */
    protected void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, int countMeasureIndex) {
        int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
        Long countSum = getRDDCountSum(rdd, countMeasureIndex);
        if (countSum.longValue() != totalCount.longValue() * thisCuboidNum) {
            throw new IllegalStateException(String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", Integer.valueOf(thisLevel), countSum, Integer.valueOf(thisCuboidNum)));
        }
        // guard the division: a layer without cuboids passes the check above with a sum of 0
        long perCuboidCount = thisCuboidNum == 0 ? 0L : countSum.longValue() / thisCuboidNum;
        logger.info("sanity check success for level " + thisLevel + ", count(*) is " + perCuboidCount);
    }

    /** Sums the COUNT measure over every row of {@code rdd}. An empty RDD sums to 0. */
    private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
        // fold, unlike reduce, does not throw "empty collection" on an empty RDD
        return rdd.values().map(new CountMeasureValue(countMeasureIndex)).fold(Long.valueOf(0L), new LongSum());
    }

    /**
     * Recursively deletes the job's metadata copy. Its path is the part of {@code metaUrl} before the
     * first '@'.
     *
     * @throws IllegalArgumentException if {@code metaUrl} contains no '@'
     */
    protected void deleteHDFSMeta(String metaUrl) throws IOException {
        int separator = metaUrl.indexOf('@');
        if (separator < 0) {
            throw new IllegalArgumentException("Cannot locate HDFS metadata path in metaUrl (expected '<path>@...'): " + metaUrl);
        }
        String realHdfsPath = metaUrl.substring(0, separator);
        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
    }

    static {
        OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube Name");
        OPTION_CUBE_NAME = OptionBuilder.create(BatchConstants.ARG_CUBE_NAME);
        OptionBuilder.withArgName("segment");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube Segment Id");
        OPTION_SEGMENT_ID = OptionBuilder.create(CubingExecutableUtil.SEGMENT_ID);
        OptionBuilder.withArgName("metaUrl");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("HDFS metadata url");
        OPTION_META_URL = OptionBuilder.create("metaUrl");
        OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube output path");
        OPTION_OUTPUT_PATH = OptionBuilder.create(BatchConstants.ARG_OUTPUT);
        OptionBuilder.withArgName("hiveTable");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Hive Intermediate Table");
        OPTION_INPUT_TABLE = OptionBuilder.create("hiveTable");
    }
}
