package org.apache.kylin.engine.flink;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.flink.util.PercentileCounterSerializer;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.percentile.PercentileCounter;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;

/* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer.class */
public class FlinkCubingByLayer extends AbstractApplication implements Serializable {
    // Class-wide SLF4J logger.
    protected static final Logger logger = LoggerFactory.getLogger((Class<?>) FlinkCubingByLayer.class);
    // Command-line option descriptors. They are declared without initializers here, so they are
    // assigned elsewhere in the class (not visible in this excerpt; presumably a static
    // initializer). Each one is registered in the constructor and read back in execute().
    public static final Option OPTION_CUBE_NAME;
    public static final Option OPTION_SEGMENT_ID;
    public static final Option OPTION_META_URL;
    public static final Option OPTION_OUTPUT_PATH;
    public static final Option OPTION_INPUT_TABLE;
    public static final Option OPTION_INPUT_PATH;
    // Any non-empty value supplied for this option turns on Flink object reuse in execute().
    public static final Option OPTION_ENABLE_OBJECT_REUSE;
    // Option set populated by the constructor and returned by getOptions().
    private Options options = new Options();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$BaseCuboidReduceFunction.class */
    /**
     * Pairwise reducer that merges two records carrying the same row key by aggregating
     * all of their measures.
     *
     * <p>Only the cube name, metadata URL and Hadoop configuration are supplied through the
     * constructor; the cube descriptor and the aggregators are resolved in
     * {@link #open(Configuration)}.
     */
    public static class BaseCuboidReduceFunction extends RichReduceFunction<Tuple2<ByteArray, Object[]>> {
        protected String cubeName;
        protected String metaUrl;
        protected CubeDesc cubeDesc;
        protected int measureNum;
        protected MeasureAggregators aggregators;
        protected SerializableConfiguration conf;

