/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.steps;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FactDistinctColumnsReducer
extends KylinReducer<Text, Text, NullWritable, Text> {
    private List<TblColRef> columnList;
    private String statisticsOutput = null;
    private List<Long> baseCuboidRowCountInMappers;
    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
    protected long baseCuboidId;
    protected CubeDesc cubeDesc;
    private long totalRowsBeforeMerge = 0L;
    private int samplingPercentage;
    private List<ByteArray> colValues;
    private TblColRef col = null;
    private boolean isStatistics = false;
    private boolean outputTouched = false;
    private KylinConfig cubeConfig;
    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);

    protected void setup(Reducer.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration conf = context.getConfiguration();
        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        String cubeName = conf.get("cube.name");
        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
        this.cubeConfig = cube.getConfig();
        this.cubeDesc = cube.getDescriptor();
        this.columnList = CubeManager.getInstance(config).getAllDictColumnsOnFact(this.cubeDesc);
        boolean collectStatistics = Boolean.parseBoolean(conf.get("statistics.enabled"));
        int numberOfTasks = context.getNumReduceTasks();
        int taskId = context.getTaskAttemptID().getTaskID().getId();
        if (collectStatistics && taskId == numberOfTasks - 1) {
            this.isStatistics = true;
            this.statisticsOutput = conf.get("statistics.ouput");
            this.baseCuboidRowCountInMappers = Lists.newArrayList();
            this.cuboidHLLMap = Maps.newHashMap();
            this.samplingPercentage = Integer.parseInt(context.getConfiguration().get("statistics.sampling.percent"));
        } else {
            this.isStatistics = false;
            this.col = this.columnList.get(taskId);
            this.colValues = Lists.newArrayList();
        }
    }

    public void reduce(Text key, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
        if (!this.isStatistics) {
            this.colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
            if (this.colValues.size() == 1000000) {
                logger.info("spill values to disk...");
                this.outputDistinctValues(this.col, this.colValues, context);
                this.colValues.clear();
            }
        } else {
            long cuboidId = Bytes.toLong(key.getBytes(), 1, 8);
            for (Text value : values) {
                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(this.cubeConfig.getCubeStatsHLLPrecision());
                ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
                hll.readRegisters(bf);
                this.totalRowsBeforeMerge += hll.getCountEstimate();
                if (cuboidId == this.baseCuboidId) {
                    this.baseCuboidRowCountInMappers.add(hll.getCountEstimate());
                }
                if (this.cuboidHLLMap.get(cuboidId) != null) {
                    this.cuboidHLLMap.get(cuboidId).merge(hll);
                    continue;
                }
                this.cuboidHLLMap.put(cuboidId, hll);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Reducer.Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get((Configuration)conf);
        String outputPath = conf.get("output.path");
        Path outputFile = new Path(outputPath, col.getName());
        FSDataOutputStream out = null;
        try {
            if (fs.exists(outputFile)) {
                out = fs.append(outputFile);
                logger.info("append file " + outputFile);
            } else {
                out = fs.create(outputFile);
                logger.info("create file " + outputFile);
            }
            for (ByteArray value : values) {
                out.write(value.array(), value.offset(), value.length());
                out.write(10);
            }
        }
        catch (Throwable throwable) {
            IOUtils.closeQuietly(out);
            this.outputTouched = true;
            throw throwable;
        }
        IOUtils.closeQuietly((OutputStream)out);
        this.outputTouched = true;
    }

    protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
        if (!this.isStatistics) {
            if (!this.outputTouched || this.colValues.size() > 0) {
                this.outputDistinctValues(this.col, this.colValues, context);
                this.colValues.clear();
            }
        } else {
            long grandTotal = 0L;
            for (HyperLogLogPlusCounter hll : this.cuboidHLLMap.values()) {
                grandTotal += hll.getCountEstimate();
            }
            double mapperOverlapRatio = grandTotal == 0L ? 0.0 : (double)this.totalRowsBeforeMerge / (double)grandTotal;
            this.writeMapperAndCuboidStatistics(context);
            CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(this.statisticsOutput), this.cuboidHLLMap, this.samplingPercentage, mapperOverlapRatio);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeMapperAndCuboidStatistics(Reducer.Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get((Configuration)conf);
        FSDataOutputStream out = fs.create(new Path(this.statisticsOutput, "cube_statistics.txt"));
        try {
            ArrayList allCuboids = Lists.newArrayList();
            allCuboids.addAll(this.cuboidHLLMap.keySet());
            Collections.sort(allCuboids);
            String msg = "Total cuboid number: \t" + allCuboids.size();
            this.writeLine(out, msg);
            msg = "Samping percentage: \t" + this.samplingPercentage;
            this.writeLine(out, msg);
            this.writeLine(out, "The following statistics are collected based on sampling data.");
            for (int i = 0; i < this.baseCuboidRowCountInMappers.size(); ++i) {
                if (this.baseCuboidRowCountInMappers.get(i) <= 0L) continue;
                msg = "Base Cuboid in Mapper " + i + " row count: \t " + this.baseCuboidRowCountInMappers.get(i);
                this.writeLine(out, msg);
            }
            long grantTotal = 0L;
            Iterator i$ = allCuboids.iterator();
            while (i$.hasNext()) {
                long i = (Long)i$.next();
                grantTotal += this.cuboidHLLMap.get(i).getCountEstimate();
                msg = "Cuboid " + i + " row count is: \t " + this.cuboidHLLMap.get(i).getCountEstimate();
                this.writeLine(out, msg);
            }
            msg = "Sum of all the cube segments (before merge) is: \t " + this.totalRowsBeforeMerge;
            this.writeLine(out, msg);
            msg = "After merge, the cube has row count: \t " + grantTotal;
            this.writeLine(out, msg);
            if (grantTotal > 0L) {
                msg = "The mapper overlap ratio is: \t" + this.totalRowsBeforeMerge / grantTotal;
                this.writeLine(out, msg);
            }
        }
        finally {
            IOUtils.closeQuietly((OutputStream)out);
        }
    }

    private void writeLine(FSDataOutputStream out, String msg) throws IOException {
        out.write(msg.getBytes());
        out.write(10);
    }
}

