package org.apache.kylin.engine.spark;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayPrimitiveWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.cube.util.KeyValueBuilder;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IDictionaryBuilder;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.measure.hllc.RegisterType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;
import org.supercsv.cellprocessor.constraint.DMinMax;
import scala.Tuple2;
import scala.Tuple3;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.6.2.jar:org/apache/kylin/engine/spark/SparkFactDistinct.class */
public class SparkFactDistinct extends AbstractApplication implements Serializable {
    /** Logger shared by this application and by its nested Spark functions (they log via {@code SparkFactDistinct.logger}). */
    protected static final Logger logger = LoggerFactory.getLogger((Class<?>) SparkFactDistinct.class);
    // Command-line options of this application. They are declared as blank finals, so they must be
    // assigned in a static initializer that lies outside this view.
    public static final Option OPTION_CUBE_NAME;
    public static final Option OPTION_META_URL;
    public static final Option OPTION_OUTPUT_PATH;
    public static final Option OPTION_SEGMENT_ID;
    public static final Option OPTION_STATS_SAMPLING_PERCENT;
    public static final Option OPTION_INPUT_TABLE;
    public static final Option OPTION_INPUT_PATH;
    public static final Option OPTION_COUNTER_PATH;
    // Option set for this application; presumably populated with the OPTION_* entries elsewhere in
    // the class (not visible here) — TODO confirm.
    private Options options = new Options();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.6.2.jar:org/apache/kylin/engine/spark/SparkFactDistinct$CuboidStatCalculator.class */
    public static class CuboidStatCalculator {
        /*
         * Accumulates per-cuboid HyperLogLog cardinality estimates from flat-table rows.
         *
         * For every row passed to putRow(), each rowkey column value is hashed, the per-column
         * hashes are combined according to cuboidsBitSet into one hash per cuboid, and that hash is
         * added to the cuboid's HLLCounter (cuboidsHLL[c] belongs to cuboidsBitSet[c]).
         *
         * Two schemes exist, chosen by the caller through the constructor flag:
         *  - old: murmur3_32 per column; each cuboid hash is a murmur3_32 over its columns' hash bytes;
         *  - new (KYLIN-2518): murmur3_128 per column truncated to a long; each cuboid hash is the
         *    sum of its columns' longs, fed to the counter via addHashDirectly().
         *
         * Not thread-safe: the new scheme reuses a scratch array and the counters are mutated.
         */

        private final int nRowKey;
        private final int[] rowkeyColIndex;
        private final Long[] cuboidIds;
        private final Integer[][] cuboidsBitSet;
        private final HLLCounter[] cuboidsHLL;
        private final boolean isNewAlgorithm;
        private final HashFunction hf;
        // Per-row scratch buffer of column hashes; only allocated for the new algorithm.
        private final long[] rowHashCodesLong;

        /**
         * @param rowkeyColIndex  for each rowkey position, the index of that column in a flat-table row
         * @param cuboidIds       cuboid ids; presumably parallel to {@code cuboidsBitSet} and
         *                        {@code cuboidsHLL} (callers index them together) — confirm at call sites
         * @param cuboidsBitSet   for each cuboid, the rowkey positions (indexes into {@code rowkeyColIndex})
         *                        that make up the cuboid
         * @param isNewAlgorithm  {@code true} for the KYLIN-2518 hashing scheme, {@code false} for the old one
         * @param cuboidsHLL      one counter per entry of {@code cuboidsBitSet}; updated in place
         */
        public CuboidStatCalculator(int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
                boolean isNewAlgorithm, HLLCounter[] cuboidsHLL) {
            this.nRowKey = rowkeyColIndex.length;
            this.rowkeyColIndex = rowkeyColIndex;
            this.cuboidIds = cuboidIds;
            this.cuboidsBitSet = cuboidsBitSet;
            this.isNewAlgorithm = isNewAlgorithm;
            if (isNewAlgorithm) {
                this.rowHashCodesLong = new long[nRowKey];
                this.hf = Hashing.murmur3_128();
            } else {
                this.rowHashCodesLong = null;
                this.hf = Hashing.murmur3_32();
            }
            this.cuboidsHLL = cuboidsHLL;
        }

        /** Adds one flat-table row to every cuboid's HLL counter. */
        public void putRow(String[] row) {
            // Both hashing paths only read from the row, so no defensive copy is required.
            if (isNewAlgorithm) {
                putRowKeyToHLLNew(row);
            } else {
                putRowKeyToHLLOld(row);
            }
        }

        private void putRowKeyToHLLOld(String[] row) {
            // One murmur3_32 hash per rowkey column; a null value is hashed as the int 0.
            byte[][] colHashes = new byte[nRowKey][];
            for (int i = 0; i < nRowKey; i++) {
                Hasher hasher = hf.newHasher();
                String colValue = row[rowkeyColIndex[i]];
                if (colValue != null) {
                    colHashes[i] = hasher.putString(colValue).hash().asBytes();
                } else {
                    colHashes[i] = hasher.putInt(0).hash().asBytes();
                }
            }
            // A cuboid's hash is computed over the concatenated hashes of its columns.
            for (int c = 0; c < cuboidsBitSet.length; c++) {
                Hasher hasher = hf.newHasher();
                for (Integer rowkeyPos : cuboidsBitSet[c]) {
                    hasher.putBytes(colHashes[rowkeyPos]);
                }
                cuboidsHLL[c].add(hasher.hash().asBytes());
            }
        }