        /**
         * @param cubeName name of the cube whose measures are aggregated
         * @param metaUrl  metadata URL used to load the Kylin configuration from HDFS
         * @param conf     serializable Hadoop configuration used for that load
         */
        public BaseCuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
            this.cubeName = cubeName;
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        /**
         * Loads the Kylin configuration and initialises the measure aggregators.
         *
         * @param parameters Flink configuration; not used here
         * @throws Exception if the configuration or the cube metadata cannot be loaded
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
            // The resource installs kylinConfig as the thread-local config and removes it again on
            // close, whether the body completes normally or throws.
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset =
                    KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig)) {
                cubeDesc = CubeManager.getInstance(kylinConfig).getCube(cubeName).getDescriptor();
                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
                measureNum = cubeDesc.getMeasures().size();
            }
        }

        /**
         * Aggregates the measures of two records into a freshly allocated array.
         *
         * @param left  first record; its key is reused for the result
         * @param right second record
         * @return a new tuple holding {@code left}'s key and the aggregated measures
         */
        @Override
        public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> left,
                Tuple2<ByteArray, Object[]> right) throws Exception {
            Object[] merged = new Object[measureNum];
            aggregators.aggregate(left.f1, right.f1, merged);
            return new Tuple2<>(left.f0, merged);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$BaseCuboidReduceGroupFunction.class */
    private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
        protected String cubeName;
        protected String metaUrl;
        protected CubeDesc cubeDesc;
        protected int measureNum;
        protected MeasureAggregators aggregators;
        protected SerializableConfiguration conf;

        public BaseCuboidReduceGroupFunction(String str, String str2, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.metaUrl = str2;
            this.conf = serializableConfiguration;
        }

        public void open(Configuration configuration) throws Exception {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    this.cubeDesc = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName).getDescriptor();
                    this.aggregators = new MeasureAggregators(this.cubeDesc.getMeasures());
                    this.measureNum = this.cubeDesc.getMeasures().size();
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
            Object[] objArr = null;
            ByteArray byteArray = null;
            for (Tuple2<ByteArray, Object[]> tuple2 : iterable) {
                byteArray = (ByteArray) tuple2.f0;
                if (objArr == null) {
                    objArr = (Object[]) tuple2.f1;
                } else {
                    Object[] objArr2 = new Object[this.measureNum];
                    this.aggregators.aggregate((Object[]) tuple2.f1, objArr, objArr2);
                    objArr = objArr2;
                }
            }
            collector.collect(new Tuple2(byteArray, objArr));
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$CuboidFlatMapFunction.class */
    private static class CuboidFlatMapFunction extends RichFlatMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private CubeSegment cubeSegment;
        private CubeDesc cubeDesc;
        private NDCuboidBuilder ndCuboidBuilder;
        private RowKeySplitter rowKeySplitter;
        private SerializableConfiguration conf;

        public CuboidFlatMapFunction(String str, String str2, String str3, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
        }

        public void open(Configuration configuration) throws Exception {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName);
                    this.cubeSegment = cube.getSegmentById(this.segmentId);
                    this.cubeDesc = cube.getDescriptor();
                    this.ndCuboidBuilder = new NDCuboidBuilder(this.cubeSegment, new RowKeyEncoderProvider(this.cubeSegment));
                    this.rowKeySplitter = new RowKeySplitter(this.cubeSegment);
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public void flatMap(Tuple2<ByteArray, Object[]> tuple2, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
            byte[] array = ((ByteArray) tuple2.f0).array();
            long parseCuboid = this.rowKeySplitter.parseCuboid(array);
            List<Long> spanningCuboid = this.cubeSegment.getCuboidScheduler().getSpanningCuboid(parseCuboid);
            if (spanningCuboid == null || spanningCuboid.size() == 0) {
                return;
            }
            this.rowKeySplitter.split(array);
            Cuboid findForMandatory = Cuboid.findForMandatory(this.cubeDesc, parseCuboid);
            Iterator<Long> it = spanningCuboid.iterator();
            while (it.hasNext()) {
                collector.collect(new Tuple2(this.ndCuboidBuilder.buildKey2(findForMandatory, Cuboid.findForMandatory(this.cubeDesc, it.next().longValue()), this.rowKeySplitter.getSplitBuffers()), tuple2.f1));
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<ByteArray, Object[]>) obj, (Collector<Tuple2<ByteArray, Object[]>>) collector);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$CuboidMapPartitionFunction.class */
    private static class CuboidMapPartitionFunction extends RichMapPartitionFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private CubeSegment cubeSegment;
        private CubeDesc cubeDesc;
        private NDCuboidBuilder ndCuboidBuilder;
        private RowKeySplitter rowKeySplitter;
        private SerializableConfiguration conf;

        public CuboidMapPartitionFunction(String str, String str2, String str3, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
        }

        public void open(Configuration configuration) throws Exception {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName);
                    this.cubeSegment = cube.getSegmentById(this.segmentId);
                    this.cubeDesc = cube.getDescriptor();
                    this.ndCuboidBuilder = new NDCuboidBuilder(this.cubeSegment, new RowKeyEncoderProvider(this.cubeSegment));
                    this.rowKeySplitter = new RowKeySplitter(this.cubeSegment);
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public void mapPartition(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
            for (Tuple2<ByteArray, Object[]> tuple2 : iterable) {
                byte[] array = ((ByteArray) tuple2.f0).array();
                long parseCuboid = this.rowKeySplitter.parseCuboid(array);
                List<Long> spanningCuboid = this.cubeSegment.getCuboidScheduler().getSpanningCuboid(parseCuboid);
                if (spanningCuboid != null && spanningCuboid.size() != 0) {
                    this.rowKeySplitter.split(array);
                    Cuboid findForMandatory = Cuboid.findForMandatory(this.cubeDesc, parseCuboid);
                    Iterator<Long> it = spanningCuboid.iterator();
                    while (it.hasNext()) {
                        collector.collect(new Tuple2(this.ndCuboidBuilder.buildKey2(findForMandatory, Cuboid.findForMandatory(this.cubeDesc, it.next().longValue()), this.rowKeySplitter.getSplitBuffers()), tuple2.f1));
                    }
                }
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$CuboidReduceFunction.class */
    /**
     * Pairwise reducer for non-base cuboid levels. It behaves like
     * {@link BaseCuboidReduceFunction} but hands a per-measure flag array to the aggregators.
     */
    private static class CuboidReduceFunction extends BaseCuboidReduceFunction {
        // One flag per measure, forwarded unchanged to MeasureAggregators.aggregate.
        private boolean[] needAgg;

        /**
         * @param cubeName name of the cube whose measures are aggregated
         * @param metaUrl  metadata URL used to load the Kylin configuration from HDFS
         * @param conf     serializable Hadoop configuration used for that load
         * @param needAgg  per-measure flags passed to the aggregators
         */
        public CuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
            super(cubeName, metaUrl, conf);
            this.needAgg = needAgg;
        }

        /** Delegates entirely to the base-class initialisation. */
        @Override // org.apache.kylin.engine.flink.FlinkCubingByLayer.BaseCuboidReduceFunction
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        /**
         * Aggregates the measures of two records into a new array, honouring {@code needAgg}.
         *
         * @return a new tuple holding {@code left}'s key and the aggregated measures
         */
        @Override // org.apache.kylin.engine.flink.FlinkCubingByLayer.BaseCuboidReduceFunction
        public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> left,
                Tuple2<ByteArray, Object[]> right) throws Exception {
            Object[] merged = new Object[measureNum];
            aggregators.aggregate(left.f1, right.f1, merged, needAgg);
            return new Tuple2<>(left.f0, merged);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$CuboidReduceGroupFunction.class */
    /**
     * Group reducer for non-base cuboid levels. It behaves like
     * {@link BaseCuboidReduceGroupFunction} but hands a per-measure flag array to the
     * aggregators.
     */
    private static class CuboidReduceGroupFunction extends BaseCuboidReduceGroupFunction {
        // One flag per measure, forwarded unchanged to MeasureAggregators.aggregate.
        private boolean[] needAgg;

        /**
         * @param cubeName name of the cube whose measures are aggregated
         * @param metaUrl  metadata URL used to load the Kylin configuration from HDFS
         * @param conf     serializable Hadoop configuration used for that load
         * @param needAgg  per-measure flags passed to the aggregators
         */
        public CuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
            super(cubeName, metaUrl, conf);
            this.needAgg = needAgg;
        }

        /** Delegates entirely to the base-class initialisation. */
        @Override // org.apache.kylin.engine.flink.FlinkCubingByLayer.BaseCuboidReduceGroupFunction
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        /**
         * Emits exactly one tuple per call: the key of the last record seen and the running
         * aggregate of all measure arrays, computed with the {@code needAgg} flags.
         *
         * <p>For an empty {@code records} iterable a tuple of two nulls is emitted.
         *
         * @param records all records of one key group
         * @param out     collector receiving the single aggregated record
         */
        @Override // org.apache.kylin.engine.flink.FlinkCubingByLayer.BaseCuboidReduceGroupFunction
        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> records,
                Collector<Tuple2<ByteArray, Object[]>> out) throws Exception {
            Object[] accumulated = null;
            ByteArray key = null;
            for (Tuple2<ByteArray, Object[]> record : records) {
                key = record.f0;
                if (accumulated == null) {
                    // The first record seeds the accumulator without copying.
                    accumulated = record.f1;
                } else {
                    Object[] merged = new Object[measureNum];
                    aggregators.aggregate(record.f1, accumulated, merged, needAgg);
                    accumulated = merged;
                }
            }
            out.collect(new Tuple2<>(key, accumulated));
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$EncodeBaseCuboidMapFunction.class */
    private static class EncodeBaseCuboidMapFunction extends RichMapFunction<String[], Tuple2<ByteArray, Object[]>> {
        private BaseCuboidBuilder baseCuboidBuilder = null;
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private SerializableConfiguration conf;

        public EncodeBaseCuboidMapFunction(String str, String str2, String str3, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
        }

        public void open(Configuration configuration) throws Exception {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName);
                    CubeDesc descriptor = cube.getDescriptor();
                    CubeSegment segmentById = cube.getSegmentById(this.segmentId);
                    this.baseCuboidBuilder = new BaseCuboidBuilder(loadKylinConfigFromHdfs, descriptor, segmentById, new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(segmentById), descriptor), AbstractRowKeyEncoder.createInstance(segmentById, Cuboid.findForMandatory(descriptor, Cuboid.getBaseCuboidId(descriptor))), MeasureIngester.create(descriptor.getMeasures()), segmentById.buildDictionaryMap());
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public Tuple2<ByteArray, Object[]> map(String[] strArr) throws Exception {
            this.baseCuboidBuilder.resetAggrs();
            byte[] buildKey = this.baseCuboidBuilder.buildKey(strArr);
            return new Tuple2<>(new ByteArray(buildKey), this.baseCuboidBuilder.buildValueObjects(strArr));
        }
    }

    /* loaded from: input_file:WEB-INF/lib/kylin-engine-flink-3.1.3.jar:org/apache/kylin/engine/flink/FlinkCubingByLayer$EncodeBaseCuboidMapPartitionFunction.class */
    private static class EncodeBaseCuboidMapPartitionFunction extends RichMapPartitionFunction<String[], Tuple2<ByteArray, Object[]>> {
        private BaseCuboidBuilder baseCuboidBuilder = null;
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private SerializableConfiguration conf;

        public EncodeBaseCuboidMapPartitionFunction(String str, String str2, String str3, SerializableConfiguration serializableConfiguration) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
        }

        public void open(Configuration configuration) throws Exception {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName);
                    CubeDesc descriptor = cube.getDescriptor();
                    CubeSegment segmentById = cube.getSegmentById(this.segmentId);
                    this.baseCuboidBuilder = new BaseCuboidBuilder(loadKylinConfigFromHdfs, descriptor, segmentById, new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(segmentById), descriptor), AbstractRowKeyEncoder.createInstance(segmentById, Cuboid.findForMandatory(descriptor, Cuboid.getBaseCuboidId(descriptor))), MeasureIngester.create(descriptor.getMeasures()), segmentById.buildDictionaryMap());
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        public void mapPartition(Iterable<String[]> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
            for (String[] strArr : iterable) {
                this.baseCuboidBuilder.resetAggrs();
                byte[] buildKey = this.baseCuboidBuilder.buildKey(strArr);
                collector.collect(new Tuple2(new ByteArray(buildKey), this.baseCuboidBuilder.buildValueObjects(strArr)));
            }
        }
    }

