/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.steps;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.flink.FlinkBatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.storage.hbase.steps.HFileOutputFormat3;
import org.apache.kylin.storage.hbase.steps.KeyValueCreator;
import org.apache.kylin.storage.hbase.steps.RowKeyWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink batch application that reads the per-level cuboid sequence files of one
 * cube segment, turns every row into HBase {@link KeyValue}s, range-partitions
 * them by the split keys stored in a partition file, sorts each partition and
 * writes the result through {@link HFileOutputFormat3}.
 *
 * <p>After the Flink job has run, the size of the output directory is written to
 * the counter file under the key {@code hdfs_bytes_written}.
 *
 * <p>The class is {@link Serializable} because the anonymous Flink functions
 * created inside {@link #execute(OptionsHelper)} are declared in an instance
 * method and therefore carry a reference to this object.
 */
public class FlinkCubeHFile extends AbstractApplication implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger(FlinkCubeHFile.class);

    public static final Option OPTION_CUBE_NAME =
            buildOption("cubename", "cubename", "Cube Name", true);
    public static final Option OPTION_SEGMENT_ID =
            buildOption("segmentId", "segment", "Cube Segment Id", true);
    public static final Option OPTION_META_URL =
            buildOption("metaUrl", "metaUrl", "HDFS metadata url", true);
    public static final Option OPTION_OUTPUT_PATH =
            buildOption("output", "output", "HFile output path", true);
    public static final Option OPTION_INPUT_PATH =
            buildOption("input", "input", "Cuboid files PATH", true);
    public static final Option OPTION_PARTITION_FILE_PATH =
            buildOption("partitions", "partitions", "Partition file path.", true);
    public static final Option OPTION_COUNTER_PATH =
            buildOption("counterOutput", "counterOutput", "counter output path", true);
    public static final Option OPTION_ENABLE_OBJECT_REUSE =
            buildOption("enableObjectReuse", "enableObjectReuse", "Enable object reuse", false);

    private final Options options = new Options();

    /** Registers every command line option this application understands. */
    public FlinkCubeHFile() {
        this.options.addOption(OPTION_INPUT_PATH);
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_META_URL);
        this.options.addOption(OPTION_OUTPUT_PATH);
        this.options.addOption(OPTION_PARTITION_FILE_PATH);
        this.options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
        this.options.addOption(OPTION_COUNTER_PATH);
        this.options.addOption(OPTION_ENABLE_OBJECT_REUSE);
    }

    /**
     * Builds a single-argument command line option.
     *
     * @param optionName  name the option is created under
     * @param argName     display name of the option's argument
     * @param description help text
     * @param required    whether the option must be present
     * @return the configured option
     */
    private static Option buildOption(String optionName, String argName, String description, boolean required) {
        // OptionBuilder accumulates state in static fields; create() consumes and resets it.
        OptionBuilder.withArgName(argName);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(required);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(optionName);
    }

    @Override
    protected Options getOptions() {
        return this.options;
    }

    /**
     * Builds and runs the cuboid-to-HFile Flink job, then records the number of
     * bytes written.
     *
     * @param optionsHelper parsed command line holding the options registered in the constructor
     * @throws IllegalArgumentException if the partition file does not exist
     * @throws Exception on any failure while reading inputs, building or running the job
     */
    @Override
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
        final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
        final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
        // parseBoolean is true only for a non-null value equal to "true" ignoring case.
        final boolean enableObjectReuse =
                Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE));

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (enableObjectReuse) {
            env.getConfig().enableObjectReuse();
        }

        final FileSystem fs = HadoopUtil.getWorkingFileSystem();
        if (!fs.exists(partitionFilePath)) {
            throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
        }

        Job job = Job.getInstance();
        // Remove any previous output before the job runs.
        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));

        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
        final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());

        // One creator per HBase column across all column families of the cube.
        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
            }
        }
        final int cfNum = keyValueCreators.size();
        // With a single full-copy column the measure bytes are handed over without decoding.
        final boolean quickPath = keyValueCreators.size() == 1 && keyValueCreators.get(0).isFullCopy;

        logger.info("Input path: {}", inputPath);
        logger.info("Output path: {}", outputPath);

        // Load the split keys; a fresh RowKeyWritable is needed per entry because
        // reader.next() fills the instance it is given.
        final List<RowKeyWritable> keys = new ArrayList<RowKeyWritable>();
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, job.getConfiguration())) {
            RowKeyWritable key = new RowKeyWritable();
            final Writable value = NullWritable.get();
            while (reader.next(key, value)) {
                keys.add(key);
                logger.info(" ------- split key: {}", key);
                key = new RowKeyWritable();
            }
        }
        logger.info("There are {} split keys, totally {} hfiles", keys.size(), keys.size() + 1);

        logger.info("Loading HBase configuration from:{}", hbaseConfFile);
        final Path hbaseConfFilePath = new Path(hbaseConfFile);
        final FileSystem hbaseClusterFs = hbaseConfFilePath.getFileSystem(job.getConfiguration());
        try (FSDataInputStream confInput = hbaseClusterFs.open(hbaseConfFilePath)) {
            final Configuration hbaseJobConf = new Configuration();
            hbaseJobConf.addResource(confInput);
            hbaseJobConf.set("dfs.replication", "3");
            // Append KeyValueSerialization to whatever serializations are already configured.
            hbaseJobConf.setStrings("io.serializations",
                    hbaseJobConf.get("io.serializations"), KeyValueSerialization.class.getName());

            // From here on 'job' is the HBase-configured job; it is also the one whose
            // configuration is used for the counter file at the end of this method.
            job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            final HadoopOutputFormat<ImmutableBytesWritable, Cell> hadoopOF =
                    new HadoopOutputFormat<ImmutableBytesWritable, Cell>(new HFileOutputFormat3(), job);

            // One input data set per cuboid build level (0..totalLevels inclusive).
            final List<DataSet<Tuple2<Text, Text>>> mergingLevels = Lists.newArrayList();
            final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
            for (int level = 0; level <= totalLevels; ++level) {
                final String cuboidInputPath = JobBuilderSupport.getCuboidOutputPathsByLevel(inputPath, level);
                final DataSet<Tuple2<Text, Text>> levelDataSet = env.createInput(HadoopInputs.readHadoopFile(
                        new SequenceFileInputFormat<Text, Text>(), Text.class, Text.class, cuboidInputPath));
                mergingLevels.add(levelDataSet);
            }

            if (!mergingLevels.isEmpty()) {
                DataSet<Tuple2<Text, Text>> inputDataSet = mergingLevels.get(0);
                for (int i = 1; i < mergingLevels.size(); ++i) {
                    inputDataSet = inputDataSet.union(mergingLevels.get(i));
                }

                final MapPartitionOperator<Tuple2<Text, Text>, Tuple2<RowKeyWritable, KeyValue>> hfileDataSet;
                if (quickPath) {
                    hfileDataSet = inputDataSet.mapPartition(
                            new RichMapPartitionFunction<Tuple2<Text, Text>, Tuple2<RowKeyWritable, KeyValue>>() {

                        @Override
                        public void mapPartition(Iterable<Tuple2<Text, Text>> values,
                                Collector<Tuple2<RowKeyWritable, KeyValue>> out) throws Exception {
                            for (Tuple2<Text, Text> value : values) {
                                // Text.getBytes() may be longer than the content, hence the explicit length.
                                final KeyValue outputValue = keyValueCreators.get(0)
                                        .create(value.f0, value.f1.getBytes(), 0, value.f1.getLength());
                                out.collect(new Tuple2<RowKeyWritable, KeyValue>(
                                        new RowKeyWritable(outputValue.getKey()), outputValue));
                            }
                        }
                    });
                } else {
                    hfileDataSet = inputDataSet.mapPartition(
                            new RichMapPartitionFunction<Tuple2<Text, Text>, Tuple2<RowKeyWritable, KeyValue>>() {

                        @Override
                        public void mapPartition(Iterable<Tuple2<Text, Text>> values,
                                Collector<Tuple2<RowKeyWritable, KeyValue>> out) throws Exception {
                            for (Tuple2<Text, Text> value : values) {
                                final Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
                                inputCodec.decode(
                                        ByteBuffer.wrap(value.f1.getBytes(), 0, value.f1.getLength()),
                                        inputMeasures);
                                // Emit one KeyValue per HBase column for this row.
                                for (int i = 0; i < cfNum; ++i) {
                                    final KeyValue outputValue =
                                            keyValueCreators.get(i).create(value.f0, inputMeasures);
                                    out.collect(new Tuple2<RowKeyWritable, KeyValue>(
                                            new RowKeyWritable(outputValue.getKey()), outputValue));
                                }
                            }
                        }
                    });
                }

                // Route rows to their split range, sort inside each range, then re-key for the HFile writer.
                hfileDataSet
                        .partitionCustom(new HFilePartitioner(keys), 0)
                        .sortPartition(0, Order.ASCENDING)
                        .mapPartition(
                                new RichMapPartitionFunction<Tuple2<RowKeyWritable, KeyValue>, Tuple2<ImmutableBytesWritable, Cell>>() {

                            @Override
                            public void mapPartition(Iterable<Tuple2<RowKeyWritable, KeyValue>> values,
                                    Collector<Tuple2<ImmutableBytesWritable, Cell>> out) throws Exception {
                                for (Tuple2<RowKeyWritable, KeyValue> value : values) {
                                    out.collect(new Tuple2<ImmutableBytesWritable, Cell>(
                                            new ImmutableBytesWritable(value.f1.getKey()), value.f1));
                                }
                            }
                        })
                        .output(hadoopOF);
            }
        }

        env.execute(String.format(Locale.ROOT, "Convert cuboid to hfile for cube: %s, segment %s", cubeName, segmentId));

        final long size = FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs);
        logger.info("HDFS: Number of bytes written={}", size);
        final HashMap<String, String> counterMap = Maps.newHashMap();
        counterMap.put("hdfs_bytes_written", String.valueOf(size));
        HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterPath, counterMap);
    }

    /**
     * Range partitioner that maps a row key to the index of the split range it
     * falls into, based on a list of split keys.
     *
     * <p>Declared {@code static} so that an instance does not hold a reference to
     * the enclosing {@link FlinkCubeHFile}.
     */
    static class HFilePartitioner implements Partitioner<RowKeyWritable> {
        // Passed to Collections.binarySearch, which requires ascending order.
        private final List<RowKeyWritable> keys;

        /**
         * @param splitKeys split keys delimiting the ranges; stored by reference, not copied
         */
        public HFilePartitioner(List<RowKeyWritable> splitKeys) {
            this.keys = splitKeys;
        }

        /**
         * Returns the range index for {@code key}: the number of split keys that are
         * less than or equal to it, so the result lies in {@code [0, keys.size()]}.
         *
         * <p>NOTE(review): {@code numPartitions} is not consulted; this presumably
         * relies on the job parallelism being at least {@code keys.size() + 1} - confirm.
         *
         * @param key           row key to place
         * @param numPartitions number of partitions offered by the runtime (unused)
         * @return the range index
         */
        @Override
        public int partition(RowKeyWritable key, int numPartitions) {
            // binarySearch yields the index when found, else -(insertionPoint) - 1.
            // Adding one gives index + 1 for a hit and -(insertionPoint) for a miss.
            int pos = Collections.binarySearch(this.keys, key) + 1;
            return pos < 0 ? -pos : pos;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            HFilePartitioner that = (HFilePartitioner) o;
            return Objects.equals(this.keys, that.keys);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.keys);
        }
    }
}