        private void putRowKeyToHLLNew(String[] row) {
            for (int i = 0; i < nRowKey; i++) {
                String colValue = row[rowkeyColIndex[i]];
                if (colValue == null) {
                    colValue = "0";
                }
                // The column position is added, presumably so equal values in different columns
                // contribute different hashes.
                rowHashCodesLong[i] = Bytes.toLong(hf.newHasher().putString(colValue).hash().asBytes()) + i;
            }
            // A cuboid's hash is the (wrapping) sum of its columns' hashes.
            for (int c = 0; c < cuboidsBitSet.length; c++) {
                long cuboidHash = 0L;
                for (Integer rowkeyPos : cuboidsBitSet[c]) {
                    cuboidHash += rowHashCodesLong[rowkeyPos];
                }
                cuboidsHLL[c].addHashDirectly(cuboidHash);
            }
        }

        /** Returns the live counters (not a copy), one per cuboid. */
        public HLLCounter[] getHLLCounters() {
            return cuboidsHLL;
        }

        /** Returns the cuboid ids supplied at construction (not a copy). */
        public Long[] getCuboidIds() {
            return cuboidIds;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.6.2.jar:org/apache/kylin/engine/spark/SparkFactDistinct$FactDistinctPartitioner.class */
    static class FactDistinctPartitioner extends Partitioner {
        private volatile transient boolean initialized = false;
        private String cubeName;
        private String metaUrl;
        private SerializableConfiguration conf;
        private int totalReducerNum;
        private transient FactDistinctColumnsReducerMapping reducerMapping;

        public FactDistinctPartitioner(String str, String str2, SerializableConfiguration serializableConfiguration, int i) {
            this.cubeName = str;
            this.metaUrl = str2;
            this.conf = serializableConfiguration;
            this.totalReducerNum = i;
        }

        private void init() {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    this.reducerMapping = new FactDistinctColumnsReducerMapping(CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName));
                    this.initialized = true;
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public int numPartitions() {
            return this.totalReducerNum;
        }

        public int getPartition(Object obj) {
            if (!this.initialized) {
                synchronized (SparkFactDistinct.class) {
                    if (!this.initialized) {
                        init();
                    }
                }
            }
            Text text = ((SelfDefineSortableKey) obj).getText();
            if (text.getBytes()[0] != -1) {
                return BytesUtil.readUnsigned(text.getBytes(), 0, 1);
            }
            return this.reducerMapping.getReducerIdForCuboidRowCount(Long.valueOf(Bytes.toLong(text.getBytes(), 1, 8)).longValue());
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.6.2.jar:org/apache/kylin/engine/spark/SparkFactDistinct$FlatOutputFucntion.class */
    static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
        /*
         * Processes one partition of flat-table rows.
         *
         * Per row it:
         *  (1) emits each value of a dictionary column, deduplicated best-effort by DictColDeduper;
         *  (2) tracks min/max of the other columns;
         *  (3) feeds samplingPercent out of every 100 rows to a CuboidStatCalculator.
         * Null values and values KeyValueBuilder.isNull() recognizes are skipped.
         *
         * After the partition is consumed, it emits one HLL record per cuboid, then the min and the
         * max of every non-dictionary column.
         *
         * Key layout: one routing byte followed by a payload.
         *  - Column values: the target reducer id (one byte), then Bytes.toBytes(value); the output
         *    value is an empty Text.
         *  - HLL records: -1, then the 8-byte cuboid id; the output value holds the serialized
         *    registers.
         */

        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private SerializableConfiguration conf;
        private int samplingPercent;
        private transient CuboidStatCalculator cuboidStatCalculator;
        private transient FactDistinctColumnsReducerMapping reducerMapping;
        // Columns whose values are emitted or range-tracked, and their positions in a flat-table row.
        private List<TblColRef> allCols;
        private int[] columnIndex;
        private transient FactDistinctColumnsMapper.DictColDeduper dictColDeduper;
        // Reusable scratch buffer for building keys; grown on demand by addFieldValue().
        private transient ByteBuffer tmpbuf;
        // Accumulates the approximate input size in bytes (see countSizeInBytes()).
        private LongAccumulator bytesWritten;
        private KeyValueBuilder keyValueBuilder;
        private volatile transient boolean initialized = false;
        // Min/max per non-dictionary column, keyed by the index into allCols.
        private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();

        public FlatOutputFucntion(String cubeName, String segmentId, String metaUrl,
                SerializableConfiguration conf, int samplingPercent, LongAccumulator bytesWritten) {
            this.cubeName = cubeName;
            this.segmentId = segmentId;
            this.metaUrl = metaUrl;
            this.conf = conf;
            this.samplingPercent = samplingPercent;
            this.bytesWritten = bytesWritten;
        }

        /** Loads cube/segment metadata and builds all per-task helpers under a thread-local Kylin config. */
        private void init() {
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
                CubeInstance cube = CubeManager.getInstance(kConfig).getCube(cubeName);
                CubeDesc cubeDesc = cube.getDescriptor();
                CubeSegment segment = cube.getSegmentById(segmentId);
                CubeJoinedFlatTableEnrich flatTableDesc = new CubeJoinedFlatTableEnrich(
                        EngineFactory.getJoinedFlatTableDesc(segment), cubeDesc);

                keyValueBuilder = new KeyValueBuilder(flatTableDesc);
                reducerMapping = new FactDistinctColumnsReducerMapping(cube);
                tmpbuf = ByteBuffer.allocate(4096);

                int[] rowkeyColIndexes = flatTableDesc.getRowKeyColumnIndexes();
                Long[] cuboidIds = getCuboidIds(segment);
                Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rowkeyColIndexes.length);
                boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
                HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length,
                        cubeDesc.getConfig().getCubeStatsHLLPrecision());
                cuboidStatCalculator = new CuboidStatCalculator(rowkeyColIndexes, cuboidIds, cuboidsBitSet,
                        isNewAlgorithm, cuboidsHLL);

                allCols = reducerMapping.getAllDimDictCols();
                initDictColDeduper(cubeDesc);
                initColumnIndex(flatTableDesc);

                // Published last so other threads only see fully built state.
                initialized = true;
            }
        }

