package org.apache.kylin.engine.spark;

import java.io.Serializable;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceUtil;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.spark.SparkFunction;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.class */
public class SparkUpdateShardForOldCuboidDataStep extends AbstractApplication implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger((Class<?>) SparkUpdateShardForOldCuboidDataStep.class);
    public static final Option OPTION_CUBE_NAME;
    public static final Option OPTION_SEGMENT_ID;
    public static final Option OPTION_INPUT_PATH;
    public static final Option OPTION_OUTPUT_PATH;
    public static final Option OPTION_META_URL;
    private CubeDesc cubeDesc;
    private RowKeySplitter rowKeySplitter;
    private RowKeyEncoderProvider rowKeyEncoderProvider;
    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
    private Options options = new Options();

    public SparkUpdateShardForOldCuboidDataStep() {
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_INPUT_PATH);
        this.options.addOption(OPTION_OUTPUT_PATH);
        this.options.addOption(OPTION_META_URL);
    }

    @Override // org.apache.kylin.common.util.AbstractApplication
    protected Options getOptions() {
        return this.options;
    }

    @Override // org.apache.kylin.common.util.AbstractApplication
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        final String optionValue = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String optionValue2 = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String optionValue3 = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        String optionValue4 = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        final String optionValue5 = optionsHelper.getOptionValue(OPTION_META_URL);
        String str = optionValue3 + JobBuilderSupport.PathNameCuboidBase;
        String str2 = optionValue3 + JobBuilderSupport.PathNameCuboidOld;
        SparkConf kryoSerializerInConf = SparkUtil.setKryoSerializerInConf();
        kryoSerializerInConf.setAppName("Update_Old_Cuboid_Shard_for_Optimization" + optionValue + "_With_Spark");
        KylinSparkJobListener kylinSparkJobListener = new KylinSparkJobListener();
        JavaSparkContext javaSparkContext = new JavaSparkContext(kryoSerializerInConf);
        Throwable th = null;
        try {
            javaSparkContext.sc().addSparkListener(kylinSparkJobListener);
            final SerializableConfiguration serializableConfiguration = new SerializableConfiguration(javaSparkContext.hadoopConfiguration());
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(serializableConfiguration, optionValue5);
            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(optionValue4));
            CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(optionValue);
            final CubeSegment segmentById = cube.getSegmentById(optionValue2);
            final CubeSegment originalSegmentToOptimize = cube.getOriginalSegmentToOptimize(segmentById);
            segmentById.setCubeInstance(originalSegmentToOptimize.getCubeInstance());
            JavaPairRDD sequenceFile = javaSparkContext.sequenceFile(str, Text.class, Text.class);
            JavaPairRDD sequenceFile2 = javaSparkContext.sequenceFile(str2, Text.class, Text.class);
            this.cubeDesc = cube.getDescriptor();
            logger.info("start to calculate nBaseReduceTasks");
            Pair<Integer, Integer> convergeCuboidDataReduceTaskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegmentToOptimize);
            int intValue = convergeCuboidDataReduceTaskNums.getFirst().intValue();
            int intValue2 = convergeCuboidDataReduceTaskNums.getSecond().intValue();
            logger.info("nBaseReduceTasks is {}", Integer.valueOf(intValue2));
            Job job = Job.getInstance(serializableConfiguration.get());
            SparkUtil.setHadoopConfForCuboid(job, originalSegmentToOptimize, optionValue5);
            JavaPairRDD mapToPair = sequenceFile.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text>() { // from class: org.apache.kylin.engine.spark.SparkUpdateShardForOldCuboidDataStep.1
                @Override // org.apache.kylin.engine.spark.SparkFunction.FunctionBase
                protected void doInit() {
                    SparkUpdateShardForOldCuboidDataStep.this.initMethod(serializableConfiguration, optionValue5, optionValue, segmentById, originalSegmentToOptimize);
                }

                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.apache.kylin.engine.spark.SparkFunction.PairFunctionBase
                public Tuple2<Text, Text> doCall(Tuple2<Text, Text> tuple2) throws Exception {
                    Text text = new Text();
                    long split = SparkUpdateShardForOldCuboidDataStep.this.rowKeySplitter.split(tuple2._1.getBytes());
                    text.set(SparkUpdateShardForOldCuboidDataStep.this.newKeyBuf.array(), 0, SparkUpdateShardForOldCuboidDataStep.this.buildKey(new Cuboid(SparkUpdateShardForOldCuboidDataStep.this.cubeDesc, split, split), SparkUpdateShardForOldCuboidDataStep.this.rowKeySplitter.getSplitBuffers()));
                    return new Tuple2<>(text, tuple2._2);
                }
            });
            SparkUtil.configConvergeCuboidDataReduceOut(job, SparkUtil.generateFilePath(JobBuilderSupport.PathNameCuboidBase, optionValue4));
            mapToPair.repartition(intValue2).saveAsNewAPIHadoopDataset(job.getConfiguration());
            JavaPairRDD mapToPair2 = sequenceFile2.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text>() { // from class: org.apache.kylin.engine.spark.SparkUpdateShardForOldCuboidDataStep.2
                @Override // org.apache.kylin.engine.spark.SparkFunction.FunctionBase
                protected void doInit() {
                    SparkUpdateShardForOldCuboidDataStep.this.initMethod(serializableConfiguration, optionValue5, optionValue, segmentById, originalSegmentToOptimize);
                }

                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.apache.kylin.engine.spark.SparkFunction.PairFunctionBase
                public Tuple2<Text, Text> doCall(Tuple2<Text, Text> tuple2) throws Exception {
                    Text text = new Text();
                    long split = SparkUpdateShardForOldCuboidDataStep.this.rowKeySplitter.split(tuple2._1.getBytes());
                    text.set(SparkUpdateShardForOldCuboidDataStep.this.newKeyBuf.array(), 0, SparkUpdateShardForOldCuboidDataStep.this.buildKey(new Cuboid(SparkUpdateShardForOldCuboidDataStep.this.cubeDesc, split, split), SparkUpdateShardForOldCuboidDataStep.this.rowKeySplitter.getSplitBuffers()));
                    return new Tuple2<>(text, tuple2._2);
                }
            });
            SparkUtil.configConvergeCuboidDataReduceOut(job, SparkUtil.generateFilePath(JobBuilderSupport.PathNameCuboidOld, optionValue4));
            mapToPair2.repartition(intValue).saveAsNewAPIHadoopDataset(job.getConfiguration());
            if (javaSparkContext != null) {
                if (0 == 0) {
                    javaSparkContext.close();
                    return;
                }
                try {
                    javaSparkContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (javaSparkContext != null) {
                if (0 != 0) {
                    try {
                        javaSparkContext.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    javaSparkContext.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int buildKey(Cuboid cuboid, ByteArray[] byteArrayArr) {
        RowKeyEncoder rowkeyEncoder = this.rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
        int bodySplitOffset = this.rowKeySplitter.getBodySplitOffset();
        int bitCount = bodySplitOffset + Long.bitCount(cuboid.getId());
        int i = 0;
        for (int i2 = bodySplitOffset; i2 < bitCount; i2++) {
            System.arraycopy(byteArrayArr[i2].array(), byteArrayArr[i2].offset(), this.newKeyBodyBuf, i, byteArrayArr[i2].length());
            i += byteArrayArr[i2].length();
        }
        int bytesLength = rowkeyEncoder.getBytesLength();
        while (this.newKeyBuf.array().length < bytesLength) {
            this.newKeyBuf = new ByteArray(this.newKeyBuf.length() * 2);
        }
        this.newKeyBuf.setLength(bytesLength);
        rowkeyEncoder.encode(new ByteArray(this.newKeyBodyBuf, 0, i), this.newKeyBuf);
        return bytesLength;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initMethod(SerializableConfiguration serializableConfiguration, String str, String str2, CubeSegment cubeSegment, CubeSegment cubeSegment2) {
        KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(serializableConfiguration, str);
        KylinConfig.setKylinConfigInEnvIfMissing(loadKylinConfigFromHdfs.exportToProperties());
        CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(str2);
        CubeSegment segmentById = cube.getSegmentById(cubeSegment2.getUuid());
        CubeSegment segmentById2 = cube.getSegmentById(cubeSegment.getUuid());
        this.rowKeySplitter = new RowKeySplitter(segmentById);
        this.rowKeyEncoderProvider = new RowKeyEncoderProvider(segmentById2);
    }

    static {
        OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OPTION_CUBE_NAME = OptionBuilder.create(BatchConstants.ARG_CUBE_NAME);
        OptionBuilder.withArgName("segmentId");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OPTION_SEGMENT_ID = OptionBuilder.create("segmentId");
        OptionBuilder.withArgName(BatchConstants.ARG_INPUT);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OPTION_INPUT_PATH = OptionBuilder.create(BatchConstants.ARG_INPUT);
        OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OPTION_OUTPUT_PATH = OptionBuilder.create(BatchConstants.ARG_OUTPUT);
        OptionBuilder.withArgName(BatchConstants.ARG_META_URL);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("HDFS metadata url");
        OPTION_META_URL = OptionBuilder.create(BatchConstants.ARG_META_URL);
    }
}
