package org.apache.kylin.engine.spark;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-spark-2.5.0.jar:org/apache/kylin/engine/spark/SparkMergingDictionary.class */
/**
 * Spark application that merges the column dictionaries and the cuboid statistics of several cube
 * segments on behalf of the segment produced by the merge.
 *
 * <p>The work is split into {@code columnCount + 1} integer indices: indices {@code 0 .. columnCount - 1}
 * each merge the dictionaries of one column that needs a dictionary, and index {@code columnCount}
 * merges the statistics. Every index yields one {@code (Text, Text)} pair; all pairs are coalesced into a
 * single partition and saved as a sequence file under {@code dictOutputPath}.
 */
public class SparkMergingDictionary extends AbstractApplication implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger(SparkMergingDictionary.class);

    public static final Option OPTION_CUBE_NAME = requiredOption(BatchConstants.ARG_CUBE_NAME, "Cube Name");
    public static final Option OPTION_SEGMENT_ID = requiredOption(CubingExecutableUtil.SEGMENT_ID,
            "Cube Segment Id");
    public static final Option OPTION_META_URL = requiredOption("metaUrl", "HDFS metadata url");
    public static final Option OPTION_MERGE_SEGMENT_IDS = requiredOption("segmentIds", "Merging Cube Segment Ids");
    public static final Option OPTION_OUTPUT_PATH_DICT = requiredOption("dictOutputPath",
            "merged dictionary resource path");
    public static final Option OPTION_OUTPUT_PATH_STAT = requiredOption("statOutputPath",
            "merged statistics resource path");

    private final Options options = new Options();

    /**
     * Spark function that, depending on the index it is called with, merges either one column dictionary
     * or the cuboid statistics of all merging segments.
     *
     * <p>Indices in {@code [0, tblColRefs.length)} select a dictionary column and emit
     * {@code (tableAlias:columnName, mergedDictionaryResourcePath)}, where the path is {@code ""} when no
     * merged dictionary is produced. Any larger index merges the statistics and emits {@code ("", "")}.
     * Kylin metadata is loaded lazily, on the first {@link #call(Integer)} of each deserialized instance.
     */
    public static class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
        private volatile transient boolean initialized = false;
        private final String cubeName;
        private final String metaUrl;
        private final String segmentId;
        private final String[] segmentIds;
        private final String statOutputPath;
        private final TblColRef[] tblColRefs;
        private final SerializableConfiguration conf;

        // Executor-side state built by init(); transient because it is never meant to travel with the closure.
        private transient DictionaryManager dictMgr;
        private transient KylinConfig kylinConfig;
        private transient List<CubeSegment> mergingSegments;

        /**
         * @param cubeName       name of the cube being merged
         * @param metaUrl        metadata URL passed to {@code AbstractHadoopJob.loadKylinConfigFromHdfs}
         * @param segmentId      id of the segment that receives the merged statistics
         * @param segmentIds     ids of the segments whose dictionaries and statistics are merged
         * @param statOutputPath directory the merged statistics file is written to
         * @param tblColRefs     columns whose dictionaries are merged, addressed by call index
         * @param conf           serializable Hadoop configuration used to load the Kylin config
         */
        public MergeDictAndStatsFunction(String cubeName, String metaUrl, String segmentId, String[] segmentIds,
                String statOutputPath, TblColRef[] tblColRefs, SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.metaUrl = metaUrl;
            this.segmentId = segmentId;
            this.segmentIds = segmentIds;
            this.statOutputPath = statOutputPath;
            this.tblColRefs = tblColRefs;
            this.conf = conf;
        }

        /** Loads the Kylin config and resolves the dictionary manager and the merging segments. */
        private void init() {
            kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
                CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
                dictMgr = DictionaryManager.getInstance(kylinConfig);
                mergingSegments = getMergingSegments(cube, segmentIds);
            }
        }

        /** Runs {@link #init()} once per instance; the volatile flag makes the unlocked fast path safe. */
        private void ensureInitialized() {
            if (!initialized) {
                synchronized (SparkMergingDictionary.class) {
                    if (!initialized) {
                        init();
                        initialized = true;
                    }
                }
            }
        }

        @Override
        public Tuple2<Text, Text> call(Integer index) throws Exception {
            ensureInitialized();
            int i = index.intValue();
            if (i < tblColRefs.length) {
                return mergeColumnDictionary(tblColRefs[i]);
            }
            return mergeStatistics();
        }

        /**
         * Merges the distinct dictionaries that the merging segments hold for {@code col}.
         *
         * <p>Segments with no dictionary path for the column, or whose dictionary info resolves to
         * {@code null}, are skipped; infos already collected (by {@code equals}) are not added twice.
         *
         * @return {@code (tableAlias:columnName, mergedResourcePath)}, the path being {@code ""} when the
         *         dictionary manager returns no merged dictionary
         */
        private Tuple2<Text, Text> mergeColumnDictionary(TblColRef col) throws Exception {
            List<DictionaryInfo> dictInfos = Lists.newArrayList();
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
                for (CubeSegment segment : mergingSegments) {
                    String dictResPath = segment.getDictResPath(col);
                    if (dictResPath == null) {
                        continue;
                    }
                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(dictResPath);
                    if (dictInfo != null && !dictInfos.contains(dictInfo)) {
                        dictInfos.add(dictInfo);
                    }
                }

                DictionaryInfo merged = dictMgr.mergeDictionary(dictInfos);
                String tblCol = col.getTableAlias() + ":" + col.getName();
                String dictPath = merged == null ? "" : merged.getResourcePath();
                return new Tuple2<>(new Text(tblCol), new Text(dictPath));
            }
        }

        /**
         * Merges the cuboid statistics of all merging segments.
         *
         * <p>Per-cuboid HLL counters are merged, and the sampling percentages are averaged with integer
         * division. The result is written under {@code statOutputPath} with {@link CubeStatsWriter}, then
         * copied into the resource store as the statistics resource of segment {@code segmentId}.
         *
         * @return an empty {@code ("", "")} pair
         * @throws IllegalStateException if there are no merging segments or the target segment is missing
         */
        private Tuple2<Text, Text> mergeStatistics() throws Exception {
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
                if (mergingSegments.isEmpty()) {
                    // Would otherwise divide by zero when averaging the sampling percentage.
                    throw new IllegalStateException(
                            "No merging segments given for segment " + segmentId + " of cube " + cubeName);
                }
                CubeSegment newSegment = CubeManager.getInstance(kylinConfig).getCube(cubeName)
                        .getSegmentById(segmentId);
                if (newSegment == null) {
                    throw new IllegalStateException("Segment " + segmentId + " not found in cube " + cubeName);
                }
                ResourceStore store = ResourceStore.getStore(kylinConfig);
                Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();

                Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
                int samplingPercentageSum = 0;
                for (CubeSegment segment : mergingSegments) {
                    samplingPercentageSum += accumulateSegmentStatistics(store, segment, hadoopConf, cuboidHLLMap);
                }
                int averageSamplingPercentage = samplingPercentageSum / mergingSegments.size();

                CubeStatsWriter.writeCuboidStatistics(hadoopConf, new Path(statOutputPath), cuboidHLLMap,
                        averageSamplingPercentage);

                Path statisticsFile = new Path(statOutputPath,
                        BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
                FSDataInputStream in = HadoopUtil.getFileSystem(statisticsFile, hadoopConf).open(statisticsFile);
                try {
                    store.putResource(newSegment.getStatisticsResourcePath(), (InputStream) in,
                            System.currentTimeMillis());
                } finally {
                    org.apache.hadoop.io.IOUtils.closeStream(in);
                }
                return new Tuple2<>(new Text(""), new Text(""));
            }
        }

        /**
         * Folds one segment's statistics sequence file into {@code cuboidHLLMap}.
         *
         * <p>The file is first copied to a local temp file, which is always deleted afterwards. Records
         * with key {@code 0} carry a sampling percentage. Records with a positive key carry the HLL
         * registers of that cuboid, which are merged into the map entry for the key.
         *
         * @return the sum of all sampling-percentage records found in the segment's file
         */
        private int accumulateSegmentStatistics(ResourceStore store, CubeSegment segment, Configuration hadoopConf,
                Map<Long, HLLCounter> cuboidHLLMap) throws Exception {
            File tempFile = copyStatisticsToTempFile(store, segment);
            try {
                FileSystem localFs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
                SequenceFile.Reader reader = null;
                try {
                    reader = new SequenceFile.Reader(localFs, new Path(tempFile.getAbsolutePath()), hadoopConf);
                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
                            hadoopConf);

                    int samplingPercentage = 0;
                    while (reader.next(key, value)) {
                        long cuboidId = key.get();
                        if (cuboidId == 0L) {
                            samplingPercentage += Bytes.toInt(value.getBytes());
                        } else if (cuboidId > 0L) {
                            HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                            hll.readRegisters(new ByteArray(value.getBytes()).asBuffer());
                            HLLCounter existing = cuboidHLLMap.get(cuboidId);
                            if (existing == null) {
                                cuboidHLLMap.put(cuboidId, hll);
                            } else {
                                existing.merge(hll);
                            }
                        }
                    }
                    return samplingPercentage;
                } finally {
                    org.apache.hadoop.io.IOUtils.closeStream(reader);
                }
            } finally {
                deleteTempFile(tempFile);
            }
        }

        /**
         * Copies a segment's statistics resource into a new local temp file and returns that file.
         * On failure the partially written temp file is removed; the source stream is always closed.
         */
        private File copyStatisticsToTempFile(ResourceStore store, CubeSegment segment) throws Exception {
            InputStream in = store.getResource(segment.getStatisticsResourcePath()).inputStream;
            File tempFile = null;
            boolean copied = false;
            try {
                tempFile = File.createTempFile(segmentId, ".seq");
                // try-with-resources so that a failure while closing the written file is not swallowed.
                try (FileOutputStream out = new FileOutputStream(tempFile)) {
                    IOUtils.copy(in, out);
                }
                copied = true;
                return tempFile;
            } finally {
                org.apache.hadoop.io.IOUtils.closeStream(in);
                if (!copied) {
                    deleteTempFile(tempFile);
                }
            }
        }

        /** Best-effort removal of a temp file; falls back to delete-on-exit and logs if deletion fails. */
        private static void deleteTempFile(File file) {
            if (file != null && file.exists() && !file.delete()) {
                logger.warn("Failed to delete temporary statistics file {}", file.getAbsolutePath());
                file.deleteOnExit();
            }
        }

        /**
         * Resolves each id in {@code ids} to a segment of {@code cube}, preserving order.
         *
         * @throws IllegalStateException if the cube or any of the segments cannot be found
         */
        private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] ids) {
            if (cube == null) {
                throw new IllegalStateException("Cube " + cubeName + " not found");
            }
            List<CubeSegment> segments = Lists.newArrayListWithCapacity(ids.length);
            for (String id : ids) {
                CubeSegment segment = cube.getSegmentById(id);
                if (segment == null) {
                    throw new IllegalStateException("Merging segment " + id + " not found in cube " + cubeName);
                }
                segments.add(segment);
            }
            return segments;
        }
    }

    public SparkMergingDictionary() {
        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_SEGMENT_ID);
        options.addOption(OPTION_META_URL);
        options.addOption(OPTION_MERGE_SEGMENT_IDS);
        options.addOption(OPTION_OUTPUT_PATH_DICT);
        options.addOption(OPTION_OUTPUT_PATH_STAT);
    }

    @Override // org.apache.kylin.common.util.AbstractApplication
    protected Options getOptions() {
        return options;
    }

    /**
     * Builds the Spark context and runs the merge.
     *
     * <p>The dictionary output path is deleted first, and the Spark context is stopped when the job ends.
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
        String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
        String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);

        // With spark.kryo.registrationRequired=true, Kryo rejects unregistered classes,
        // so these Scala runtime classes are registered explicitly.
        Class<?>[] kryoClasses = new Class<?>[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
                Class.forName("scala.collection.mutable.WrappedArray$ofRef") };

        SparkConf sparkConf = new SparkConf()
                .setAppName("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
        sparkConf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClasses);

        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            sc.sc().addSparkListener(new KylinSparkJobListener());
            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));

            SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
            KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
            CubeInstance cube = CubeManager.getInstance(envConfig).getCube(cubeName);
            if (cube == null) {
                throw new IllegalStateException("Cube " + cubeName + " not found");
            }
            CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cube.getDescName());

            logger.info("Dictionary output path: {}", dictOutputPath);
            logger.info("Statistics output path: {}", statOutputPath);

            TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
            int columnCount = tblColRefs.length;

            // Indices 0..columnCount-1 merge one column dictionary each; index columnCount merges statistics.
            List<Integer> indices = Lists.newArrayListWithCapacity(columnCount + 1);
            for (int i = 0; i <= columnCount; i++) {
                indices.add(i);
            }

            // NOTE(review): coalesce(1, false) does not shuffle, so Spark evaluates the whole mapToPair stage in
            // a single task and the columnCount + 1 slices above do not actually run in parallel.
            // coalesce(1, true) would restore parallelism, but only if concurrent dictionary merges inside one
            // executor JVM are safe. Confirm that before changing it.
            sc.parallelize(indices, columnCount + 1)
                    .mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl, segmentId, segmentIds.split(","),
                            statOutputPath, tblColRefs, sConf))
                    .coalesce(1, false)
                    .saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
        }
    }

    /**
     * Builds a required command-line option that takes one argument. The option name and the argument
     * name are both {@code name}.
     */
    private static Option requiredOption(String name, String description) {
        OptionBuilder.withArgName(name);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(name);
    }
}