        /** Runs init() once per deserialized instance; the class-wide lock serializes metadata loading. */
        private void ensureInitialized() {
            if (!initialized) {
                synchronized (SparkFactDistinct.class) {
                    if (!initialized) {
                        init();
                    }
                }
            }
        }

        @Override
        public Iterator<Tuple2<SelfDefineSortableKey, Text>> call(Iterator<String[]> rows) throws Exception {
            ensureInitialized();
            List<Tuple2<SelfDefineSortableKey, Text>> output = Lists.newArrayList();
            int rowIndex = 0;
            while (rows.hasNext()) {
                String[] row = rows.next();
                bytesWritten.add(countSizeInBytes(row));
                for (int colIdx = 0; colIdx < allCols.size(); colIdx++) {
                    String value = row[columnIndex[colIdx]];
                    if (value == null || keyValueBuilder.isNull(value)) {
                        continue;
                    }
                    DataType type = allCols.get(colIdx).getType();
                    if (dictColDeduper.isDictCol(colIdx)) {
                        if (dictColDeduper.add(colIdx, value)) {
                            addFieldValue(type, colIdx, value, output);
                        }
                    } else {
                        DimensionRangeInfo range = dimensionRangeInfoMap.get(colIdx);
                        if (range == null) {
                            dimensionRangeInfoMap.put(colIdx, new DimensionRangeInfo(value, value));
                        } else {
                            range.setMax(type.getOrder().max(range.getMax(), value));
                            range.setMin(type.getOrder().min(range.getMin(), value));
                        }
                    }
                }
                // Sample samplingPercent rows out of every 100 for cuboid statistics.
                if (rowIndex % 100 < samplingPercent) {
                    cuboidStatCalculator.putRow(row);
                }
                if (rowIndex % 100 == 0) {
                    dictColDeduper.resetIfShortOfMem();
                }
                rowIndex++;
            }
            emitCuboidHllCounters(output);
            emitDimensionRanges(output);
            return output.iterator();
        }

        /** Appends one (-1 + cuboid id) -> serialized HLL registers record per cuboid. */
        private void emitCuboidHllCounters(List<Tuple2<SelfDefineSortableKey, Text>> output) throws IOException {
            ByteBuffer hllBuf = ByteBuffer.allocate(1024 * 1024);
            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
            HLLCounter[] counters = cuboidStatCalculator.getHLLCounters();
            for (int i = 0; i < cuboidIds.length; i++) {
                tmpbuf.clear();
                tmpbuf.put((byte) -1);
                tmpbuf.putLong(cuboidIds[i]);
                Text keyText = new Text();
                keyText.set(tmpbuf.array(), 0, tmpbuf.position());

                hllBuf.clear();
                counters[i].writeRegisters(hllBuf);
                Text valueText = new Text();
                valueText.set(hllBuf.array(), 0, hllBuf.position());

                SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
                sortableKey.init(keyText, (byte) 0);
                output.add(new Tuple2<>(sortableKey, valueText));
            }
        }

        /** Appends the tracked min and then max value of every non-dictionary column. */
        private void emitDimensionRanges(List<Tuple2<SelfDefineSortableKey, Text>> output) {
            for (Map.Entry<Integer, DimensionRangeInfo> entry : dimensionRangeInfoMap.entrySet()) {
                int colIdx = entry.getKey();
                DimensionRangeInfo range = entry.getValue();
                DataType type = allCols.get(colIdx).getType();
                addFieldValue(type, colIdx, range.getMin(), output);
                addFieldValue(type, colIdx, range.getMax(), output);
            }
        }