    /**
     * Creates the application and registers every supported command-line option, in the same
     * order as before: input table, input path, cube name, segment id, meta URL, output path,
     * object-reuse flag.
     */
    public FlinkCubingByLayer() {
        Option[] supported = {
            OPTION_INPUT_TABLE,
            OPTION_INPUT_PATH,
            OPTION_CUBE_NAME,
            OPTION_SEGMENT_ID,
            OPTION_META_URL,
            OPTION_OUTPUT_PATH,
            OPTION_ENABLE_OBJECT_REUSE,
        };
        for (Option option : supported) {
            options.addOption(option);
        }
    }

    /**
     * Returns the option set that the constructor populated.
     *
     * @return the command-line options accepted by this application
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected Options getOptions() {
        return options;
    }

    /**
     * Builds all cuboid levels of one cube segment with Flink and writes each level to the
     * output path.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>read the option values and delete any existing output path;</li>
     *   <li>load the Kylin configuration, cube, descriptor and segment;</li>
     *   <li>find the index of the first COUNT measure and build the per-measure
     *       "needs aggregation beyond the base cuboid" flags;</li>
     *   <li>configure the Flink environment (optional object reuse, Kryo serializer for
     *       {@link PercentileCounter});</li>
     *   <li>encode the input rows into base-cuboid records, reduce them by key (level 0) and
     *       register the sink;</li>
     *   <li>for every further level, derive the level from the previous one, optionally
     *       sanity-check it, and register its sink;</li>
     *   <li>execute the Flink job and log the number of bytes written.</li>
     * </ol>
     *
     * @param optionsHelper parsed command-line options
     * @throws Exception if metadata loading, job construction or job execution fails
     */
    @Override // org.apache.kylin.common.util.AbstractApplication
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
        String inputTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        String objectReuseValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
        // Any non-empty value switches object reuse on; the value itself is not parsed.
        // NOTE(review): this means "false" also enables it; confirm that is intended.
        boolean enableObjectReuse = objectReuseValue != null && !objectReuseValue.isEmpty();

