package org.apache.kylin.engine.spark;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.tool.shaded.com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

/* loaded from: input_file:org/apache/kylin/engine/spark/SparkCubingMerge.class */
/**
 * Spark application that merges the cuboid data of several cube segments into one merged segment.
 *
 * <p>Each comma-separated input folder is mapped back to its source segment (via the job id embedded in its path),
 * its rows are re-encoded for the merged segment with {@link SegmentReEncoder}, rows with equal keys are combined
 * with {@link MeasureAggregators}, and the measures are serialized again with {@link BufferedMeasureCodec} before
 * being written to the output path.
 *
 * <p>When every input folder contains level-0 by-layer output, the merge runs level by level and writes one output
 * folder per level. Otherwise all cuboids are merged in a single pass into the output path.
 */
public class SparkCubingMerge extends AbstractApplication implements Serializable {

    protected static final Logger logger = LoggerFactory.getLogger(SparkCubingMerge.class);

    public static final Option OPTION_CUBE_NAME = requiredOption(BatchConstants.ARG_CUBE_NAME, "Cube Name",
            BatchConstants.ARG_CUBE_NAME);
    public static final Option OPTION_SEGMENT_ID = requiredOption("segment", "Cube Segment Id",
            CubingExecutableUtil.SEGMENT_ID);
    public static final Option OPTION_META_URL = requiredOption("metaUrl", "HDFS metadata url", "metaUrl");
    public static final Option OPTION_OUTPUT_PATH = requiredOption(BatchConstants.ARG_OUTPUT, "HFile output path",
            BatchConstants.ARG_OUTPUT);
    public static final Option OPTION_INPUT_PATH = requiredOption(BatchConstants.ARG_INPUT, "Cuboid files PATH",
            BatchConstants.ARG_INPUT);

    private final Options options = new Options();
    private String cubeName;
    private String metaUrl;

    /**
     * Re-encodes the (key, value) rows of one source segment for the merged segment, using
     * {@link SegmentReEncoder#reEncode2}.
     *
     * <p>The Kylin config and the re-encoder are transient. They are built lazily, the first time {@link #call} runs,
     * from the serialized Hadoop configuration and metadata URL.
     */
    static class ReEncodeCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
        private final String cubeName;
        private final String sourceSegmentId;
        private final String mergedSegmentId;
        private final String metaUrl;
        private final SerializableConfiguration conf;
        private transient KylinConfig kylinConfig;
        private transient volatile boolean initialized = false;
        private transient SegmentReEncoder segmentReEncoder = null;

        ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
                SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.sourceSegmentId = sourceSegmentId;
            this.mergedSegmentId = mergedSegmentId;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        /** Loads the Kylin metadata and builds the re-encoder from the source to the merged segment. */
        private void init() {
            kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
            CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
            CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
            segmentReEncoder = new SegmentReEncoder(cubeDesc, cube.getSegmentById(sourceSegmentId),
                    cube.getSegmentById(mergedSegmentId), kylinConfig);
        }

