/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.steps;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.Options;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.CuboidShardUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
import org.apache.kylin.storage.hbase.steps.RowKeyWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CreateHTableJob
extends AbstractHadoopJob {
    protected static final Logger logger = LoggerFactory.getLogger(CreateHTableJob.class);
    CubeInstance cube = null;
    CubeDesc cubeDesc = null;
    String segmentID = null;
    String cuboidModeName = null;
    String hbaseConfPath = null;
    KylinConfig kylinConfig;
    Path partitionFilePath;

    public int run(String[] args) throws Exception {
        Options options = new Options();
        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_SEGMENT_ID);
        options.addOption(OPTION_PARTITION_FILE_PATH);
        options.addOption(OPTION_CUBOID_MODE);
        options.addOption(OPTION_HBASE_CONF_PATH);
        this.parseOptions(options, args);
        this.partitionFilePath = new Path(this.getOptionValue(OPTION_PARTITION_FILE_PATH));
        String cubeName = this.getOptionValue(OPTION_CUBE_NAME).toUpperCase();
        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        this.cube = cubeMgr.getCube(cubeName);
        this.cubeDesc = this.cube.getDescriptor();
        this.kylinConfig = this.cube.getConfig();
        this.segmentID = this.getOptionValue(OPTION_SEGMENT_ID);
        this.cuboidModeName = this.getOptionValue(OPTION_CUBOID_MODE);
        this.hbaseConfPath = this.getOptionValue(OPTION_HBASE_CONF_PATH);
        CubeSegment cubeSegment = this.cube.getSegmentById(this.segmentID);
        Map cuboidSizeMap = new CubeStatsReader(cubeSegment, this.kylinConfig).getCuboidSizeMap();
        Set<Long> buildingCuboids = this.cube.getCuboidsByMode(this.cuboidModeName);
        if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
            HashMap optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize((int)buildingCuboids.size());
            for (Long cuboid : buildingCuboids) {
                Double cuboidSize = (Double)cuboidSizeMap.get(cuboid);
                if (cuboidSize == null) {
                    logger.warn(cuboid + "cuboid's size is null will replace by 0");
                    cuboidSize = 0.0;
                }
                optimizedCuboidSizeMap.put(cuboid, cuboidSize);
            }
            cuboidSizeMap = optimizedCuboidSizeMap;
        }
        byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cuboidSizeMap, this.kylinConfig, cubeSegment, this.partitionFilePath.getParent());
        CubeHTableUtil.createHTable(cubeSegment, splitKeys);
        this.exportHBaseConfiguration(cubeSegment.getStorageLocationIdentifier());
        return 0;
    }

    private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
        HadoopUtil.healSickConfig(hbaseConf);
        Job job = Job.getInstance((Configuration)hbaseConf, (String)hbaseTableName);
        HTable table = new HTable(hbaseConf, hbaseTableName);
        HFileOutputFormat2.configureIncrementalLoadMap((Job)job, (Table)table);
        logger.info("Saving HBase configuration to " + this.hbaseConfPath);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        FSDataOutputStream out = null;
        try {
            out = fs.create(new Path(this.hbaseConfPath));
            job.getConfiguration().writeXml((OutputStream)out);
        }
        catch (IOException e) {
            try {
                throw new ExecuteException("Write hbase configuration failed", (Throwable)e);
            }
            catch (Throwable throwable) {
                IOUtils.closeQuietly(out);
                throw throwable;
            }
        }
        IOUtils.closeQuietly((OutputStream)out);
    }

    private static byte[][] getSplitsByRegionCount(int regionCount) {
        byte[][] result = new byte[regionCount - 1][];
        for (int i = 1; i < regionCount; ++i) {
            byte[] split = new byte[2];
            BytesUtil.writeUnsigned(i, split, 0, 2);
            result[i - 1] = split;
        }
        return result;
    }

    public static byte[][] getRegionSplitsFromCuboidStatistics(Map<Long, Double> cubeSizeMap, KylinConfig kylinConfig, CubeSegment cubeSegment, Path hfileSplitsOutputFolder) throws IOException {
        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
        float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
        logger.info("Cut for HBase region is " + cut + "GB");
        double totalSizeInM = 0.0;
        for (Double cuboidSize : cubeSizeMap.values()) {
            totalSizeInM += cuboidSize.doubleValue();
        }
        ArrayList allCuboids = Lists.newArrayList();
        allCuboids.addAll(cubeSizeMap.keySet());
        Collections.sort(allCuboids);
        int nRegion = Math.round((float)(totalSizeInM / (double)(cut * 1024.0f)));
        nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
        nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
        if (cubeSegment.isEnableSharding()) {
            int original = nRegion;
            if (nRegion == 0) {
                nRegion = 1;
            }
            if (nRegion > Short.MAX_VALUE) {
                logger.info("Too many regions! reduce to 32767");
                nRegion = Short.MAX_VALUE;
            }
            if (nRegion != original) {
                logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
            }
        }
        int mbPerRegion = (int)(totalSizeInM / (double)nRegion);
        mbPerRegion = Math.max(1, mbPerRegion);
        logger.info("Total size " + totalSizeInM + "M (estimated)");
        logger.info("Expecting " + nRegion + " regions.");
        logger.info("Expecting " + mbPerRegion + " MB per region.");
        if (cubeSegment.isEnableSharding()) {
            HashMap cuboidShards = Maps.newHashMap();
            ArrayList innerRegionSplits = Lists.newArrayList();
            for (int i = 0; i < nRegion; ++i) {
                innerRegionSplits.add(new HashMap());
            }
            double[] regionSizes = new double[nRegion];
            Iterator iterator = allCuboids.iterator();
            while (iterator.hasNext()) {
                short startShard;
                double magic;
                long cuboidId = (Long)iterator.next();
                double estimatedSize = cubeSizeMap.get(cuboidId);
                int shardNum = (int)(estimatedSize * (magic = 23.0) / (double)mbPerRegion + 1.0);
                if (shardNum < 1) {
                    shardNum = 1;
                }
                if (shardNum > nRegion) {
                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion));
                    shardNum = nRegion;
                } else {
                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
                }
                cuboidShards.put(cuboidId, (short)shardNum);
                for (short i = startShard = ShardingHash.getShard(cuboidId, nRegion); i < startShard + shardNum; i = (short)(i + 1)) {
                    short j = (short)(i % nRegion);
                    regionSizes[j] = regionSizes[j] + estimatedSize / (double)shardNum;
                    ((HashMap)innerRegionSplits.get(j)).put(cuboidId, estimatedSize / (double)shardNum);
                }
            }
            for (int i = 0; i < nRegion; ++i) {
                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
            }
            CuboidShardUtil.saveCuboidShards((CubeSegment)cubeSegment, (Map)cuboidShards, (int)nRegion);
            CreateHTableJob.saveHFileSplits(innerRegionSplits, mbPerRegion, hfileSplitsOutputFolder, kylinConfig);
            return CreateHTableJob.getSplitsByRegionCount(nRegion);
        }
        ArrayList regionSplit = Lists.newArrayList();
        long size = 0L;
        int regionIndex = 0;
        int cuboidCount = 0;
        for (int i = 0; i < allCuboids.size(); ++i) {
            long cuboidId = (Long)allCuboids.get(i);
            if (size >= (long)mbPerRegion || (double)size + cubeSizeMap.get(cuboidId) >= (double)mbPerRegion * 1.2) {
                regionSplit.add(cuboidId);
                logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
                size = 0L;
                cuboidCount = 0;
                ++regionIndex;
            }
            size = (long)((double)size + cubeSizeMap.get(cuboidId));
            ++cuboidCount;
        }
        byte[][] result = new byte[regionSplit.size()][];
        for (int i = 0; i < regionSplit.size(); ++i) {
            result[i] = Bytes.toBytes((Long)regionSplit.get(i));
        }
        return result;
    }

    protected static void saveHFileSplits(List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, Path outputFolder, KylinConfig kylinConfig) throws IOException {
        float hfileSizeGB;
        float hfileSizeMB;
        if (outputFolder == null) {
            logger.warn("outputFolder for hfile split file is null, skip inner region split");
            return;
        }
        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
        FileSystem fs = HadoopUtil.getFileSystem(outputFolder, hbaseConf);
        if (!fs.exists(outputFolder)) {
            fs.mkdirs(outputFolder);
        }
        if ((hfileSizeMB = (hfileSizeGB = kylinConfig.getHBaseHFileSizeGB()) * 1024.0f) > (float)mbPerRegion) {
            hfileSizeMB = mbPerRegion;
        }
        if ((double)hfileSizeMB > 0.0 && kylinConfig.isDevEnv()) {
            hfileSizeMB = mbPerRegion / 2;
        }
        int compactionThreshold = Integer.valueOf(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
        logger.info("hbase.hstore.compactionThreshold is " + compactionThreshold);
        if ((double)hfileSizeMB > 0.0 && hfileSizeMB * (float)compactionThreshold < (float)mbPerRegion) {
            hfileSizeMB = mbPerRegion / compactionThreshold;
        }
        if (hfileSizeMB <= 0.0f) {
            hfileSizeMB = mbPerRegion;
        }
        logger.info("hfileSizeMB:" + hfileSizeMB);
        Path hfilePartitionFile = new Path(outputFolder, "part-r-00000_hfile");
        int regionCount = innerRegionSplits.size();
        ArrayList splits = Lists.newArrayList();
        for (int i = 0; i < regionCount; ++i) {
            if (i > 0) {
                byte[] split = new byte[2];
                BytesUtil.writeUnsigned(i, split, 0, 2);
                splits.add(split);
            }
            HashMap<Long, Double> cuboidSize = innerRegionSplits.get(i);
            ArrayList allCuboids = Lists.newArrayList();
            allCuboids.addAll(cuboidSize.keySet());
            Collections.sort(allCuboids);
            double accumulatedSize = 0.0;
            int j = 0;
            for (Long cuboid : allCuboids) {
                if (accumulatedSize >= (double)hfileSizeMB) {
                    logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
                    byte[] split = new byte[10];
                    BytesUtil.writeUnsigned(i, split, 0, 2);
                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, 2, 8);
                    splits.add(split);
                    accumulatedSize = 0.0;
                    ++j;
                }
                accumulatedSize += cuboidSize.get(cuboid).doubleValue();
            }
        }
        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter((Configuration)hbaseConf, (SequenceFile.Writer.Option[])new SequenceFile.Writer.Option[]{SequenceFile.Writer.file((Path)hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class)});
        for (int i = 0; i < splits.size(); ++i) {
            hfilePartitionWriter.append((Writable)new RowKeyWritable(KeyValue.createFirstOnRow((byte[])((byte[])splits.get(i))).createKeyOnly(false).getKey()), (Writable)NullWritable.get());
        }
        hfilePartitionWriter.close();
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run((Tool)new CreateHTableJob(), (String[])args);
        System.exit(exitCode);
    }
}