        private boolean isUsePutRowKeyToHllNewAlgorithm(CubeDesc cubeDesc) {
            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
                SparkFactDistinct.logger.info("Found KylinVersion: {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
                return false;
            }
            SparkFactDistinct.logger.info("Found KylinVersion: {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
            return true;
        }

        /** All cuboids of the segment's scheduler, plus mandatory cuboids when the plan may be optimized. */
        private Long[] getCuboidIds(CubeSegment segment) {
            Set<Long> cuboidIdSet = Sets.newHashSet(segment.getCuboidScheduler().getAllCuboidIds());
            if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(segment)) {
                cuboidIdSet.addAll(segment.getCubeDesc().getMandatoryCuboids());
            }
            return cuboidIdSet.toArray(new Long[0]);
        }

        /** Creates {@code count} empty dense counters with the given precision. */
        private HLLCounter[] getInitCuboidsHLL(int count, int precision) {
            HLLCounter[] counters = new HLLCounter[count];
            for (int i = 0; i < count; i++) {
                counters[i] = new HLLCounter(precision, RegisterType.DENSE);
            }
            return counters;
        }

        /** Marks in the deduper every entry of allCols that needs a dictionary built. */
        private void initDictColDeduper(CubeDesc cubeDesc) {
            dictColDeduper = new FactDistinctColumnsMapper.DictColDeduper();
            Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
            for (int i = 0; i < allCols.size(); i++) {
                if (dictCols.contains(allCols.get(i))) {
                    dictColDeduper.setIsDictCol(i);
                }
            }
        }

        /** Resolves each entry of allCols to its position in a flat-table row. */
        private void initColumnIndex(CubeJoinedFlatTableEnrich flatTableDesc) {
            columnIndex = new int[allCols.size()];
            for (int i = 0; i < allCols.size(); i++) {
                columnIndex[i] = flatTableDesc.getColumnIndex(allCols.get(i));
            }
        }

