package org.apache.kylin.engine.spark;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.primitives.UnsignedBytes;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.Serializable;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.cube.util.CubingUtils;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
import org.apache.kylin.dimension.Dictionary;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
import org.apache.kylin.engine.spark.util.IteratorUtils;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.columnar.CachedBatch;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DateType$;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.reflections.Reflections;
import org.reflections.scanners.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.math.BigDecimal;

/* loaded from: input_file:org/apache/kylin/engine/spark/SparkCubing.class */
public class SparkCubing extends AbstractApplication {
    /** SLF4J logger bound to this class. */
    protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
    // Command-line option descriptors. They are blank finals here and are assigned
    // in this class's static initializer; the constructor registers all of them.
    private static final Option OPTION_INPUT_PATH;
    private static final Option OPTION_CUBE_NAME;
    private static final Option OPTION_SEGMENT_ID;
    private static final Option OPTION_CONF_PATH;
    private static final Option OPTION_COPROCESSOR;
    /** Option set returned by getOptions(); filled by the constructor. */
    private Options options = new Options();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kylin.engine.spark.SparkCubing$7, reason: invalid class name */
    /* loaded from: input_file:org/apache/kylin/engine/spark/SparkCubing$7.class */
    /**
     * Per-partition function used by writeToHFile2. It wraps the partition's record iterator in an
     * {@link Iterable} whose iterator is produced by {@code IteratorUtils.merge}, supplying a combiner
     * that folds several encoded measure payloads into a single encoded payload.
     */
    public class AnonymousClass7 implements FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>> {
        /** Measure data type names handed to every {@link MeasureCodec} created by this function. */
        final String[] measureDataTypes;
        /** Number of measures; sizes the decode and result scratch arrays. */
        final int measureCount;
        /** Aggregators that are reset and reused for every group of payloads. */
        final MeasureAggregators aggregators;

        AnonymousClass7(String[] strArr, int i, MeasureAggregators measureAggregators) {
            this.measureDataTypes = strArr;
            this.measureCount = i;
            this.aggregators = measureAggregators;
        }

        /** Returns a lazily merged view over the records of one partition. */
        public Iterable<Tuple2<byte[], byte[]>> call(Iterator<Tuple2<byte[], byte[]>> it) throws Exception {
            return new AnonymousClass1(it);
        }

        /**
         * Iterable over one partition. Holds the scratch state (buffer, codec, arrays) that the
         * combiner reuses across invocations.
         */
        public class AnonymousClass1 implements Iterable<Tuple2<byte[], byte[]>> {
            /** 1 MiB scratch buffer that receives each re-encoded aggregate. */
            final ByteBuffer buffer = ByteBuffer.allocate(1048576);
            final MeasureCodec codec = new MeasureCodec(measureDataTypes);
            final Object[] input = new Object[measureCount];
            final Object[] result = new Object[measureCount];
            /** Partition records; kept as a raw iterator exactly as it is passed on to the merge helper. */
            final Iterator records;

            AnonymousClass1(Iterator it) {
                this.records = it;
            }

