package org.apache.kylin.engine.flink;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.flink.util.PercentileCounterSerializer;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.percentile.PercentileCounter;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/flink/FlinkCubingMerge.class */
public class FlinkCubingMerge extends AbstractApplication implements Serializable {
    /** Logger shared by this application class. */
    protected static final Logger logger = LoggerFactory.getLogger(FlinkCubingMerge.class);

    // Command line options; all of them are assigned in the static initializer of this class.
    public static final Option OPTION_CUBE_NAME;
    public static final Option OPTION_SEGMENT_ID;
    public static final Option OPTION_META_URL;
    public static final Option OPTION_OUTPUT_PATH;
    public static final Option OPTION_INPUT_PATH;
    public static final Option OPTION_ENABLE_OBJECT_REUSE;

    /** Option set returned by {@link #getOptions()}; filled by the constructor. */
    private Options options = new Options();
    /** Cube name taken from the command line in {@code execute}. */
    private String cubeName;
    /** Metadata URL taken from the command line in {@code execute}. */
    private String metaUrl;

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkCubingMerge$ConvertTextMapFunction.class */
    private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>> {
        private BufferedMeasureCodec codec;
        private SerializableConfiguration sConf;
        private String metaUrl;
        private String cubeName;

        public ConvertTextMapFunction(SerializableConfiguration serializableConfiguration, String str, String str2) {
            this.sConf = serializableConfiguration;
            this.metaUrl = str;
            this.cubeName = str2;
        }

        public void open(Configuration configuration) throws Exception {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.sConf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    this.codec = new BufferedMeasureCodec(CubeDescManager.getInstance(loadKylinConfigFromHdfs).getCubeDesc(this.cubeName).getMeasures());
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
            ByteBuffer encode = this.codec.encode((Object[]) tuple2.f1);
            Text text = new Text();
            text.set(encode.array(), 0, encode.position());
            return new Tuple2<>(new Text(((ByteArray) tuple2.f0).array()), text);
        }
    }

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkCubingMerge$MeasureReduceFunction.class */
    private static class MeasureReduceFunction implements ReduceFunction<Tuple2<ByteArray, Object[]>> {
        private MeasureAggregators aggregators;

        public MeasureReduceFunction(MeasureAggregators measureAggregators) {
            this.aggregators = measureAggregators;
        }

