package org.apache.kylin.engine.flink;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayPrimitiveWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsBase;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/flink/FlinkFactDistinctColumns.class */
public class FlinkFactDistinctColumns extends AbstractApplication {
    /** Logger shared by this application class. */
    protected static final Logger logger = LoggerFactory.getLogger(FlinkFactDistinctColumns.class);

    // The OPTION_* constants below are blank finals: each one is assigned exactly once in the
    // static initializer of this class, which also fixes its name, description and required flag.

    /** Required option {@code cubename} ("Cube Name"). */
    public static final Option OPTION_CUBE_NAME;
    /** Required option {@code segmentId} ("Cube Segment Id"). */
    public static final Option OPTION_SEGMENT_ID;
    /** Required option {@code metaUrl} ("HDFS metadata url"). */
    public static final Option OPTION_META_URL;
    /** Required option {@code hiveTable} ("Hive Intermediate Table"). */
    public static final Option OPTION_INPUT_TABLE;
    /** Required option {@code input} ("Hive Intermediate Table PATH"). */
    public static final Option OPTION_INPUT_PATH;
    /** Required option {@code output} ("Cube output path"). */
    public static final Option OPTION_OUTPUT_PATH;
    /** Required option {@code counterOutput} ("Counter output path"). */
    public static final Option OPTION_COUNTER_PATH;
    /** Required option {@code statisticssamplingpercent}; {@code execute} parses its value with {@code Integer.parseInt}. */
    public static final Option OPTION_STATS_SAMPLING_PERCENT;
    /** Optional option {@code enableObjectReuse}; {@code execute} enables Flink object reuse when its value equals "true" ignoring case. */
    public static final Option OPTION_ENABLE_OBJECT_REUSE;
    /** Option set filled by the constructor and handed out by {@code getOptions()}. */
    private Options options = new Options();

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkFactDistinctColumns$FactDistinctColumnPartitioner.class */
    static class FactDistinctColumnPartitioner implements Partitioner<SelfDefineSortableKey> {
        private volatile transient boolean initialized = false;
        private String cubeName;
        private String metaUrl;
        private SerializableConfiguration conf;
        private transient FactDistinctColumnsReducerMapping reducerMapping;

        public FactDistinctColumnPartitioner(String str, String str2, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.metaUrl = str2;
            this.conf = serializableConfiguration;
        }

        private void init() {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    this.reducerMapping = new FactDistinctColumnsReducerMapping(CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName));
                    this.initialized = true;
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public int partition(SelfDefineSortableKey selfDefineSortableKey, int i) {
            if (!this.initialized) {
                synchronized (FlinkFactDistinctColumns.class) {
                    if (!this.initialized) {
                        init();
                    }
                }
            }
            Text text = selfDefineSortableKey.getText();
            if (text.getBytes()[0] != -1) {
                return BytesUtil.readUnsigned(text.getBytes(), 0, 1);
            }
            return this.reducerMapping.getReducerIdForCuboidRowCount(Long.valueOf(Bytes.toLong(text.getBytes(), 1, 8)).longValue());
        }
    }

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkFactDistinctColumns$FlatOutputMapPartitionFunction.class */
    static class FlatOutputMapPartitionFunction extends RichMapPartitionFunction<String[], Tuple2<SelfDefineSortableKey, Text>> {
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private SerializableConfiguration conf;
        private int samplingPercentage;
        private String bytesWrittenName;
        private String recordCounterName;
        private LongCounter bytesWrittenCounter = new LongCounter();
        private LongCounter recordCounter = new LongCounter();
        private FactDistinctColumnsBase base;

        public FlatOutputMapPartitionFunction(SerializableConfiguration serializableConfiguration, String str, String str2, String str3, int i, String str4, String str5) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
            this.samplingPercentage = i;
            this.bytesWrittenName = str4;
            this.recordCounterName = str5;
        }

        public void open(Configuration configuration) throws Exception {
            getRuntimeContext().addAccumulator(this.bytesWrittenName, this.bytesWrittenCounter);
            getRuntimeContext().addAccumulator(this.recordCounterName, this.recordCounter);
            this.base = new FactDistinctColumnsBase(this.cubeName, this.segmentId, this.metaUrl, this.conf, this.samplingPercentage);
            this.base.setupMap();
        }