            @Override // java.lang.Iterable
            public Iterator<Tuple2<byte[], byte[]>> iterator() {
                return IteratorUtils.merge(this.records, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
                    /**
                     * Combines the given payloads: a single payload is returned untouched, otherwise
                     * every payload is decoded and aggregated and the collected state is re-encoded.
                     */
                    public byte[] call(Iterable<byte[]> iterable) throws Exception {
                        LinkedList<byte[]> payloads = Lists.newLinkedList(iterable);
                        if (payloads.size() == 1) {
                            // Nothing to aggregate; hand back the only payload as-is.
                            return payloads.get(0);
                        }
                        aggregators.reset();
                        for (byte[] payload : payloads) {
                            codec.decode(ByteBuffer.wrap(payload), input);
                            aggregators.aggregate(input);
                        }
                        aggregators.collectStates(result);
                        buffer.clear();
                        codec.encode(result, buffer);
                        // Copy only the bytes written so far out of the shared scratch buffer.
                        int encodedLength = buffer.position();
                        byte[] merged = new byte[encodedLength];
                        System.arraycopy(buffer.array(), buffer.arrayOffset(), merged, 0, encodedLength);
                        return merged;
                    }
                });
            }
        }
    }

    public SparkCubing() {
        this.options.addOption(OPTION_INPUT_PATH);
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_CONF_PATH);
        this.options.addOption(OPTION_COPROCESSOR);
    }

    /**
     * Exposes the command-line options understood by this application.
     *
     * @return the option set populated by the constructor
     */
    protected Options getOptions() {
        return options;
    }

    /**
     * Adds the configuration directory to the local classpath and ships every {@code .xml} and
     * {@code .properties} file found directly inside it through the Spark context.
     *
     * @param javaSparkContext context that receives the configuration files via {@code addFile}
     * @param str path of the configuration directory
     * @throws IOException if {@code str} cannot be listed (not a directory, or an I/O error occurred)
     * @throws Exception if extending the classpath fails
     */
    private void setupClasspath(JavaSparkContext javaSparkContext, String str) throws Exception {
        ClassUtil.addClasspath(str);
        File confDir = new File(str);
        File[] confFiles = confDir.listFiles(new FileFilter() { // from class: org.apache.kylin.engine.spark.SparkCubing.1
            @Override // java.io.FileFilter
            public boolean accept(File candidate) {
                String path = candidate.getAbsolutePath();
                return path.endsWith(".xml") || path.endsWith(".properties");
            }
        });
        // listFiles() signals "not a directory" or an I/O error with null; fail with a clear
        // message instead of letting the loop below throw a bare NullPointerException.
        if (confFiles == null) {
            throw new IOException("Configuration path is not a readable directory: " + confDir.getAbsolutePath());
        }
        for (File confFile : confFiles) {
            javaSparkContext.addFile(confFile.getAbsolutePath());
        }
    }

    /**
     * Builds a dictionary for every base-cuboid column that the row key marks as dictionary-encoded,
     * using the distinct values of the matching data frame column, then writes the dictionaries to
     * the segment and persists the segment through the cube manager.
     *
     * @param dataFrame flat table data; its column order is indexed by the flat table's row key column indexes
     * @param str cube name
     * @param str2 segment id
     * @throws RuntimeException if persisting the cube update fails; the IOException is attached as cause
     * @throws Exception if dictionary building or writing fails
     */
    private void writeDictionary(DataFrame dataFrame, String str, String str2) throws Exception {
        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        CubeInstance cube = cubeManager.reloadCubeLocal(str);
        String[] frameColumns = dataFrame.columns();
        CubeDesc cubeDesc = cube.getDescriptor();
        // flat-table column index -> dictionary-encoded row key column
        Map<Integer, TblColRef> dictColumnsByIndex = Maps.newHashMap();
        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, (CubeSegment) null);
        List<TblColRef> baseColumns = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
        long startTime = System.currentTimeMillis();
        RowKeyDesc rowkey = cubeDesc.getRowkey();
        for (int i = 0; i < baseColumns.size(); i++) {
            TblColRef column = baseColumns.get(i);
            if (rowkey.isUseDictionary(column)) {
                dictColumnsByIndex.put(Integer.valueOf(flatTableDesc.getRowKeyColumnIndexes()[i]), column);
            }
        }
        // Kept raw because the generic signatures of the dictionary APIs are not visible from this file.
        HashMap dictionaries = Maps.newHashMap();
        for (Map.Entry<Integer, TblColRef> entry : dictColumnsByIndex.entrySet()) {
            String frameColumn = frameColumns[entry.getKey().intValue()];
            TblColRef column = entry.getValue();
            // Distinct values are collected to the driver before the dictionary is built.
            final Row[] collect = dataFrame.select(frameColumn, new String[0]).distinct().collect();
            dictionaries.put(column, DictionaryGenerator.buildDictionaryFromValueEnumerator(column.getType(), new IterableDictionaryValueEnumerator(new Iterable<byte[]>() { // from class: org.apache.kylin.engine.spark.SparkCubing.2
                @Override // java.lang.Iterable
                public Iterator<byte[]> iterator() {
                    return new Iterator<byte[]>() { // from class: org.apache.kylin.engine.spark.SparkCubing.2.1
                        int i = 0;

                        @Override // java.util.Iterator
                        public boolean hasNext() {
                            return this.i < collect.length;
                        }

                        @Override // java.util.Iterator
                        public byte[] next() {
                            if (!hasNext()) {
                                throw new NoSuchElementException();
                            }
                            Object value = collect[this.i++].get(0);
                            // NOTE(review): getBytes() uses the platform default charset — confirm it matches
                            // the charset the dictionary readers expect.
                            return value != null ? value.toString().getBytes() : null;
                        }

                        @Override // java.util.Iterator
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
            })));
        }
        CubingUtils.writeDictionary(cube.getSegmentById(str2), dictionaries, startTime, System.currentTimeMillis());
        try {
            CubeUpdate cubeUpdate = new CubeUpdate(cube);
            cubeUpdate.setToUpdateSegs(new CubeSegment[]{cube.getSegmentById(str2)});
            cubeManager.updateCube(cubeUpdate);
        } catch (IOException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
        }
    }

    /**
     * Aggregates one {@link HyperLogLogPlusCounter} per cuboid over the whole RDD. For every row the
     * row key column values are hashed individually, then for each cuboid the hashes of its member
     * columns are combined into one hash that is added to that cuboid's counter.
     *
     * @param javaRDD rows of the flat table as string lists
     * @param str cube name
     * @return the merged counters keyed by cuboid id
     * @throws Exception if the aggregation fails
     */
    private Map<Long, HyperLogLogPlusCounter> sampling(JavaRDD<List<String>> javaRDD, String str) throws Exception {
        CubeDesc cubeDesc = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(str).getDescriptor();
        List<Long> cuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds();

        // Zero value of the aggregation: one empty counter per cuboid.
        HashMap<Long, HyperLogLogPlusCounter> emptyCounters = Maps.newHashMap();
        for (Long cuboidId : cuboidIds) {
            emptyCounters.put(cuboidId, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
        }

        final int[] rowKeyColumnIndexes = new CubeJoinedFlatTableDesc(cubeDesc, (CubeSegment) null).getRowKeyColumnIndexes();
        final int rowKeyColumnCount = cubeDesc.getRowkey().getRowKeyColumns().length;
        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
        final HashMap<Long, Integer[]> columnPositionsByCuboid = Maps.newHashMapWithExpectedSize(cuboidIds.size());
        final ByteArray[] columnHashes = new ByteArray[rowKeyColumnCount];

        // For each cuboid, record which row key positions have their bit set in the cuboid id,
        // walking from the base cuboid's highest bit downwards.
        for (Long cuboidId : cuboidIds) {
            Integer[] positions = new Integer[Long.bitCount(cuboidId.longValue())];
            long mask = Long.highestOneBit(baseCuboidId);
            int filled = 0;
            for (int position = 0; position < rowKeyColumnCount; position++, mask >>= 1) {
                if ((mask & cuboidId.longValue()) > 0) {
                    positions[filled++] = Integer.valueOf(position);
                }
            }
            columnPositionsByCuboid.put(cuboidId, positions);
        }
        for (int position = 0; position < rowKeyColumnCount; position++) {
            columnHashes[position] = new ByteArray();
        }

        Function2<HashMap<Long, HyperLogLogPlusCounter>, List<String>, HashMap<Long, HyperLogLogPlusCounter>> addRow = new Function2<HashMap<Long, HyperLogLogPlusCounter>, List<String>, HashMap<Long, HyperLogLogPlusCounter>>() { // from class: org.apache.kylin.engine.spark.SparkCubing.3
            final HashFunction hashFunction = Hashing.murmur3_128();

            public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> counters, List<String> row) throws Exception {
                // Step 1: hash every row key column value; null is represented by hashing the int 0.
                for (int position = 0; position < rowKeyColumnCount; position++) {
                    Hasher columnHasher = this.hashFunction.newHasher();
                    String value = row.get(rowKeyColumnIndexes[position]);
                    Hasher fed = (value == null) ? columnHasher.putInt(0) : columnHasher.putString(value);
                    columnHashes[position].set(fed.hash().asBytes());
                }
                // Step 2: combine the member column hashes of each cuboid and feed its counter.
                for (Map.Entry<Long, Integer[]> cuboid : columnPositionsByCuboid.entrySet()) {
                    Hasher rowHasher = this.hashFunction.newHasher();
                    HyperLogLogPlusCounter counter = counters.get(cuboid.getKey());
                    for (Integer position : cuboid.getValue()) {
                        rowHasher.putBytes(columnHashes[position.intValue()].array());
                    }
                    counter.add(rowHasher.hash().asBytes());
                }
                return counters;
            }
        };

        Function2<HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>> mergeCounters = new Function2<HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>>() { // from class: org.apache.kylin.engine.spark.SparkCubing.4
            public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> left, HashMap<Long, HyperLogLogPlusCounter> right) throws Exception {
                Preconditions.checkArgument(left.size() == right.size());
                Preconditions.checkArgument(left.size() > 0);
                // Merge the right-hand counters into the left-hand map, which is then returned.
                for (Map.Entry<Long, HyperLogLogPlusCounter> entry : left.entrySet()) {
                    HyperLogLogPlusCounter other = (HyperLogLogPlusCounter) Preconditions.checkNotNull(right.get(entry.getKey()), "counter cannot be null");
                    entry.getValue().merge(other);
                }
                return left;
            }
        };

        return (HashMap) javaRDD.aggregate(emptyCounters, addRow, mergeCounters);
    }

    /**
     * Runs the in-memory cube builder over every partition of the input and writes the resulting
     * cuboid records as HFiles under a freshly named directory in the HDFS working directory.
     *
     * @param javaRDD rows of the flat table as string lists
     * @param str cube name
     * @param str2 segment id
     * @param bArr region split keys, passed on to writeToHFile2 for partitioning
     * @return the HFile output location ({@code fs.defaultFS} followed by the generated path)
     * @throws Exception if any step of the build fails
     */
    private String build(JavaRDD<List<String>> javaRDD, final String str, final String str2, byte[][] bArr) throws Exception {
        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        CubeDesc descriptor = cube.getDescriptor();
        CubeSegment segmentById = cube.getSegmentById(str2);
        List<TblColRef> columns = Cuboid.findById(descriptor, Cuboid.getBaseCuboidId(descriptor)).getColumns();

        // Base cuboid column -> encoded length. Kept raw (as are the other maps below) because the
        // generic signatures of the consuming constructors are not visible from this file.
        final HashMap newHashMap = Maps.newHashMap();
        CubeDimEncMap dimensionEncodingMap = segmentById.getDimensionEncodingMap();
        for (TblColRef tblColRef : columns) {
            newHashMap.put(tblColRef, Integer.valueOf(dimensionEncodingMap.get(tblColRef).getLengthOfEncoding()));
        }

        // Column -> dictionary, for dictionary-encoded dimensions and for measures that need one.
        final HashMap newHashMap2 = Maps.newHashMap();
        Iterator it = descriptor.getDimensions().iterator();
        while (it.hasNext()) {
            for (TblColRef tblColRef2 : ((DimensionDesc) it.next()).getColumnRefs()) {
                if (descriptor.getRowkey().isUseDictionary(tblColRef2)) {
                    Dictionary dictionary = segmentById.getDictionary(tblColRef2);
                    newHashMap2.put(tblColRef2, dictionary);
                    if (dictionary == null) {
                        // The original reported the missing dictionary and then dereferenced it,
                        // which always ended in a NullPointerException; report and move on instead.
                        System.err.println("Dictionary for " + tblColRef2 + " was not found.");
                    } else {
                        System.out.println("col:" + tblColRef2 + " dictionary size:" + dictionary.getSize());
                    }
                }
            }
        }
        Iterator it2 = descriptor.getMeasures().iterator();
        while (it2.hasNext()) {
            FunctionDesc function = ((MeasureDesc) it2.next()).getFunction();
            for (TblColRef tblColRef3 : function.getMeasureType().getColumnsNeedDictionary(function)) {
                newHashMap2.put(tblColRef3, segmentById.getDictionary(tblColRef3));
            }
        }

        JavaPairRDD<byte[], byte[]> mapPartitionsToPair = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() { // from class: org.apache.kylin.engine.spark.SparkCubing.5
            public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> it3) throws Exception {
                long currentTimeMillis = System.currentTimeMillis();
                SparkCubing.prepare();
                CubeInstance cube2 = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
                LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
                System.out.println("load properties finished");
                DoggedCubeBuilder doggedCubeBuilder = new DoggedCubeBuilder(cube2.getDescriptor(), newHashMap2);
                BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cube2.getSegmentById(str2), newHashMap));
                // The builder consumes rows from the queue on a background thread.
                java.util.concurrent.ExecutorService builderExecutor = Executors.newCachedThreadPool();
                builderExecutor.submit(doggedCubeBuilder.buildAsRunnable(linkedBlockingQueue, bufferedCuboidWriter));
                // No further tasks are submitted: an orderly shutdown lets the builder task finish
                // and then releases the pool instead of leaving one pool behind per partition.
                builderExecutor.shutdown();
                while (it3.hasNext()) {
                    try {
                        Iterator<List<String>> it4 = it3.next().iterator();
                        while (it4.hasNext()) {
                            linkedBlockingQueue.put(it4.next());
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                // An empty row is queued after the last real row.
                // NOTE(review): presumably the end-of-input marker for the builder — confirm.
                linkedBlockingQueue.put(Collections.emptyList());
                System.out.println("build partition cost: " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
                return bufferedCuboidWriter.getResult();
            }
        });

        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        Configuration configurationForHFile = getConfigurationForHFile(segmentById.getStorageLocationIdentifier());
        Path path = new Path(instanceFromEnv.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
        // The randomly named output directory must not exist yet.
        Preconditions.checkArgument(!FileSystem.get(configurationForHFile).exists(path));
        String str3 = configurationForHFile.get("fs.defaultFS") + path.toString();
        System.out.println("use " + str3 + " as hfile");

        List measures = descriptor.getMeasures();
        int size = measures.size();
        String[] strArr = new String[size];
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = ((MeasureDesc) measures.get(i)).getFunction().getReturnType();
        }
        writeToHFile2(mapPartitionsToPair, strArr, size, new MeasureAggregators(measures), bArr, configurationForHFile, str3);
        return str3;
    }

    /**
     * Repartitions the records by the given split keys, sorts them within each partition, merges
     * records through {@link AnonymousClass7} and saves the result with {@link HFileOutputFormat}.
     *
     * @param javaPairRDD encoded row key / encoded measure pairs
     * @param strArr measure data type names
     * @param i number of measures
     * @param measureAggregators aggregators used when merging records
     * @param bArr split keys; a key lands in the first partition whose split key is greater than it
     * @param configuration Hadoop configuration used for the save
     * @param str output location
     */
    private void writeToHFile2(JavaPairRDD<byte[], byte[]> javaPairRDD, String[] strArr, int i, MeasureAggregators measureAggregators, final byte[][] bArr, Configuration configuration, String str) {
        Partitioner splitKeyPartitioner = new Partitioner() { // from class: org.apache.kylin.engine.spark.SparkCubing.8
            public int numPartitions() {
                // One partition per split key plus a trailing one for keys beyond the last split.
                return bArr.length + 1;
            }

            public int getPartition(Object obj) {
                Preconditions.checkArgument(obj instanceof byte[]);
                byte[] rowKey = (byte[]) obj;
                int partition = 0;
                // Advance past every split key that is not greater than the row key.
                while (partition < bArr.length && UnsignedBytes.lexicographicalComparator().compare(rowKey, bArr[partition]) >= 0) {
                    partition++;
                }
                return partition;
            }
        };

        JavaPairRDD<byte[], byte[]> sorted = javaPairRDD.repartitionAndSortWithinPartitions(splitKeyPartitioner, UnsignedBytes.lexicographicalComparator());
        JavaRDD<Tuple2<byte[], byte[]>> merged = sorted.mapPartitions(new AnonymousClass7(strArr, i, measureAggregators), true);
        JavaPairRDD<ImmutableBytesWritable, KeyValue> cells = merged.mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() { // from class: org.apache.kylin.engine.spark.SparkCubing.6
            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
                byte[] rowKey = (byte[]) tuple2._1();
                byte[] measures = (byte[]) tuple2._2();
                // Every record becomes one cell in family "F1", qualifier "M".
                KeyValue cell = new KeyValue(rowKey, "F1".getBytes(), "M".getBytes(), measures);
                return new Tuple2<>(new ImmutableBytesWritable(rowKey), cell);
            }
        });
        cells.saveAsNewAPIHadoopFile(str, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, configuration);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Points the {@code KYLIN_CONF} system property at the directory that holds the
     * {@code kylin.properties} file resolved through {@link SparkFiles}, and adds that directory
     * to the classpath.
     *
     * @throws Exception if extending the classpath fails
     */
    public static void prepare() throws Exception {
        File kylinProperties = new File(SparkFiles.get("kylin.properties"));
        String confDir = kylinProperties.getParentFile().getAbsolutePath();
        System.out.println("conf directory:" + confDir);
        System.setProperty("KYLIN_CONF", confDir);
        ClassUtil.addClasspath(confDir);
    }

    /**
     * Estimates cuboid sizes from the sampled counters, derives region split keys from that
     * estimate and creates the segment's HBase table with those splits.
     *
     * @param str cube name
     * @param str2 segment id
     * @param map sampled counters keyed by cuboid id
     * @return the region split keys the table was created with
     * @throws Exception if statistics evaluation or table creation fails
     */
    private byte[][] createHTable(String str, String str2, Map<Long, HyperLogLogPlusCounter> map) throws Exception {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(str);
        CubeSegment segment = cube.getSegmentById(str2);

        Map cuboidSizes = CubeStatsReader.getCuboidSizeMapFromRowCount(segment, CubeStatsReader.getCuboidRowCountMapFromSampling(map, 100));
        System.out.println("cube size estimation:" + cuboidSizes);

        byte[][] regionSplits = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cuboidSizes, kylinConfig, segment, (Path) null);
        CubeHTableUtil.createHTable(segment, regionSplits);
        System.out.println(segment.getStorageLocationIdentifier() + " table created");
        return regionSplits;
    }

    /**
     * Configures an incremental-load job for the given HBase table and returns the HBase
     * configuration the job was created from.
     *
     * @param str HBase table name
     * @return the current HBase configuration
     * @throws IOException if the table cannot be opened, configured against, or closed
     */
    private Configuration getConfigurationForHFile(String str) throws IOException {
        Configuration currentHBaseConfiguration = HBaseConnection.getCurrentHBaseConfiguration();
        Job job = Job.getInstance(currentHBaseConfiguration);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        HTable table = new HTable(currentHBaseConfiguration, str);
        try {
            HFileOutputFormat.configureIncrementalLoad(job, table);
        } finally {
            // The table is only needed while the job is being configured; close it so the
            // handle is not leaked.
            table.close();
        }
        // NOTE(review): Job.getInstance(Configuration) is documented to copy the configuration, so the
        // settings applied by configureIncrementalLoad may live in job.getConfiguration() rather than
        // in the object returned here — confirm which one the caller needs.
        return currentHBaseConfiguration;
    }

    /**
     * Opens up the permissions of the HFile directory, bulk-loads it into the segment's HBase table
     * and, once the load has succeeded, marks both the segment and the cube as READY and persists
     * that change.
     *
     * @param str cube name
     * @param str2 segment id
     * @param str3 HFile directory to load
     * @throws IOException if any step fails, including a non-zero exit code from the bulk load tool;
     *         the underlying exception is attached as cause
     */
    private void bulkLoadHFile(String str, String str2, String str3) throws Exception {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance(instanceFromEnv).getCube(str);
        CubeSegment segmentById = cube.getSegmentById(str2);
        Configuration currentHBaseConfiguration = HBaseConnection.getCurrentHBaseConfiguration();
        try {
            new FsShell(currentHBaseConfiguration).run(new String[]{"-chmod", "-R", "777", str3});
            String tableName = segmentById.getStorageLocationIdentifier();
            int loadResult = ToolRunner.run(new LoadIncrementalHFiles(currentHBaseConfiguration), new String[]{str3, tableName});
            System.out.println("incremental load result:" + loadResult);
            // Never publish the segment as READY when the load tool reported a failure.
            if (loadResult != 0) {
                throw new IOException("Incremental load of " + str3 + " into " + tableName + " exited with code " + loadResult);
            }
            segmentById.setStatus(SegmentStatusEnum.READY);
            CubeUpdate cubeUpdate = new CubeUpdate(cube);
            cube.setStatus(RealizationStatusEnum.READY);
            cubeUpdate.setToUpdateSegs(new CubeSegment[]{segmentById});
            CubeManager.getInstance(instanceFromEnv).updateCube(cubeUpdate);
        } catch (Exception e2) {
            // This handler covers the chmod, the bulk load and the metadata update, so the message
            // names the whole operation rather than just the permission change.
            logger.error("Failed to bulk load HFile " + str3 + " for segment " + str2 + " of cube " + str, e2);
            throw new IOException(e2);
        }
    }

    /**
     * Collects the names of the classes to register with Kryo: classes discovered by reflection in
     * several Kylin packages, a fixed list of JDK/Spark/Guava classes, and three classes that are
     * referenced by name only.
     *
     * @return the class names, discovered and fixed classes first, name-only entries last
     */
    private Collection<String> getKyroClasses() {
        HashSet<Class<?>> classes = Sets.newHashSet();

        // Reflection-based discovery.
        classes.addAll(new Reflections("org.apache.kylin", new Scanner[0]).getSubTypesOf(Serializable.class));
        String[] scannedPackages = {
            "org.apache.kylin.cube.model",
            "org.apache.kylin.metadata.model",
            "org.apache.kylin.metadata.measure",
        };
        for (String scannedPackage : scannedPackages) {
            classes.addAll(new Reflections(scannedPackage, new Scanner[0]).getSubTypesOf(Object.class));
        }

        // Explicitly listed classes, added in the same order as before.
        Class<?>[] explicitClasses = {
            HashMap.class,
            Row[].class,
            Row.class,
            GenericRowWithSchema.class,
            StructType.class,
            StructField[].class,
            StructField.class,
            DateType$.class,
            Metadata.class,
            Object[].class,
            StringType$.class,
            Hashing.murmur3_128().getClass(),
            CachedBatch.class,
            byte[][].class,
            Decimal.class,
            BigDecimal.class,
            java.math.BigDecimal.class,
            MathContext.class,
            RoundingMode.class,
            ArrayList.class,
            LinkedList.class,
        };
        for (Class<?> explicitClass : explicitClasses) {
            classes.add(explicitClass);
        }

        ArrayList<String> classNames = Lists.newArrayList();
        for (Class<?> registered : classes) {
            classNames.add(registered.getName());
        }
        // These are added by name rather than through a class literal.
        classNames.add("scala.collection.immutable.Map$EmptyMap$");
        classNames.add("org.apache.spark.sql.catalyst.expressions.GenericInternalRow");
        classNames.add("org.apache.spark.unsafe.types.UTF8String");
        return classNames;
    }

    /**
     * Entry point of the job: configures Spark with Kryo serialization, loads the Hive table,
     * builds dictionaries, samples cuboid statistics, creates the HBase table, builds the cube
     * into HFiles and bulk-loads them.
     *
     * @param optionsHelper parsed command-line options
     * @throws Exception if any stage of the job fails
     */
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);

        SparkConf sparkConf = new SparkConf().setAppName("Simple Application");
        sparkConf.set("spark.executor.memory", "6g");
        sparkConf.set("spark.storage.memoryFraction", "0.3");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrationRequired", "true");

        // Merge already configured Kryo registrations with ours, dropping null and blank entries.
        List<String> configuredClasses = Lists.newArrayList(sparkConf.get("spark.kryo.classesToRegister", "").split(","));
        Iterable<String> kryoClasses = Iterables.filter(Iterables.concat(configuredClasses, getKyroClasses()), new Predicate<String>() { // from class: org.apache.kylin.engine.spark.SparkCubing.9
            public boolean apply(@Nullable String str) {
                if (str == null) {
                    return false;
                }
                return str.trim().length() > 0;
            }
        });
        System.out.println("kyro classes:" + kryoClasses.toString());
        sparkConf.set("spark.kryo.classesToRegister", StringUtils.join(kryoClasses, ","));

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        HiveContext hiveContext = new HiveContext(sparkContext.sc());
        DataFrame flatTable = hiveContext.sql("select * from " + hiveTable);

        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
        String coprocessorJar = optionsHelper.getOptionValue(OPTION_COPROCESSOR);
        KylinConfig.getInstanceFromEnv().overrideCoprocessorLocalJar(coprocessorJar);
        setupClasspath(sparkContext, confPath);

        flatTable.cache();
        writeDictionary(flatTable, cubeName, segmentId);

        // Every row is turned into a list of strings; null cells stay null.
        JavaRDD<List<String>> rows = flatTable.javaRDD().map(new Function<Row, List<String>>() { // from class: org.apache.kylin.engine.spark.SparkCubing.10
            public List<String> call(Row row) throws Exception {
                ArrayList<String> cells = Lists.newArrayListWithExpectedSize(row.size());
                for (int column = 0; column < row.size(); column++) {
                    Object cell = row.get(column);
                    cells.add(cell == null ? null : cell.toString());
                }
                return cells;
            }
        });

        Map<Long, HyperLogLogPlusCounter> samples = sampling(rows, cubeName);
        byte[][] regionSplits = createHTable(cubeName, segmentId, samples);
        String hfileLocation = build(rows, cubeName, segmentId, regionSplits);
        bulkLoadHFile(cubeName, segmentId, hfileLocation);
    }

    static {
        OptionBuilder.withArgName("path");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Hive Intermediate Table");
        OPTION_INPUT_PATH = OptionBuilder.create("hiveTable");
        OptionBuilder.withArgName("cubename");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube Name");
        OPTION_CUBE_NAME = OptionBuilder.create("cubename");
        OptionBuilder.withArgName("segment");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube Segment Id");
        OPTION_SEGMENT_ID = OptionBuilder.create("segmentId");
        OptionBuilder.withArgName("confPath");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Configuration Path");
        OPTION_CONF_PATH = OptionBuilder.create("confPath");
        OptionBuilder.withArgName("coprocessor");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Coprocessor Jar Path");
        OPTION_COPROCESSOR = OptionBuilder.create("coprocessor");
    }
}