        @Override
        public Tuple2<Text, Object[]> call(Tuple2<Text, Text> tuple) throws Exception {
            if (!initialized) {
                synchronized (ReEncodeCuboidFunction.class) {
                    if (!initialized) {
                        init();
                        initialized = true;
                    }
                }
            }
            Pair<Text, Object[]> reEncoded = segmentReEncoder.reEncode2(tuple._1(), tuple._2());
            return new Tuple2<>(reEncoded.getFirst(), reEncoded.getSecond());
        }
    }

    /** Combines two measure arrays that share a key, via {@link MeasureAggregators#aggregate}. */
    private static final class AggregateMeasuresFunction implements Function2<Object[], Object[], Object[]> {
        private final MeasureAggregators aggregators;

        AggregateMeasuresFunction(MeasureAggregators aggregators) {
            this.aggregators = aggregators;
        }

        @Override
        public Object[] call(Object[] left, Object[] right) throws Exception {
            Object[] merged = new Object[left.length];
            aggregators.aggregate(left, right, merged);
            return merged;
        }
    }

    /**
     * Serializes each row's measure values into a {@link Text} using a {@link BufferedMeasureCodec}, so the result
     * can be saved as (Text, Text) pairs.
     *
     * <p>The codec is transient and built lazily from the cube descriptor named {@code cubeDescName}. The driver
     * resolves that name from the cube, so the executor uses the same descriptor as the driver.
     */
    private static final class EncodeMeasuresFunction implements PairFunction<Tuple2<Text, Object[]>, Text, Text> {
        private final String cubeDescName;
        private final String metaUrl;
        private final SerializableConfiguration conf;
        private transient volatile boolean initialized = false;
        private transient BufferedMeasureCodec codec;

        EncodeMeasuresFunction(String cubeDescName, String metaUrl, SerializableConfiguration conf) {
            this.cubeDescName = cubeDescName;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        /** Builds the codec with the loaded config installed as the thread-local Kylin config. */
        private void init() {
            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
                CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeDescName);
                codec = new BufferedMeasureCodec(desc.getMeasures());
            }
        }

        @Override
        public Tuple2<Text, Text> call(Tuple2<Text, Object[]> tuple) throws Exception {
            if (!initialized) {
                synchronized (SparkCubingMerge.class) {
                    if (!initialized) {
                        init();
                        initialized = true;
                    }
                }
            }
            ByteBuffer encoded = codec.encode(tuple._2());
            // only the first position() bytes of the backing array were written by the codec
            byte[] bytes = Arrays.copyOf(encoded.array(), encoded.position());
            return new Tuple2<>(tuple._1(), new Text(bytes));
        }
    }

    public SparkCubingMerge() {
        options.addOption(OPTION_META_URL);
        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_SEGMENT_ID);
        options.addOption(OPTION_INPUT_PATH);
        options.addOption(OPTION_OUTPUT_PATH);
    }

    @Override
    protected Options getOptions() {
        return options;
    }

    /**
     * Runs the merge job.
     *
     * @param optionsHelper parsed command line holding meta url, cube name, segment id, input and output paths
     * @throws IllegalArgumentException if the cube or the merged segment cannot be found in the metadata
     * @throws IllegalStateException if an input folder cannot be mapped to a segment of the cube
     */
    @Override
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        this.metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        this.cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);

        // Kryo runs with registrationRequired=true; this Scala class is registered here on top of the registrator
        Class<?>[] kryoClasses = { Class.forName("scala.reflect.ClassTag$$anon$1") };

        SparkConf conf = new SparkConf().setAppName("Merge segments for cube:" + cubeName + ", segment " + segmentId);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClasses);

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            SparkUtil.modifySparkHadoopConfiguration(sc.sc());
            KylinSparkJobListener jobListener = new KylinSparkJobListener();
            sc.sc().addSparkListener(jobListener);

            // always start from an empty output location
            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));

            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
            KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);

            CubeInstance cube = CubeManager.getInstance(envConfig).getCube(cubeName);
            if (cube == null) {
                throw new IllegalArgumentException("Cube '" + cubeName + "' not found in metadata");
            }
            CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cube.getDescName());
            CubeSegment mergedSegment = cube.getSegmentById(segmentId);
            if (mergedSegment == null) {
                throw new IllegalArgumentException(
                        "Segment '" + segmentId + "' not found in cube '" + cubeName + "'");
            }
            CubeStatsReader statsReader = new CubeStatsReader(mergedSegment, envConfig);

            logger.info("Input path: {}", inputPath);
            logger.info("Output path: {}", outputPath);

            Job job = Job.getInstance(sConf.get());
            SparkUtil.setHadoopConfForCuboid(job, mergedSegment, metaUrl);

            Function2<Object[], Object[], Object[]> reduceFunction = new AggregateMeasuresFunction(
                    new MeasureAggregators(cubeDesc.getMeasures()));
            PairFunction<Tuple2<Text, Object[]>, Text, Text> encodeFunction = new EncodeMeasuresFunction(
                    cube.getDescName(), metaUrl, sConf);

            int buildLevel = mergedSegment.getCuboidScheduler().getBuildLevel();
            String[] inputFolders = StringSplitter.split(inputPath, ",");
            FileSystem fs = HadoopUtil.getWorkingFileSystem();

            if (allLevelZeroOutputsExist(inputFolders, fs)) {
                // every source has by-layer output: merge and write each level separately
                for (int level = 0; level <= buildLevel; level++) {
                    List<JavaPairRDD<Text, Object[]>> levelRdds = new ArrayList<>(inputFolders.length);
                    for (String folder : inputFolders) {
                        String levelPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(folder, level);
                        levelRdds.add(sc.sequenceFile(levelPath, Text.class, Text.class)
                                .mapToPair(newReEncoder(folder, cube, mergedSegment, sConf)));
                    }
                    FileOutputFormat.setOutputPath(job,
                            new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level)));
                    unionAll(sc, levelRdds)
                            .reduceByKey(reduceFunction,
                                    SparkUtil.estimateLayerPartitionNum(level, statsReader, envConfig))
                            .mapToPair(encodeFunction)
                            .saveAsNewAPIHadoopDataset(job.getConfiguration());
                }
            } else {
                // at least one source lacks level-0 by-layer output: merge all cuboids in a single pass
                List<JavaPairRDD<Text, Object[]>> segmentRdds = new ArrayList<>(inputFolders.length);
                for (String folder : inputFolders) {
                    segmentRdds.add(SparkUtil.parseInputPath(folder, fs, sc, Text.class, Text.class)
                            .mapToPair(newReEncoder(folder, cube, mergedSegment, sConf)));
                }
                FileOutputFormat.setOutputPath(job, new Path(outputPath));
                unionAll(sc, segmentRdds)
                        .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(statsReader, envConfig))
                        .mapToPair(encodeFunction)
                        .saveAsNewAPIHadoopDataset(job.getConfiguration());
            }

            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
        }
    }

    /** Returns true when every input folder contains a level-0 cuboid output directory. */
    private static boolean allLevelZeroOutputsExist(String[] inputFolders, FileSystem fs) throws IOException {
        for (String folder : inputFolders) {
            if (!fs.exists(new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(folder, 0)))) {
                return false;
            }
        }
        return true;
    }

    /** Creates the re-encoding function for the segment that produced {@code inputFolder}. */
    private ReEncodeCuboidFunction newReEncoder(String inputFolder, CubeInstance cube, CubeSegment mergedSegment,
            SerializableConfiguration conf) {
        CubeSegment sourceSegment = findSourceSegment(inputFolder, cube);
        if (sourceSegment == null) {
            throw new IllegalStateException(
                    "No segment of cube '" + cubeName + "' matches input path " + inputFolder);
        }
        return new ReEncodeCuboidFunction(cubeName, sourceSegment.getUuid(), mergedSegment.getUuid(), metaUrl,
                conf);
    }

    /** Unions the given RDDs; isolates the unavoidable raw generic-array conversion needed by the varargs union. */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static JavaPairRDD<Text, Object[]> unionAll(JavaSparkContext sc,
            List<JavaPairRDD<Text, Object[]>> rdds) {
        return sc.union(rdds.toArray(new JavaPairRDD[rdds.size()]));
    }

    /** Maps an input folder to its source segment using the job id embedded in the folder path. */
    private CubeSegment findSourceSegment(String path, CubeInstance cubeInstance) {
        return CubeInstance.findSegmentWithJobId(JobBuilderSupport.extractJobIDFromPath(path), cubeInstance);
    }

    /**
     * Builds a required, single-argument option. OptionBuilder keeps its settings in static state until
     * {@code create} is called, so the calls are issued one after another here.
     */
    private static Option requiredOption(String argName, String description, String opt) {
        OptionBuilder.withArgName(argName);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(opt);
    }
}