        public void mapPartition(Iterable<String[]> iterable, final Collector<Tuple2<SelfDefineSortableKey, Text>> collector) throws Exception {
            FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text> visitor = new FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text>() { // from class: org.apache.kylin.engine.flink.FlinkFactDistinctColumns.FlatOutputMapPartitionFunction.1
                public void collect(String str, SelfDefineSortableKey selfDefineSortableKey, Text text, String str2) {
                    collector.collect(new Tuple2(selfDefineSortableKey, text));
                }
            };
            for (String[] strArr : iterable) {
                this.bytesWrittenCounter.add(this.base.countSizeInBytes(strArr));
                this.recordCounter.add(1L);
                this.base.map(strArr, visitor);
            }
            this.base.postMap(visitor);
        }
    }

    /* loaded from: input_file:org/apache/kylin/engine/flink/FlinkFactDistinctColumns$MultiOutputMapPartitionFunction.class */
    static class MultiOutputMapPartitionFunction extends RichMapPartitionFunction<Tuple2<SelfDefineSortableKey, Text>, Tuple2<String, Tuple3<Writable, Writable, String>>> {
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private SerializableConfiguration conf;
        private int samplingPercentage;
        private FactDistinctColumnsBase base;

        public MultiOutputMapPartitionFunction(SerializableConfiguration serializableConfiguration, String str, String str2, String str3, int i) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
            this.samplingPercentage = i;
        }

        public void open(Configuration configuration) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            this.base = new FactDistinctColumnsBase(this.cubeName, this.segmentId, this.metaUrl, this.conf, this.samplingPercentage);
            this.base.setupReduce(indexOfThisSubtask);
        }

        public void mapPartition(Iterable<Tuple2<SelfDefineSortableKey, Text>> iterable, final Collector<Tuple2<String, Tuple3<Writable, Writable, String>>> collector) throws Exception {
            FactDistinctColumnsBase.Visitor<Writable, Writable> visitor = new FactDistinctColumnsBase.Visitor<Writable, Writable>() { // from class: org.apache.kylin.engine.flink.FlinkFactDistinctColumns.MultiOutputMapPartitionFunction.1
                public void collect(String str, Writable writable, Writable writable2, String str2) {
                    collector.collect(new Tuple2(str, new Tuple3(writable, writable2, str2)));
                }
            };
            for (Tuple2<SelfDefineSortableKey, Text> tuple2 : iterable) {
                this.base.reduce(new Pair(tuple2.f0, tuple2.f1), visitor);
            }
            this.base.postReduce(visitor);
        }
    }

    public FlinkFactDistinctColumns() {
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_META_URL);
        this.options.addOption(OPTION_OUTPUT_PATH);
        this.options.addOption(OPTION_INPUT_TABLE);
        this.options.addOption(OPTION_INPUT_PATH);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_STATS_SAMPLING_PERCENT);
        this.options.addOption(OPTION_COUNTER_PATH);
        this.options.addOption(OPTION_ENABLE_OBJECT_REUSE);
    }

    /**
     * Returns the command-line options understood by this application.
     *
     * @return the option set that the constructor populated
     */
    @Override
    protected Options getOptions() {
        return this.options;
    }

    /**
     * Builds and runs the Flink batch job for the fact-distinct-columns step.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>read all option values and delete any existing output path;</li>
     *   <li>load the Kylin config and derive the reducer mapping for the cube;</li>
     *   <li>wire the pipeline: flat-table records, {@link FlatOutputMapPartitionFunction},
     *       custom partitioning on tuple field 0 via {@link FactDistinctColumnPartitioner},
     *       then {@link MultiOutputMapPartitionFunction}, with the partitioning and the second
     *       map stage both running at the total reducer count;</li>
     *   <li>configure the four named Hadoop outputs and attach the multiple-output sink;</li>
     *   <li>execute the job, log the two accumulators plus the output size, and write the
     *       accumulator values as a sequence file to the counter path.</li>
     * </ol>
     *
     * @param optionsHelper parsed command line giving access to the {@code OPTION_*} values
     * @throws Exception if parsing, job setup, job execution or counter writing fails
     */
    @Override
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        String counterOutputPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
        int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
        String enableObjectReuse = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);

        // Declared once so that the name used to register an accumulator and the name used to
        // look up its result after the job cannot drift apart.
        final String bytesWrittenName = "byte-writer-counter";
        final String recordCounterName = "record-counter";

        Job job = Job.getInstance();
        FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));

        SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
        FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(CubeManager.getInstance(kylinConfig).getCube(cubeName));
        int totalReducerNum = reducerMapping.getTotalReducerNum();
        logger.info("getTotalReducerNum: {}", totalReducerNum);
        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
        logger.info("counter path {}", counterOutputPath);

        boolean isSequenceFile = "SEQUENCEFILE".equalsIgnoreCase(kylinConfig.getFlatTableStorageFormat());
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // A null or empty option value never equals "true", so no separate emptiness check is needed.
        if ("true".equalsIgnoreCase(enableObjectReuse)) {
            env.getConfig().enableObjectReuse();
        }

        Operator<Tuple2<String, Tuple3<Writable, Writable, String>>, ?> outputDataSet = FlinkUtil
                .readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job)
                // stage 1: per-row map over the flat table, also feeds the two accumulators
                .mapPartition(new FlatOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent, bytesWrittenName, recordCounterName))
                // route on tuple field 0 (the key) so each partition receives the keys meant for it
                .partitionCustom(new FactDistinctColumnPartitioner(cubeName, metaUrl, sConf), 0)
                .setParallelism(totalReducerNum)
                // stage 2: reduce-side processing, one subtask per reducer
                .mapPartition(new MultiOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent))
                .setParallelism(totalReducerNum);

        MultipleOutputs.addNamedOutput(job, "column", SequenceFileOutputFormat.class, NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "dict", SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
        MultipleOutputs.addNamedOutput(job, "statistics", SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
        MultipleOutputs.addNamedOutput(job, "partition", TextOutputFormat.class, NullWritable.class, LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        FileOutputFormat.setCompressOutput(job, false);
        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
        outputDataSet.output(new HadoopMultipleOutputFormat(new LazyOutputFormat(), job));

        Map<String, Object> accumulatorResults = env.execute("Fact distinct columns for:" + cubeName + " segment " + segmentId).getAllAccumulatorResults();
        Long recordCount = (Long) accumulatorResults.get(recordCounterName);
        Long bytesWritten = (Long) accumulatorResults.get(bytesWrittenName);

        // NOTE(review): these three messages look like they imitate Hadoop job-counter output and
        // are presumably pattern-matched by a log consumer; keep their text unchanged until confirmed.
        logger.info("Map input records={}", recordCount);
        logger.info("HDFS Read: {} HDFS Write", bytesWritten);
        logger.info("HDFS: Number of bytes written={}", FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));

        HashMap<String, String> counterMap = Maps.newHashMap();
        counterMap.put("source_records_count", String.valueOf(recordCount));
        counterMap.put("source_records_size", String.valueOf(bytesWritten));
        HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterOutputPath, counterMap);
    }

    // Builds every OPTION_* constant. All options take one argument and use the same text for
    // the option name and the argument name; only the required flag and description differ.
    static {
        OPTION_CUBE_NAME = newOption("cubename", true, "Cube Name");
        OPTION_SEGMENT_ID = newOption("segmentId", true, "Cube Segment Id");
        OPTION_META_URL = newOption("metaUrl", true, "HDFS metadata url");
        OPTION_INPUT_TABLE = newOption("hiveTable", true, "Hive Intermediate Table");
        OPTION_INPUT_PATH = newOption("input", true, "Hive Intermediate Table PATH");
        OPTION_OUTPUT_PATH = newOption("output", true, "Cube output path");
        OPTION_COUNTER_PATH = newOption("counterOutput", true, "Counter output path");
        OPTION_STATS_SAMPLING_PERCENT = newOption("statisticssamplingpercent", true, "Statistics sampling percent");
        OPTION_ENABLE_OBJECT_REUSE = newOption("enableObjectReuse", false, "Enable object reuse");
    }

    /**
     * Creates a single-argument option whose option name and argument name are both {@code name}.
     *
     * @param name        option name, also used as the argument name
     * @param required    whether the option must be present on the command line
     * @param description human-readable description of the option
     * @return the newly built option
     */
    private static Option newOption(String name, boolean required, String description) {
        // OptionBuilder collects settings in static state until create() is called, so the
        // calls must stay together and in this order.
        OptionBuilder.withArgName(name);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(required);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(name);
    }
}