        Job job = Job.getInstance();
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));

        SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
        CubeInstance cube = CubeManager.getInstance(envConfig).getCube(cubeName);
        CubeDesc cubeDesc = cube.getDescriptor();
        CubeSegment cubeSegment = cube.getSegmentById(segmentId);
        logger.info("DataSet input path : {}", inputPath);
        logger.info("DataSet output path : {}", outputPath);

        List<MeasureDesc> measures = cubeDesc.getMeasures();

        // Index of the first COUNT measure; equals measures.size() when there is none.
        int countMeasureIndex = 0;
        for (MeasureDesc measure : measures) {
            if (measure.getFunction().isCount()) {
                break;
            }
            countMeasureIndex++;
        }

        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);

        // needAggr[i] is false for measures that are only aggregated in the base cuboid.
        boolean[] needAggr = new boolean[measures.size()];
        boolean allNormalMeasure = true;
        for (int i = 0; i < needAggr.length; i++) {
            needAggr[i] = !measures.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
            allNormalMeasure = allNormalMeasure && needAggr[i];
        }
        logger.info("All measure are normal (agg on all cuboids) ? : {}", allNormalMeasure);

        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (enableObjectReuse) {
            env.getConfig().enableObjectReuse();
        }
        env.getConfig().registerKryoType(PercentileCounter.class);
        env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);

        DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet =
                FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, inputTable, job)
                        .mapPartition(new EncodeBaseCuboidMapPartitionFunction(cubeName, segmentId, metaUrl, sConf));

        // NOTE(review): DataSet#count() is expected to run a Flink job eagerly, so this must stay
        // ahead of the plan construction below; confirm against the Flink DataSet API.
        Long totalCount = 0L;
        if (envConfig.isFlinkSanityCheckEnabled()) {
            totalCount = encodedBaseDataSet.count();
        }

        BaseCuboidReduceGroupFunction baseCuboidReducer =
                new BaseCuboidReduceGroupFunction(cubeName, metaUrl, sConf);
        // Higher levels use the flag-aware reducer only when some measure must not be re-aggregated.
        BaseCuboidReduceGroupFunction levelReducer = allNormalMeasure
                ? baseCuboidReducer
                : new CuboidReduceGroupFunction(cubeName, metaUrl, sConf, needAggr);

        int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
        // Generic array creation is not possible in Java; every slot is assigned a correctly
        // typed DataSet below.
        @SuppressWarnings("unchecked")
        DataSet<Tuple2<ByteArray, Object[]>>[] allDataSets = new DataSet[totalLevels + 1];

        // Level 0: aggregate the encoded rows into the base cuboid.
        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduceGroup(baseCuboidReducer);
        sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);

        CuboidMapPartitionFunction spanningFunction =
                new CuboidMapPartitionFunction(cubeName, segmentId, metaUrl, sConf);
        for (int level = 1; level <= totalLevels; level++) {
            allDataSets[level] = allDataSets[level - 1]
                    .mapPartition(spanningFunction)
                    .groupBy(0)
                    .reduceGroup(levelReducer);
            if (envConfig.isFlinkSanityCheckEnabled()) {
                sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
            }
            sinkToHDFS(allDataSets[level], metaUrl, cubeName, cubeSegment, outputPath, level, Job.getInstance(), envConfig);
        }

        env.execute("Cubing for : " + cubeName + " segment " + segmentId);
        logger.info("Finished on calculating all level cuboids.");
        logger.info("HDFS: Number of bytes written={}", FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));
    }

    /**
     * Attaches a Hadoop sequence-file sink for one cuboid level to the given DataSet.
     *
     * <p>Every {@code (ByteArray, Object[])} tuple is converted to a {@code (Text, Text)} pair:
     * the key wraps the bytes returned by {@code ByteArray.array()} and the value holds the
     * measures encoded by a {@link BufferedMeasureCodec}. The pair stream is handed to a
     * {@link HadoopOutputFormat} wrapping a {@link SequenceFileOutputFormat} whose output path is
     * {@code BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(str3, i)}.
     *
     * @param dataSet     cuboid rows to persist
     * @param str         value passed to {@code FlinkUtil.setHadoopConfForCuboid} and to
     *                    {@code AbstractHadoopJob.loadKylinConfigFromHdfs} (presumably the metadata
     *                    URL; verify against caller)
     * @param str2        cube name used to look up the cube descriptor inside the map function
     * @param cubeSegment segment passed to {@code FlinkUtil.setHadoopConfForCuboid}
     * @param str3        base location from which the per-level output path is derived
     * @param i           cuboid level; used for the output path and the log message
     * @param job         Hadoop job whose configuration is adjusted and used by the output format
     * @param kylinConfig not read by this method
     * @throws Exception propagated from the Kylin/Flink/Hadoop helpers invoked here
     */
    private void sinkToHDFS(DataSet<Tuple2<ByteArray, Object[]>> dataSet, final String str, final String str2, CubeSegment cubeSegment, String str3, int i, Job job, KylinConfig kylinConfig) throws Exception {
        String cuboidOutputPathsByLevel = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(str3, i);
        // Wrapped before the job configuration is adjusted below; statement order is kept as in the original.
        final SerializableConfiguration serializableConfiguration = new SerializableConfiguration(job.getConfiguration());
        FlinkUtil.modifyFlinkHadoopConfiguration(job);
        FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, str);
        HadoopOutputFormat<Text, Text> hadoopOutputFormat = new HadoopOutputFormat<>(new SequenceFileOutputFormat<Text, Text>(), job);
        SequenceFileOutputFormat.setOutputPath(job, new Path(cuboidOutputPathsByLevel));
        dataSet.map(new RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>>() {
            /** Measure codec; created in {@link #open(Configuration)} rather than captured from the enclosing method. */
            BufferedMeasureCodec codec;

            @Override
            public void open(Configuration configuration) throws Exception {
                KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(serializableConfiguration, str);
                // The thread-local config is only needed while the cube descriptor is resolved;
                // try-with-resources closes it on both the normal and the exceptional path.
                try (KylinConfig.SetAndUnsetThreadLocalConfig ignored = KylinConfig.setAndUnsetThreadLocalConfig(config)) {
                    this.codec = new BufferedMeasureCodec(CubeDescManager.getInstance(config).getCubeDesc(str2).getMeasures());
                }
            }

            @Override
            public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
                ByteBuffer encoded = this.codec.encode(tuple2.f1);
                Text value = new Text();
                // Copy only the bytes written so far (index 0 up to the buffer's current position).
                value.set(encoded.array(), 0, encoded.position());
                return new Tuple2<>(new Text(tuple2.f0.array()), value);
            }
        }).output(hadoopOutputFormat);
        logger.info("Persisting DataSet for level " + i + " into " + cuboidOutputPathsByLevel);
    }

    /**
     * Checks that the summed count measure of one cuboid level matches the expected total.
     *
     * <p>The expected total is {@code l} multiplied by the number of cuboids that
     * {@code cubeStatsReader} reports for level {@code i}.
     *
     * @param dataSet         cuboid rows of the level under test
     * @param l               reference count that every cuboid of the level is expected to sum to
     * @param i               level being checked
     * @param cubeStatsReader source of the cuboid list for the level
     * @param i2              index of the count measure inside each row's measure array
     * @throws IllegalStateException if the summed count differs from the expected total
     * @throws Exception             propagated from {@code getDataSetCountSum}
     */
    private void sanityCheck(DataSet<Tuple2<ByteArray, Object[]>> dataSet, Long l, int i, CubeStatsReader cubeStatsReader, int i2) throws Exception {
        final int cuboidCount = cubeStatsReader.getCuboidsByLayer(i).size();
        final long actualTotal = getDataSetCountSum(dataSet, i2).longValue();
        final long expectedTotal = l.longValue() * cuboidCount;

        if (actualTotal == expectedTotal) {
            final long perCuboid = actualTotal / cuboidCount;
            logger.info("sanity check success for level " + i + ", count(*) is " + perCuboid);
            return;
        }

        final String message = String.format(
                Locale.ROOT,
                "Sanity check failed, level %s, total count(*) is %s; cuboid number %s",
                i, actualTotal, cuboidCount);
        throw new IllegalStateException(message);
    }

    /**
     * Sums the count measure over all rows of the given DataSet.
     *
     * <p>Each row is projected to {@code (key, (Long) measures[i])} and field 1 is aggregated
     * with {@code sum(1)}. The aggregated tuple is collected and its summed field is returned.
     * The previous implementation called {@code count()} on the aggregate, which yields the
     * number of elements in the aggregated DataSet rather than the summed value.
     *
     * @param dataSet cuboid rows whose count measure is summed
     * @param i       index of the count measure inside each row's measure array
     * @return the sum of the count measure, or {@code 0} if the aggregate produced no element
     * @throws Exception propagated from executing the Flink program that computes the aggregate
     */
    private Long getDataSetCountSum(DataSet<Tuple2<ByteArray, Object[]>> dataSet, int i) throws Exception {
        DataSet<Tuple2<ByteArray, Long>> counts = dataSet.map(tuple2 -> {
            return new Tuple2<ByteArray, Long>(tuple2.f0, (Long) tuple2.f1[i]);
        });
        List<Tuple2<ByteArray, Long>> totals = counts.sum(1).collect();
        if (totals.isEmpty()) {
            // An empty input yields no aggregate tuple; report a zero sum.
            return Long.valueOf(0L);
        }
        return totals.get(0).f1;
    }

    /*
     * Synthetic lambda-deserialization hook: given a SerializedLambda it re-creates the lambda
     * used in getDataSetCountSum, or throws IllegalArgumentException when the description does
     * not match.
     *
     * NOTE(review): this looks like compiler-generated code reconstructed by a decompiler rather
     * than hand-written source. `boolean z = -1` and `switch (z)` on a boolean are not valid Java
     * source, so the method probably cannot be compiled as written; confirm whether it should be
     * kept in the source tree at all.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no known lambda name matches.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1624057636:
                // The hash only narrows the candidates; the name is still compared explicitly.
                if (implMethodName.equals("lambda$getDataSetCountSum$8587e832$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Every recorded attribute of the serialized lambda must match before it is rebuilt.
                // NOTE(review): BeanDefinitionParserDelegate.MAP_ELEMENT presumably resolves to the
                // literal "map" (the functional interface method name); verify.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals(BeanDefinitionParserDelegate.MAP_ELEMENT) && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/kylin/engine/flink/FlinkCubingByLayer") && serializedLambda.getImplMethodSignature().equals("(ILorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/flink/api/java/tuple/Tuple2;")) {
                    // Captured argument 0 is the measure index the lambda closes over.
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return tuple2 -> {
                        return new Tuple2(tuple2.f0, (Long) ((Object[]) tuple2.f1)[intValue]);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    // Command-line option definitions. Each option takes an argument; all are required
    // except "enableObjectReuse".
    static {
        OPTION_CUBE_NAME = buildArgOption(BatchConstants.ARG_CUBE_NAME, true, "Cube Name", BatchConstants.ARG_CUBE_NAME);
        OPTION_SEGMENT_ID = buildArgOption("segment", true, "Cube Segment Id", "segmentId");
        OPTION_META_URL = buildArgOption(BatchConstants.ARG_META_URL, true, "HDFS metadata url", BatchConstants.ARG_META_URL);
        OPTION_OUTPUT_PATH = buildArgOption(BatchConstants.ARG_OUTPUT, true, "Cube output path", BatchConstants.ARG_OUTPUT);
        OPTION_INPUT_TABLE = buildArgOption(BatchConstants.ARG_HIVE_TABLE, true, "Hive Intermediate Table", BatchConstants.ARG_HIVE_TABLE);
        OPTION_INPUT_PATH = buildArgOption(BatchConstants.ARG_INPUT, true, "Hive Intermediate Table PATH", BatchConstants.ARG_INPUT);
        OPTION_ENABLE_OBJECT_REUSE = buildArgOption("enableObjectReuse", false, "Enable object reuse", "enableObjectReuse");
    }

    /**
     * Builds a single-argument command-line option through the static {@link OptionBuilder}.
     *
     * @param argName     display name of the option's argument
     * @param required    whether the option must be supplied
     * @param description help text for the option
     * @param optName     name the option is created under
     * @return the option produced by {@code OptionBuilder.create(optName)}
     */
    private static Option buildArgOption(String argName, boolean required, String description, String optName) {
        OptionBuilder.withArgName(argName);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(required);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(optName);
    }
}
