/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.cube.inmemcubing;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore;
import org.apache.kylin.cube.inmemcubing.CuboidResult;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InMemCubeBuilderInputConverter;
import org.apache.kylin.cube.inmemcubing.InMemCubeBuilderUtils;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTAggregateScanner;
import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.topn.Counter;
import org.apache.kylin.measure.topn.TopNCounter;
import org.apache.kylin.metadata.datatype.DoubleMutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemCubeBuilder
extends AbstractInMemCubeBuilder {
    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
    private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1;
    private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9;
    private final CuboidScheduler cuboidScheduler;
    private final long baseCuboidId;
    private final int totalCuboidCount;
    private final String[] metricsAggrFuncs;
    private final MeasureDesc[] measureDescs;
    private final int measureCount;
    private MemoryBudgetController memBudget;
    private MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker;
    private Thread[] taskThreads;
    private Throwable[] taskThreadExceptions;
    private TreeSet<CuboidTask> taskPending;
    private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
    private CuboidResult baseResult;
    private Object[] totalSumForSanityCheck;
    private ICuboidCollector resultCollector;

    public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
        super(cubeDesc, flatDesc, dictionaryMap);
        this.cuboidScheduler = new CuboidScheduler(cubeDesc);
        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
        this.totalCuboidCount = this.cuboidScheduler.getCuboidCount();
        this.measureCount = cubeDesc.getMeasures().size();
        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[this.measureCount]);
        ArrayList metricsAggrFuncsList = Lists.newArrayList();
        for (int i = 0; i < this.measureCount; ++i) {
            MeasureDesc measureDesc = this.measureDescs[i];
            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
        }
        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
    }

    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
        GTInfo info = CubeGridTable.newGTInfo(this.cubeDesc, cuboidID, this.dictionaryMap);
        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
        GridTable gridTable = new GridTable(info, store);
        return gridTable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
        ConcurrentNavigableMap<Long, CuboidResult> result = this.build(input);
        try {
            for (CuboidResult cuboidResult : result.values()) {
                this.outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
                cuboidResult.table.close();
            }
        }
        finally {
            output.close();
        }
    }

    public ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
        final ConcurrentSkipListMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
        this.build(input, new ICuboidCollector(){

            @Override
            public void collect(CuboidResult cuboidResult) {
                logger.info("collecting CuboidResult cuboid id:" + cuboidResult.cuboidId);
                result.put(cuboidResult.cuboidId, cuboidResult);
            }
        });
        logger.info("total CuboidResult count:" + result.size());
        return result;
    }

    private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
        long startTime = System.currentTimeMillis();
        logger.info("In Mem Cube Build start, " + this.cubeDesc.getName());
        this.baseCuboidMemTracker = new MemoryBudgetController.MemoryWaterLevel();
        this.baseCuboidMemTracker.markLow();
        this.taskPending = new TreeSet();
        this.taskCuboidCompleted.set(0);
        this.taskThreads = this.prepareTaskThreads();
        this.taskThreadExceptions = new Throwable[this.taskThreadCount];
        this.resultCollector = collector;
        this.totalSumForSanityCheck = null;
        this.baseResult = this.createBaseCuboid(input);
        if (this.baseResult.nRows == 0) {
            return;
        }
        this.baseCuboidMemTracker.markLow();
        this.makeMemoryBudget();
        this.addChildTasks(this.baseResult);
        this.start(this.taskThreads);
        this.join(this.taskThreads);
        long endTime = System.currentTimeMillis();
        logger.info("In Mem Cube Build end, " + this.cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
        this.throwExceptionIfAny();
    }

    public void abort() {
        this.interrupt(this.taskThreads);
    }

    private void start(Thread ... threads) {
        for (Thread t : threads) {
            t.start();
        }
    }

    private void interrupt(Thread ... threads) {
        for (Thread t : threads) {
            t.interrupt();
        }
    }

    private void join(Thread ... threads) throws IOException {
        try {
            for (Thread t : threads) {
                t.join();
            }
        }
        catch (InterruptedException e) {
            throw new IOException("interrupted while waiting task and output complete", e);
        }
    }

    private void throwExceptionIfAny() throws IOException {
        ArrayList<Throwable> errors = new ArrayList<Throwable>();
        for (int i = 0; i < this.taskThreadCount; ++i) {
            Throwable t = this.taskThreadExceptions[i];
            if (t == null) continue;
            errors.add(t);
        }
        if (errors.isEmpty()) {
            return;
        }
        if (errors.size() == 1) {
            Throwable t = (Throwable)errors.get(0);
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException(t);
        }
        for (Throwable t : errors) {
            logger.error("Exception during in-mem cube build", t);
        }
        throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", (Throwable)errors.get(0));
    }

    private Thread[] prepareTaskThreads() {
        Thread[] result = new Thread[this.taskThreadCount];
        for (int i = 0; i < this.taskThreadCount; ++i) {
            result[i] = new CuboidTaskThread(i);
        }
        return result;
    }

    public boolean isAllCuboidDone() {
        return this.taskCuboidCompleted.get() == this.totalCuboidCount;
    }

    private boolean taskHasNoException() {
        for (int i = 0; i < this.taskThreadExceptions.length; ++i) {
            if (this.taskThreadExceptions[i] == null) continue;
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addChildTasks(CuboidResult parent) {
        List<Long> children = this.cuboidScheduler.getSpanningCuboid(parent.cuboidId);
        if (!children.isEmpty()) {
            TreeSet<CuboidTask> treeSet = this.taskPending;
            synchronized (treeSet) {
                for (Long child : children) {
                    this.taskPending.add(new CuboidTask(parent, child));
                }
                this.taskPending.notifyAll();
            }
        }
    }

    private void makeMemoryBudget() {
        this.baseResult.aggrCacheMB = Math.max(this.baseCuboidMemTracker.getEstimateMB(), 10);
        logger.debug("Base cuboid aggr cache is " + this.baseResult.aggrCacheMB + " MB");
        int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB();
        logger.debug("System avail " + systemAvailMB + " MB");
        int reserve = this.reserveMemoryMB;
        logger.debug("Reserve " + reserve + " MB for system basics");
        int budget = systemAvailMB - reserve;
        if (budget < this.baseResult.aggrCacheMB) {
            budget = this.baseResult.aggrCacheMB;
            logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + this.baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
        }
        logger.debug("Memory Budget is " + budget + " MB");
        this.memBudget = new MemoryBudgetController(budget);
    }

    private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
        long startTime = System.currentTimeMillis();
        logger.info("Calculating base cuboid " + this.baseCuboidId);
        GridTable baseCuboid = this.newGridTableByCuboidID(this.baseCuboidId);
        GTBuilder baseBuilder = baseCuboid.rebuild();
        InputConverter baseInput = new InputConverter(baseCuboid.getInfo(), input);
        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(this.baseCuboidId, this.measureCount);
        GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond()).setAggrMetricsFuncs(this.metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, Long.MAX_VALUE);
        aggregationScanner.trackMemoryLevel(this.baseCuboidMemTracker);
        int count = 0;
        for (GTRecord r : aggregationScanner) {
            if (count == 0) {
                this.baseCuboidMemTracker.markHigh();
            }
            baseBuilder.write(r);
            ++count;
        }
        aggregationScanner.close();
        baseBuilder.close();
        long timeSpent = System.currentTimeMillis() - startTime;
        logger.info("Cuboid " + this.baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
        int mbEstimateBaseAggrCache = (int)(aggregationScanner.getEstimateSizeOfAggrCache() / 0x100000L);
        logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
        return this.updateCuboidResult(this.baseCuboidId, baseCuboid, count, timeSpent, 0);
    }

    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
        if (aggrCacheMB <= 0 && this.baseResult != null) {
            aggrCacheMB = (int)Math.round((0.1 + 0.9 * (double)nRows / (double)this.baseResult.nRows) * (double)this.baseResult.aggrCacheMB);
        }
        CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
        this.taskCuboidCompleted.incrementAndGet();
        this.resultCollector.collect(result);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
        final String consumerName = "AggrCache@Cuboid " + cuboidId;
        MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer(){

            @Override
            public int freeUp(int mb) {
                return 0;
            }

            public String toString() {
                return consumerName;
            }
        };
        this.memBudget.reserveInsist(consumer, parent.aggrCacheMB);
        try {
            CuboidResult cuboidResult = this.aggregateCuboid(parent, cuboidId);
            return cuboidResult;
        }
        finally {
            this.memBudget.reserve(consumer, 0);
        }
    }

    private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
        Pair<ImmutableBitSet, ImmutableBitSet> allNeededColumns = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, this.measureCount);
        return this.scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond());
    }

    private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
        GTInfo info = gridTable.getInfo();
        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(this.metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
        GTAggregateScanner scanner = (GTAggregateScanner)gridTable.scan(req);
        if (parentId != cuboidId) {
            boolean[] aggrMask = new boolean[this.measureDescs.length];
            for (int i = 0; i < this.measureDescs.length; ++i) {
                boolean bl = aggrMask[i] = !this.measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid();
                if (aggrMask[i]) continue;
                logger.info(this.measureDescs[i].toString() + " doesn't need aggregation.");
            }
            scanner.setAggrMask(aggrMask);
        }
        return scanner;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
        long startTime = System.currentTimeMillis();
        logger.info("Calculating cuboid " + cuboidId);
        GTAggregateScanner scanner = this.prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns);
        GridTable newGridTable = this.newGridTableByCuboidID(cuboidId);
        GTBuilder builder = newGridTable.rebuild();
        ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
        int count = 0;
        try {
            for (GTRecord record : scanner) {
                ++count;
                for (int i = 0; i < allNeededColumns.trueBitCount(); ++i) {
                    int c = allNeededColumns.trueBitAt(i);
                    newRecord.set(i, record.get(c));
                }
                builder.write(newRecord);
            }
        }
        finally {
            scanner.close();
            builder.close();
        }
        long timeSpent = System.currentTimeMillis() - startTime;
        logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
        return this.updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
    }

    private void sanityCheck(long parentId, long cuboidId, Object[] totalSum) {
        for (int i = 0; i < totalSum.length; ++i) {
            if (totalSum[i] instanceof DoubleMutable) {
                totalSum[i] = Math.round(((DoubleMutable)totalSum[i]).get());
                continue;
            }
            if (!(totalSum[i] instanceof TopNCounter)) continue;
            TopNCounter counter = (TopNCounter)totalSum[i];
            Iterator iterator = counter.iterator();
            double total = 0.0;
            while (iterator.hasNext()) {
                Counter aCounter = iterator.next();
                total += aCounter.getCount();
            }
            totalSum[i] = Math.round(total);
        }
        if (this.totalSumForSanityCheck == null) {
            this.totalSumForSanityCheck = totalSum;
            return;
        }
        if (!Arrays.equals(this.totalSumForSanityCheck, totalSum)) {
            logger.info("sanityCheck failed when calculate " + cuboidId + " from parent " + parentId);
            logger.info("Expected: " + Arrays.toString(this.totalSumForSanityCheck));
            logger.info("Actually: " + Arrays.toString(totalSum));
            throw new IllegalStateException();
        }
    }

    private class InputConverter
    implements IGTScanner {
        GTInfo info;
        GTRecord record;
        BlockingQueue<List<String>> input;
        final InMemCubeBuilderInputConverter inMemCubeBuilderInputConverter;

        public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
            this.info = info;
            this.input = input;
            this.record = new GTRecord(info);
            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(InMemCubeBuilder.this.cubeDesc, InMemCubeBuilder.this.flatDesc, InMemCubeBuilder.this.dictionaryMap, info);
        }

        @Override
        public Iterator<GTRecord> iterator() {
            return new Iterator<GTRecord>(){
                List<String> currentObject = null;

                @Override
                public boolean hasNext() {
                    try {
                        this.currentObject = InputConverter.this.input.take();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return this.currentObject != null && this.currentObject.size() > 0;
                }

                @Override
                public GTRecord next() {
                    if (this.currentObject.size() == 0) {
                        throw new IllegalStateException();
                    }
                    InputConverter.this.inMemCubeBuilderInputConverter.convert(this.currentObject, InputConverter.this.record);
                    return InputConverter.this.record;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public GTInfo getInfo() {
            return this.info;
        }

        @Override
        public long getScannedRowCount() {
            return 0L;
        }
    }

    private static class CuboidTask
    implements Comparable<CuboidTask> {
        final CuboidResult parent;
        final long childCuboidId;

        CuboidTask(CuboidResult parent, long childCuboidId) {
            this.parent = parent;
            this.childCuboidId = childCuboidId;
        }

        @Override
        public int compareTo(CuboidTask o) {
            long comp = this.childCuboidId - o.childCuboidId;
            return comp < 0L ? -1 : (comp > 0L ? 1 : 0);
        }
    }

    private class CuboidTaskThread
    extends Thread {
        private int id;

        CuboidTaskThread(int id) {
            super("CuboidTask-" + id);
            this.id = id;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block9: {
                try {
                    while (!InMemCubeBuilder.this.isAllCuboidDone()) {
                        CuboidTask task = null;
                        TreeSet treeSet = InMemCubeBuilder.this.taskPending;
                        synchronized (treeSet) {
                            while (task == null && InMemCubeBuilder.this.taskHasNoException()) {
                                task = (CuboidTask)InMemCubeBuilder.this.taskPending.pollFirst();
                                if (task != null) continue;
                                InMemCubeBuilder.this.taskPending.wait(60000L);
                            }
                        }
                        if (task != null) {
                            CuboidResult newCuboid = InMemCubeBuilder.this.buildCuboid(task.parent, task.childCuboidId);
                            InMemCubeBuilder.this.addChildTasks(newCuboid);
                            if (!InMemCubeBuilder.this.isAllCuboidDone()) continue;
                            for (Thread t : InMemCubeBuilder.this.taskThreads) {
                                if (t == Thread.currentThread()) continue;
                                t.interrupt();
                            }
                            continue;
                        }
                        break;
                    }
                }
                catch (Throwable ex) {
                    if (InMemCubeBuilder.this.isAllCuboidDone()) break block9;
                    logger.error("task thread exception", ex);
                    ((InMemCubeBuilder)InMemCubeBuilder.this).taskThreadExceptions[this.id] = ex;
                }
            }
        }
    }

    static interface ICuboidCollector {
        public void collect(CuboidResult var1);
    }
}