        /** Appends a (reducer-id byte + value bytes) key with an empty value for column {@code colIdx}. */
        private void addFieldValue(DataType type, int colIdx, String value, List<Tuple2<SelfDefineSortableKey, Text>> output) {
            int reducerId = reducerMapping.getReducerIdForCol(colIdx, value);
            tmpbuf.clear();
            byte[] valueBytes = Bytes.toBytes(value);
            int keyLength = valueBytes.length + 1; // routing byte + value
            if (keyLength >= tmpbuf.capacity()) {
                tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), keyLength));
            }
            // Only index 3 of Bytes.toBytes(int) is written, so reducer ids are presumably < 256 — TODO confirm.
            tmpbuf.put(Bytes.toBytes(reducerId)[3]);
            tmpbuf.put(valueBytes);

            Text keyText = new Text();
            keyText.set(tmpbuf.array(), 0, tmpbuf.position());
            SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
            sortableKey.init(keyText, type);
            output.add(new Tuple2<>(sortableKey, new Text()));
            if (output.size() < 10) {
                SparkFactDistinct.logger.info("Sample output: {} '{}' => reducer {}", allCols.get(colIdx), value, reducerId);
            }
        }

        /** Doubles {@code oldSize} at least once until it reaches {@code neededSize}. */
        private int countNewSize(int oldSize, int neededSize) {
            // Start from at least 1: doubling a zero capacity would otherwise loop forever.
            int newSize = Math.max(oldSize, 1);
            do {
                newSize *= 2;
            } while (newSize < neededSize);
            return newSize;
        }

        /** Approximate serialized row size: UTF-8 length of each field (1 for null) plus one separator byte each. */
        private int countSizeInBytes(String[] row) {
            int size = 0;
            for (String field : row) {
                size += (field == null ? 1 : StringUtil.utf8Length(field)) + 1;
            }
            return size;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.6.2.jar:org/apache/kylin/engine/spark/SparkFactDistinct$MultiOutputFunction.class */
    static class MultiOutputFunction implements PairFlatMapFunction<Iterator<Tuple2<SelfDefineSortableKey, Text>>, String, Tuple3<Writable, Writable, String>> {
        // Inputs needed to reload cube metadata on the executor (see init()).
        private String cubeName;
        private String metaUrl;
        private SerializableConfiguration conf;
        // Only used for logging the sampling percentage.
        private int samplingPercent;
        private transient FactDistinctColumnsReducerMapping reducerMapping;
        // Spark partition id of the running task; decides whether this task handles stats or a column.
        private int taskId;
        // State used when this partition handles cuboid statistics.
        private long baseCuboidId;
        private List<Long> baseCuboidRowCountInMappers;
        private Map<Long, HLLCounter> cuboidHLLMap;
        // State used when this partition handles a single column.
        private TblColRef col;
        private boolean buildDictInReducer;
        private transient IDictionaryBuilder builder;
        private KylinConfig cubeConfig;
        private CubeDesc cubeDesc;
        // True when min/max of the column are tracked.
        private boolean isDimensionCol;
        // True when the column needs a dictionary.
        private boolean isDictCol;
        // Output records accumulated by this function and returned from call().
        private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;
        // Transient, so every deserialized copy lazily re-runs init() on first use.
        private volatile transient boolean initialized = false;
        // File-name suffixes copied from the MR reducer; presumably used by output methods outside this view.
        private String DICT_FILE_POSTFIX = FactDistinctColumnsReducer.DICT_FILE_POSTFIX;
        private String DIMENSION_COL_INFO_FILE_POSTFIX = FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX;
        // True when this partition handles cuboid statistics rather than a column.
        private boolean isStatistics = false;
        // Number of column records processed; also limits logAFewRows() output.
        private int rowCount = 0;
        // Sum of per-record HLL estimates before merging.
        private long totalRowsBeforeMerge = 0;
        // Running max/min of the column values when isDimensionCol is set.
        private String maxValue = null;
        private String minValue = null;

        /**
         * Captures the serializable inputs; all metadata is loaded lazily on the executor by init().
         *
         * @param cubeName        name of the cube being built
         * @param metaUrl         metadata URL used to reload the Kylin config
         * @param conf            serializable Hadoop configuration
         * @param samplingPercent sampling percentage, only reported in logs
         */
        public MultiOutputFunction(String cubeName, String metaUrl, SerializableConfiguration conf, int samplingPercent) {
            this.samplingPercent = samplingPercent;
            this.conf = conf;
            this.metaUrl = metaUrl;
            this.cubeName = cubeName;
        }

        /**
         * Loads cube metadata under a thread-local Kylin config and prepares this task's role.
         *
         * <p>If the reducer mapping marks this partition as the cuboid row counter, statistics state
         * is set up. Otherwise the partition's column is resolved, and optionally a dictionary
         * builder is created.
         *
         * @throws IOException if the dictionary builder fails to initialize
         */
        private void init() throws IOException {
            taskId = TaskContext.getPartitionId();
            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
                CubeInstance cube = CubeManager.getInstance(kConfig).getCube(cubeName);
                cubeDesc = cube.getDescriptor();
                cubeConfig = cube.getConfig();
                reducerMapping = new FactDistinctColumnsReducerMapping(cube);
                result = Lists.newArrayList();

                if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
                    isStatistics = true;
                    baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
                    baseCuboidRowCountInMappers = Lists.newArrayList();
                    cuboidHLLMap = Maps.newHashMap();
                    SparkFactDistinct.logger.info("Partition {} handling stats", taskId);
                } else {
                    col = reducerMapping.getColForReducer(taskId);
                    Preconditions.checkNotNull(col, "No column is mapped to partition %s", taskId);
                    isDimensionCol = cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)
                            && col.getType().needCompare();
                    isDictCol = cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col);

                    // Build the dictionary here only if enabled, no custom builder class is
                    // configured, and the column is not split across several partitions.
                    buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
                    if (cubeDesc.getDictionaryBuilderClass(col) != null) {
                        buildDictInReducer = false;
                    }
                    if (reducerMapping.getReducerNumForDimCol(col) > 1) {
                        buildDictInReducer = false;
                    }
                    if (buildDictInReducer) {
                        builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
                        builder.init(null, 0, null);
                    }
                    SparkFactDistinct.logger.info("Partition {} handling column {}, buildDictInReducer={}",
                            taskId, col, buildDictInReducer);
                }
                // Published last so other threads only see fully built state.
                initialized = true;
            }
        }

        /** Logs the received value, but only while fewer than 10 rows have been counted. */
        private void logAFewRows(String value) {
            if (rowCount >= 10) {
                return;
            }
            SparkFactDistinct.logger.info("Received value: {}", value);
        }

        /**
         * Consumes one partition and returns the accumulated output records.
         *
         * <p>A statistics partition merges the incoming HLL records, logs, and emits the statistics
         * for the cuboids sorted by id. A column partition scans the values, then emits the
         * dimension range and/or the dictionary built here.
         */
        @Override
        public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> call(Iterator<Tuple2<SelfDefineSortableKey, Text>> input) throws Exception {
            if (!initialized) {
                // Class-wide lock: serializes metadata loading across all instances in this JVM.
                synchronized (SparkFactDistinct.class) {
                    if (!initialized) {
                        init();
                    }
                }
            }
            if (isStatistics) {
                calculateStatistics(input);
                List<Long> allCuboids = new ArrayList<>(cuboidHLLMap.keySet());
                Collections.sort(allCuboids);
                logMapperAndCuboidStatistics(allCuboids);
                outputStatistics(allCuboids, result);
            } else {
                calculateColData(input);
                if (isDimensionCol) {
                    outputDimRangeInfo(result);
                }
                if (buildDictInReducer) {
                    outputDict(col, builder.build(), result);
                }
            }
            return result.iterator();
        }

        /**
         * Merges the incoming per-partition HLL records into one counter per cuboid.
         *
         * <p>Each key carries a marker byte followed by the cuboid id. Each value carries the
         * serialized registers. Per-record estimates are summed into totalRowsBeforeMerge, and
         * base-cuboid estimates are also recorded individually.
         */
        private void calculateStatistics(Iterator<Tuple2<SelfDefineSortableKey, Text>> it) throws IOException {
            int precision;
            while (it.hasNext()) {
                precision = cubeConfig.getCubeStatsHLLPrecision();
                HLLCounter incoming = new HLLCounter(precision);
                Tuple2<SelfDefineSortableKey, Text> record = it.next();
                long cuboidId = Bytes.toLong(record._1().getText().getBytes(), 1);
                Text registers = record._2();
                incoming.readRegisters(ByteBuffer.wrap(registers.getBytes(), 0, registers.getLength()));

                long estimate = incoming.getCountEstimate();
                totalRowsBeforeMerge += estimate;
                if (cuboidId == baseCuboidId) {
                    baseCuboidRowCountInMappers.add(estimate);
                }

                HLLCounter merged = cuboidHLLMap.get(cuboidId);
                if (merged == null) {
                    cuboidHLLMap.put(cuboidId, incoming);
                } else {
                    merged.merge(incoming);
                }
            }
        }

        /**
         * Scans this partition's column values.
         *
         * <p>Each value is decoded from its key, skipping the leading routing byte. Min/max are
         * tracked when the column is a dimension. Dictionary columns either feed the local
         * dictionary builder or are emitted as "column" output records.
         */
        private void calculateColData(Iterator<Tuple2<SelfDefineSortableKey, Text>> it) {
            DataType colType = col.getType();
            while (it.hasNext()) {
                Text keyText = it.next()._1().getText();
                String value = Bytes.toString(keyText.getBytes(), 1, keyText.getLength() - 1);
                logAFewRows(value);
                if (isDimensionCol) {
                    if (minValue == null || colType.compare(minValue, value) > 0) {
                        minValue = value;
                    }
                    if (maxValue == null || colType.compare(maxValue, value) < 0) {
                        maxValue = value;
                    }
                }
                if (isDictCol) {
                    if (buildDictInReducer) {
                        builder.addValue(value);
                    } else {
                        // Explicit type arguments avoid the raw Tuple3 and keep inference simple.
                        Tuple3<Writable, Writable, String> record = new Tuple3<Writable, Writable, String>(
                                NullWritable.get(), new Text(value.getBytes(StandardCharsets.UTF_8)),
                                col.getIdentity() + "/");
                        result.add(new Tuple2<>("column", record));
                    }
                }
                rowCount++;
            }
        }

        /**
         * Logs the sampled statistics gathered by this task, for human inspection.
         *
         * <p>The output covers:
         * <ul>
         *   <li>the task's cuboid count and the sampling percentage;</li>
         *   <li>every positive per-mapper base-cuboid row count;</li>
         *   <li>each requested cuboid's merged row-count estimate;</li>
         *   <li>the sum of estimates before merging and the sum after merging.</li>
         * </ul>
         *
         * @param list cuboid ids to report; each one is looked up in {@code cuboidHLLMap}
         */
        private void logMapperAndCuboidStatistics(List<Long> list) {
            SparkFactDistinct.logger.info("Cuboid number for task: {}\t{}", taskId, list.size());
            SparkFactDistinct.logger.info("Samping percentage: \t{}", samplingPercent);
            SparkFactDistinct.logger.info("The following statistics are collected based on sampling data. ");
            SparkFactDistinct.logger.info("Number of Mappers: {}", baseCuboidRowCountInMappers.size());

            int mapperIndex = 0;
            for (Long mapperRowCount : baseCuboidRowCountInMappers) {
                if (mapperRowCount > 0) {
                    SparkFactDistinct.logger.info("Base Cuboid in Mapper {} row count: \t {}", mapperIndex, mapperRowCount);
                }
                mapperIndex++;
            }

            long mergedTotal = 0;
            for (long cuboidId : list) {
                long estimate = cuboidHLLMap.get(cuboidId).getCountEstimate();
                mergedTotal += estimate;
                SparkFactDistinct.logger.info("Cuboid {} row count is: \t {}", cuboidId, estimate);
            }

            SparkFactDistinct.logger.info("Sum of row counts (before merge) is: \t {}", totalRowsBeforeMerge);
            SparkFactDistinct.logger.info("After merge, the row count: \t {}", mergedTotal);
        }

        /**
         * Queues the observed minimum and maximum of the current dimension column for the
         * partition output.
         *
         * <p>Two records are appended, the minimum first and then the maximum. Both are written to
         * {@code <col identity>/<col name><DIMENSION_COL_INFO_FILE_POSTFIX>}. Nothing is emitted
         * when there is no column or no value was ever seen.
         *
         * @param list output records of the current partition; receives the two range records
         */
        private void outputDimRangeInfo(List<Tuple2<String, Tuple3<Writable, Writable, String>>> list) {
            if (col != null && minValue != null) {
                String rangeFile = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
                for (String bound : new String[] {minValue, maxValue}) {
                    Text encoded = new Text(bound.getBytes(StandardCharsets.UTF_8));
                    list.add(new Tuple2<>(BatchConstants.CFG_OUTPUT_PARTITION,
                            new Tuple3<Writable, Writable, String>(NullWritable.get(), encoded, rangeFile)));
                }
                SparkFactDistinct.logger.info("write dimension range info for col : {}  minValue:{} maxValue:{}", col.getName(), minValue, maxValue);
            }
        }

        /**
         * Serializes {@code dictionary} and queues it for the "dict" output.
         *
         * <p>The payload has two parts, wrapped in an {@link ArrayPrimitiveWritable}:
         * <ol>
         *   <li>the dictionary's class name, written with {@code writeUTF};</li>
         *   <li>whatever {@code dictionary.write} produces.</li>
         * </ol>
         * The record is routed to {@code <col identity>/<col name><DICT_FILE_POSTFIX>}.
         *
         * @param tblColRef column the dictionary belongs to
         * @param dictionary dictionary to serialize
         * @param list output records of the current partition; one entry is appended
         * @throws IOException if writing the dictionary or closing the streams fails
         */
        private void outputDict(TblColRef tblColRef, Dictionary<String> dictionary, List<Tuple2<String, Tuple3<Writable, Writable, String>>> list) throws IOException {
            String dictFileName = tblColRef.getIdentity() + "/" + tblColRef.getName() + DICT_FILE_POSTFIX;
            // Resources close in reverse order (data stream, then buffer). A close failure that
            // follows an earlier exception is attached to it as suppressed.
            try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                    DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(dictionary.getClass().getName());
                dictionary.write(out);
                // Flush before snapshotting so nothing written through the wrapper is left behind.
                out.flush();
                list.add(new Tuple2<>("dict", new Tuple3<Writable, Writable, String>(
                        NullWritable.get(), new ArrayPrimitiveWritable(buffer.toByteArray()), dictFileName)));
            }
        }

        /**
         * Appends this partition's cuboid statistics to {@code list2}.
         *
         * <p>All records go to the statistics output, file {@code statistics/statistics}, keyed by
         * a {@link LongWritable}:
         * <ul>
         *   <li>key -1: the sum of all pre-merge estimates ({@code totalRowsBeforeMerge}) divided by
         *       the sum of the merged estimates, as a {@code double} (0.0 when that sum is 0);</li>
         *   <li>key -2: the number of collected base-cuboid estimates, as an {@code int};</li>
         *   <li>key 0: the sampling percentage;</li>
         *   <li>each cuboid id in {@code list}: that cuboid's merged HLL registers.</li>
         * </ul>
         *
         * @param list cuboid ids whose registers are written; each must be a key of {@code cuboidHLLMap}
         * @param list2 output records of the current partition
         * @throws IOException if the registers cannot be serialized
         */
        private void outputStatistics(List<Long> list, List<Tuple2<String, Tuple3<Writable, Writable, String>>> list2) throws IOException {
            final String statisticsFile = "statistics/statistics";

            long mergedTotal = 0;
            for (HLLCounter counter : cuboidHLLMap.values()) {
                mergedTotal += counter.getCountEstimate();
            }
            // Both operands are longs: cast before dividing, otherwise the fraction is truncated
            // (e.g. 150 / 100 would be written as 1).
            double overlapRatio = mergedTotal == 0 ? 0.0 : (double) totalRowsBeforeMerge / mergedTotal;

            list2.add(new Tuple2<>(BatchConstants.CFG_OUTPUT_STATISTICS, new Tuple3<Writable, Writable, String>(
                    new LongWritable(-1L), new BytesWritable(Bytes.toBytes(overlapRatio)), statisticsFile)));
            list2.add(new Tuple2<>(BatchConstants.CFG_OUTPUT_STATISTICS, new Tuple3<Writable, Writable, String>(
                    new LongWritable(-2L), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFile)));
            list2.add(new Tuple2<>(BatchConstants.CFG_OUTPUT_STATISTICS, new Tuple3<Writable, Writable, String>(
                    new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercent)), statisticsFile)));

            // One 1 MiB scratch buffer, reused for every cuboid.
            ByteBuffer scratch = ByteBuffer.allocate(1024 * 1024);
            for (long cuboidId : list) {
                scratch.clear();
                cuboidHLLMap.get(cuboidId).writeRegisters(scratch);
                scratch.flip();
                byte[] registers = Arrays.copyOf(scratch.array(), scratch.limit());
                list2.add(new Tuple2<>(BatchConstants.CFG_OUTPUT_STATISTICS, new Tuple3<Writable, Writable, String>(
                        new LongWritable(cuboidId), new BytesWritable(registers, registers.length), statisticsFile)));
            }
        }
    }

    /**
     * Creates the application and registers every command-line option it accepts with
     * {@code options}, in a fixed order.
     */
    public SparkFactDistinct() {
        Option[] accepted = {
                OPTION_CUBE_NAME,
                OPTION_META_URL,
                OPTION_OUTPUT_PATH,
                OPTION_INPUT_TABLE,
                OPTION_INPUT_PATH,
                OPTION_SEGMENT_ID,
                OPTION_STATS_SAMPLING_PERCENT,
                OPTION_COUNTER_PATH
        };
        for (Option option : accepted) {
            options.addOption(option);
        }
    }

    /**
     * Returns the option set populated by the constructor.
     *
     * @return this application's command-line options
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected Options getOptions() {
        return options;
    }

    /**
     * Runs the fact-distinct-columns Spark job for one cube segment.
     *
     * <p>Steps:
     * <ol>
     *   <li>Configure Kryo serialization and open a Spark context. The context is closed on every
     *       path.</li>
     *   <li>Delete the existing output path.</li>
     *   <li>Load the Kylin config from {@code metaUrl}, then the cube and its reducer mapping.</li>
     *   <li>Run the input records through {@code FlatOutputFucntion}, partition and sort them with
     *       {@code FactDistinctPartitioner}, and emit them through {@code MultiOutputFunction}.</li>
     *   <li>Save the result via the "column", "dict", statistics and partition outputs.</li>
     *   <li>Write the input record count and the accumulator value to the counter path.</li>
     *   <li>Delete the HDFS metadata.</li>
     * </ol>
     *
     * @param optionsHelper parsed command-line options
     * @throws Exception any failure from Spark, Hadoop or metadata loading
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
        int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));

        Class<?>[] kryoClasses = {
                Class.forName("scala.reflect.ClassTag$$anon$1"),
                Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey")
        };
        SparkConf conf = new SparkConf().setAppName("Fact distinct columns for:" + cubeName + " segment " + segmentId);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
        // registrationRequired makes Kryo throw on unregistered classes, so the two classes
        // above are registered explicitly.
        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClasses);

        KylinSparkJobListener jobListener = new KylinSparkJobListener();
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            sc.sc().addSparkListener(jobListener);
            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));

            SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
            CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
            Job job = Job.getInstance(sConf.get());
            FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cube);

            logger.info("RDD Output path: {}", outputPath);
            logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
            logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
            logger.info("counter path {}", counterPath);

            boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(kylinConfig.getFlatTableStorageFormat());
            // Filled by FlatOutputFucntion; reported below as SOURCE_RECORDS_SIZE.
            LongAccumulator sourceSizeAccumulator = sc.sc().longAccumulator();
            JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);

            JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = recordRDD
                    .mapPartitionsToPair(new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, sourceSizeAccumulator))
                    .repartitionAndSortWithinPartitions(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()))
                    .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));

            MultipleOutputs.addNamedOutput(job, "column", SequenceFileOutputFormat.class, NullWritable.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "dict", SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            FileOutputFormat.setCompressOutput(job, false);
            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

            MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD).saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());

            // NOTE(review): recordRDD is not persisted here, so this count() presumably triggers a
            // second pass over the input; consider caching it if that pass is expensive.
            long recordCount = recordRDD.count();
            logger.info("Map input records={}", recordCount);
            logger.info("HDFS Read: {} HDFS Write", sourceSizeAccumulator.value());

            Map<String, String> counters = Maps.newHashMap();
            counters.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
            counters.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(sourceSizeAccumulator.value()));
            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counters);

            HadoopUtil.deleteHDFSMeta(metaUrl);
        }
    }

    // Every option below is required, takes one argument, and uses the same string for its
    // option name and its argument name.
    static {
        OPTION_CUBE_NAME = requiredOptionWithArg(BatchConstants.ARG_CUBE_NAME, "Cube Name");
        OPTION_META_URL = requiredOptionWithArg("metaUrl", "HDFS metadata url");
        OPTION_OUTPUT_PATH = requiredOptionWithArg(BatchConstants.ARG_OUTPUT, "Cube output path");
        OPTION_SEGMENT_ID = requiredOptionWithArg(CubingExecutableUtil.SEGMENT_ID, "Cube Segment Id");
        OPTION_STATS_SAMPLING_PERCENT = requiredOptionWithArg(BatchConstants.ARG_STATS_SAMPLING_PERCENT, "Statistics sampling percent");
        OPTION_INPUT_TABLE = requiredOptionWithArg("hiveTable", "Hive Intermediate Table");
        OPTION_INPUT_PATH = requiredOptionWithArg(BatchConstants.ARG_INPUT, "Hive Intermediate Table PATH");
        OPTION_COUNTER_PATH = requiredOptionWithArg(BatchConstants.ARG_COUNTER_OUPUT, "counter output path");
    }

    /**
     * Builds a required, single-argument option with the shared static {@link OptionBuilder}.
     *
     * @param name used both as the argument name and as the option name passed to {@code create}
     * @param description description attached to the option
     * @return the created option
     */
    private static Option requiredOptionWithArg(String name, String description) {
        OptionBuilder.withArgName(name);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(name);
    }
}