        public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> tuple2, Tuple2<ByteArray, Object[]> tuple22) throws Exception {
            Object[] objArr = new Object[((Object[]) tuple2.f1).length];
            this.aggregators.aggregate((Object[]) tuple2.f1, (Object[]) tuple22.f1, objArr);
            return new Tuple2<>(tuple2.f0, objArr);
        }
    }

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkCubingMerge$MeasureReduceGroupFunction.class */
    private static class MeasureReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
        private MeasureAggregators aggregators;

        public MeasureReduceGroupFunction(MeasureAggregators measureAggregators) {
            this.aggregators = measureAggregators;
        }

        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
            Object[] objArr = null;
            ByteArray byteArray = null;
            for (Tuple2<ByteArray, Object[]> tuple2 : iterable) {
                byteArray = (ByteArray) tuple2.f0;
                if (objArr == null) {
                    objArr = (Object[]) tuple2.f1;
                } else {
                    Object[] objArr2 = new Object[objArr.length];
                    this.aggregators.aggregate((Object[]) tuple2.f1, objArr, objArr2);
                    objArr = objArr2;
                }
            }
            collector.collect(new Tuple2(byteArray, objArr));
        }
    }

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkCubingMerge$ReEncodeCuboidFunction.class */
    private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<ByteArray, Object[]>> {
        private String cubeName;
        private String sourceSegmentId;
        private String mergedSegmentId;
        private String metaUrl;
        private SerializableConfiguration conf;
        private transient KylinConfig kylinConfig;
        private transient SegmentReEncoder segmentReEncoder = null;

        ReEncodeCuboidFunction(String str, String str2, String str3, String str4, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.sourceSegmentId = str2;
            this.mergedSegmentId = str3;
            this.metaUrl = str4;
            this.conf = serializableConfiguration;
        }

        public void open(Configuration configuration) throws Exception {
            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            CubeInstance cube = CubeManager.getInstance(this.kylinConfig).getCube(this.cubeName);
            this.segmentReEncoder = new SegmentReEncoder(CubeDescManager.getInstance(this.kylinConfig).getCubeDesc(cube.getDescName()), cube.getSegmentById(this.sourceSegmentId), cube.getSegmentById(this.mergedSegmentId), this.kylinConfig);
        }

        public Tuple2<ByteArray, Object[]> map(Tuple2<Text, Text> tuple2) throws Exception {
            Pair<Text, Object[]> reEncode2 = this.segmentReEncoder.reEncode2((Text) tuple2.f0, (Text) tuple2.f1);
            return new Tuple2<>(new ByteArray(reEncode2.getFirst().getBytes()), reEncode2.getSecond());
        }
    }

    public FlinkCubingMerge() {
        this.options.addOption(OPTION_META_URL);
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_INPUT_PATH);
        this.options.addOption(OPTION_OUTPUT_PATH);
        this.options.addOption(OPTION_ENABLE_OBJECT_REUSE);
    }

    /**
     * Returns the option set populated by the constructor.
     *
     * @return the command line options of this application
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected Options getOptions() {
        return options;
    }

    /**
     * Builds and runs the Flink batch job that merges the cuboid files of the given input folders
     * into the target segment's output path.
     *
     * <p>The existing output path is deleted first. If, for any input folder, the path returned by
     * {@code FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(folder, 0)} does not exist, every
     * input folder is read as a whole via {@code FlinkUtil.parseInputPath} and a single merged output
     * is written to the output path. Otherwise each level from 0 to the segment's build level is
     * merged separately and written to its own per-level output path. In both cases the records are
     * re-encoded, unioned, grouped by key, aggregated and converted back to {@link Text} pairs.
     *
     * @param optionsHelper parsed command line options
     * @throws Exception if reading options, preparing the job, or executing it fails
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        this.metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        this.cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        final String objectReuseValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
        // NOTE(review): any non-empty value (even "false") switches object reuse on; kept as-is
        // for compatibility - confirm against the callers that build this argument.
        final boolean enableObjectReuse = objectReuseValue != null && !objectReuseValue.isEmpty();

        final Job job = Job.getInstance();
        FlinkUtil.modifyFlinkHadoopConfiguration(job);
        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));

        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, this.metaUrl);
        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(this.cubeName);
        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);

        logger.info("Input path: {}", inputPath);
        logger.info("Output path: {}", outputPath);
        FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, this.metaUrl);

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (enableObjectReuse) {
            env.getConfig().enableObjectReuse();
        }
        env.getConfig().registerKryoType(PercentileCounter.class);
        env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);

        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
        final String[] inputFolders = StringSplitter.split(inputPath, ",");
        final FileSystem fs = HadoopUtil.getWorkingFileSystem();

        if (hasFolderWithoutLevelZero(inputFolders, fs)) {
            // At least one input has no level-0 sub path: merge whole folders into one output.
            ArrayList<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs = Lists
                    .newArrayListWithExpectedSize(inputFolders.length);
            for (String folder : inputFolders) {
                @SuppressWarnings("unchecked")
                DataSet<Tuple2<Text, Text>> segmentData = FlinkUtil.parseInputPath(folder, fs, env, Text.class,
                        Text.class);
                mergingSegs.add(segmentData.map(newReEncoder(folder, cubeInstance, cubeSegment, sConf)));
            }
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            HadoopOutputFormat<Text, Text> sink = new HadoopOutputFormat<>(new SequenceFileOutputFormat<Text, Text>(),
                    job);
            mergeAndWrite(mergingSegs, aggregators, sConf, sink);
        } else {
            // Every input has per-level files: merge level by level, one output path per level.
            for (int level = 0; level <= totalLevels; level++) {
                ArrayList<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs = Lists.newArrayList();
                for (String folder : inputFolders) {
                    String levelInputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(folder, level);
                    DataSet<Tuple2<Text, Text>> segmentData = env.createInput(HadoopInputs.readHadoopFile(
                            new SequenceFileInputFormat<Text, Text>(), Text.class, Text.class, levelInputPath));
                    mergingSegs.add(segmentData.map(newReEncoder(folder, cubeInstance, cubeSegment, sConf)));
                }
                String levelOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
                // Each level gets its own Job so that it carries its own output path.
                Job levelJob = Job.getInstance();
                FlinkUtil.modifyFlinkHadoopConfiguration(levelJob);
                FlinkUtil.setHadoopConfForCuboid(levelJob, cubeSegment, this.metaUrl);
                FileOutputFormat.setOutputPath(levelJob, new Path(levelOutputPath));
                HadoopOutputFormat<Text, Text> sink = new HadoopOutputFormat<>(
                        new SequenceFileOutputFormat<Text, Text>(), levelJob);
                mergeAndWrite(mergingSegs, aggregators, sConf, sink);
            }
        }

        env.execute("Merge segments for cube:" + this.cubeName + ", segment " + segmentId);
        logger.info("HDFS: Number of bytes written={}", FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));
    }

    /** Returns true as soon as one input folder has no existing level-0 cuboid path. */
    private static boolean hasFolderWithoutLevelZero(String[] inputFolders, FileSystem fs) throws java.io.IOException {
        for (String folder : inputFolders) {
            Path baseCuboidPath = new Path(FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(folder, 0));
            if (!fs.exists(baseCuboidPath)) {
                return true;
            }
        }
        return false;
    }

    /** Creates the re-encoding map function for the source segment that produced {@code inputFolder}. */
    private ReEncodeCuboidFunction newReEncoder(String inputFolder, CubeInstance cubeInstance, CubeSegment mergedSegment,
            SerializableConfiguration sConf) {
        return new ReEncodeCuboidFunction(this.cubeName, findSourceSegment(inputFolder, cubeInstance).getUuid(),
                mergedSegment.getUuid(), this.metaUrl, sConf);
    }

    /** Unions the given data sets, aggregates them by key and routes the result to {@code sink}; no-op when empty. */
    private void mergeAndWrite(ArrayList<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs,
            MeasureAggregators aggregators, SerializableConfiguration sConf, HadoopOutputFormat<Text, Text> sink) {
        if (mergingSegs.isEmpty()) {
            return;
        }
        DataSet<Tuple2<ByteArray, Object[]>> unioned = mergingSegs.get(0);
        for (int i = 1; i < mergingSegs.size(); i++) {
            unioned = unioned.union(mergingSegs.get(i));
        }
        unioned.groupBy(0)
                .reduceGroup(new MeasureReduceGroupFunction(aggregators))
                .map(new ConvertTextMapFunction(sConf, this.metaUrl, this.cubeName))
                .output(sink);
    }

    /**
     * Resolves the segment of {@code cubeInstance} associated with the job id that
     * {@code JobBuilderSupport.extractJobIDFromPath} extracts from {@code str}.
     *
     * @param str          input path of a source segment
     * @param cubeInstance cube whose segments are searched
     * @return the result of {@code CubeInstance.findSegmentWithJobId} for the extracted job id
     */
    private CubeSegment findSourceSegment(String str, CubeInstance cubeInstance) {
        final String jobId = JobBuilderSupport.extractJobIDFromPath(str);
        return CubeInstance.findSegmentWithJobId(jobId, cubeInstance);
    }

    static {
        OPTION_CUBE_NAME = newArgOption(BatchConstants.ARG_CUBE_NAME, BatchConstants.ARG_CUBE_NAME, true, "Cube Name");
        OPTION_SEGMENT_ID = newArgOption("segmentId", "segment", true, "Cube Segment Id");
        OPTION_META_URL = newArgOption(BatchConstants.ARG_META_URL, BatchConstants.ARG_META_URL, true, "HDFS metadata url");
        OPTION_OUTPUT_PATH = newArgOption(BatchConstants.ARG_OUTPUT, BatchConstants.ARG_OUTPUT, true, "HFile output path");
        OPTION_INPUT_PATH = newArgOption(BatchConstants.ARG_INPUT, BatchConstants.ARG_INPUT, true, "Cuboid files PATH");
        OPTION_ENABLE_OBJECT_REUSE = newArgOption("enableObjectReuse", "enableObjectReuse", false, "Enable object reuse");
    }

    /**
     * Builds one command line option that takes an argument.
     *
     * @param opt         option name passed to {@code OptionBuilder.create}
     * @param argName     display name of the option's argument
     * @param required    whether the option is required
     * @param description description of the option
     * @return the created option
     */
    private static Option newArgOption(String opt, String argName, boolean required, String description) {
        // OptionBuilder is driven through static calls; create() returns the configured option.
        OptionBuilder.withArgName(argName);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(required);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(opt);
    }
}
