/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.cube.inmemcubing;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.CuboidResult;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DoggedCubeBuilder
extends AbstractInMemCubeBuilder {
    private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
    private int unitRows = 1000;

    public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
        super(cuboidScheduler, flatDesc, dictionaryMap);
        if (this.cubeDesc.hasMemoryHungryMeasures()) {
            this.unitRows /= 10;
        }
    }

    @Override
    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) throws IOException {
        new BuildOnce().build(input, inputConverterUnit, output);
    }

    private static class MergeSlot
    implements Comparable<MergeSlot> {
        final Iterator<CuboidResult> cuboidIterator;
        IGTScanner scanner;
        Iterator<GTRecord> recordIterator;
        long currentCuboidId;
        GTRecord currentRecord;

        public MergeSlot(SplitThread split) {
            this.cuboidIterator = split.buildResult.values().iterator();
        }

        public boolean fetchNext() throws IOException {
            if (this.recordIterator == null) {
                if (this.cuboidIterator.hasNext()) {
                    CuboidResult cuboid = this.cuboidIterator.next();
                    this.currentCuboidId = cuboid.cuboidId;
                    this.scanner = cuboid.table.scan(new GTScanRequestBuilder().setInfo(cuboid.table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
                    this.recordIterator = this.scanner.iterator();
                } else {
                    return false;
                }
            }
            if (this.recordIterator.hasNext()) {
                this.currentRecord = this.recordIterator.next();
                return true;
            }
            this.scanner.close();
            this.recordIterator = null;
            return this.fetchNext();
        }

        @Override
        public int compareTo(MergeSlot o) {
            long cuboidComp = this.currentCuboidId - o.currentCuboidId;
            if (cuboidComp != 0L) {
                return cuboidComp < 0L ? -1 : 1;
            }
            ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
            for (int i = 0; i < pk.trueBitCount(); ++i) {
                int c = pk.trueBitAt(i);
                int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
                if (comp == 0) continue;
                return comp;
            }
            return 0;
        }

        public boolean isSameKey(MergeSlot o) {
            if (o == null) {
                return false;
            }
            return this.compareTo(o) == 0;
        }
    }

    private class Merger {
        MeasureAggregators reuseAggrs;
        Object[] reuseMetricsArray;
        ByteArray reuseMetricsSpace;
        long lastCuboidColumnCount;
        ImmutableBitSet lastMetricsColumns;

        Merger() {
            this.reuseAggrs = new MeasureAggregators(DoggedCubeBuilder.this.cubeDesc.getMeasures());
            this.reuseMetricsArray = new Object[DoggedCubeBuilder.this.cubeDesc.getMeasures().size()];
        }

        public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
            if (splits.size() == 1) {
                for (CuboidResult cuboidResult : splits.get((int)0).buildResult.values()) {
                    DoggedCubeBuilder.this.outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
                    cuboidResult.table.close();
                }
                return;
            }
            LinkedList open = Lists.newLinkedList();
            for (SplitThread split : splits) {
                open.add(new MergeSlot(split));
            }
            PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
            while (true) {
                if (!open.isEmpty()) {
                    MergeSlot slot = (MergeSlot)open.removeFirst();
                    if (!slot.fetchNext()) continue;
                    heap.add(slot);
                    continue;
                }
                MergeSlot smallest = (MergeSlot)heap.poll();
                if (smallest == null) break;
                open.add(smallest);
                if (smallest.isSameKey((MergeSlot)heap.peek())) {
                    Object[] metrics = this.getMetricsValues(smallest.currentRecord);
                    this.reuseAggrs.reset();
                    this.reuseAggrs.aggregate(metrics);
                    do {
                        MergeSlot slot = (MergeSlot)heap.poll();
                        open.add(slot);
                        metrics = this.getMetricsValues(slot.currentRecord);
                        this.reuseAggrs.aggregate(metrics);
                    } while (smallest.isSameKey((MergeSlot)heap.peek()));
                    this.reuseAggrs.collectStates(metrics);
                    this.setMetricsValues(smallest.currentRecord, metrics);
                }
                output.write(smallest.currentCuboidId, smallest.currentRecord);
            }
        }

        private void setMetricsValues(GTRecord record, Object[] metricsValues) {
            ImmutableBitSet metrics = this.getMetricsColumns(record);
            if (this.reuseMetricsSpace == null) {
                this.reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics));
            }
            record.setValues(metrics, this.reuseMetricsSpace, metricsValues);
        }

        private Object[] getMetricsValues(GTRecord record) {
            ImmutableBitSet metrics = this.getMetricsColumns(record);
            return record.getValues(metrics, this.reuseMetricsArray);
        }

        private ImmutableBitSet getMetricsColumns(GTRecord record) {
            if (this.lastCuboidColumnCount == (long)record.getInfo().getColumnCount()) {
                return this.lastMetricsColumns;
            }
            int to = record.getInfo().getColumnCount();
            int from = to - this.reuseMetricsArray.length;
            this.lastCuboidColumnCount = record.getInfo().getColumnCount();
            this.lastMetricsColumns = new ImmutableBitSet(from, to);
            return this.lastMetricsColumns;
        }
    }

    private class SplitThread
    extends Thread {
        final RecordConsumeBlockingQueueController<?> inputController;
        final InMemCubeBuilder builder;
        ConcurrentNavigableMap<Long, CuboidResult> buildResult;
        RuntimeException exception;

        public SplitThread(int num, RecordConsumeBlockingQueueController<?> inputController) {
            super("SplitThread" + num);
            this.inputController = inputController;
            this.builder = new InMemCubeBuilder(DoggedCubeBuilder.this.cuboidScheduler, DoggedCubeBuilder.this.flatDesc, DoggedCubeBuilder.this.dictionaryMap);
            this.builder.setConcurrentThreads(DoggedCubeBuilder.this.taskThreadCount);
            this.builder.setReserveMemoryMB(DoggedCubeBuilder.this.reserveMemoryMB);
        }

        @Override
        public void run() {
            try {
                this.buildResult = this.builder.build(this.inputController);
            }
            catch (Exception e) {
                this.exception = e instanceof RuntimeException ? (RuntimeException)e : new RuntimeException(e);
                this.inputController.findException();
            }
        }
    }

    private class BuildOnce {
        BuildOnce() {
        }

        public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) throws IOException {
            RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input, DoggedCubeBuilder.this.unitRows);
            ArrayList<SplitThread> splits = new ArrayList<SplitThread>();
            Merger merger = new Merger();
            long start = System.currentTimeMillis();
            logger.info("Dogged Cube Build start");
            try {
                while (!inputController.ifEnd()) {
                    SplitThread last = new SplitThread(splits.size() + 1, inputController);
                    splits.add(last);
                    last.start();
                    logger.info("Split #" + splits.size() + " kickoff");
                    last.join();
                    this.checkException(splits);
                }
                logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
                merger.mergeAndOutput(splits, output);
            }
            catch (Throwable e) {
                logger.error("Dogged Cube Build error", e);
                if (e instanceof Error) {
                    throw (Error)e;
                }
                if (e instanceof RuntimeException) {
                    throw (RuntimeException)e;
                }
                throw new IOException(e);
            }
            finally {
                output.close();
                this.closeGirdTables(splits);
                logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
                this.ensureExit(splits);
                logger.info("Dogged Cube Build return");
            }
        }

        private void closeGirdTables(List<SplitThread> splits) {
            for (SplitThread split : splits) {
                if (split.buildResult == null) continue;
                for (CuboidResult r : split.buildResult.values()) {
                    try {
                        r.table.close();
                    }
                    catch (Throwable e) {
                        logger.error("Error closing grid table " + r.table, e);
                    }
                }
            }
        }

        private void ensureExit(List<SplitThread> splits) throws IOException {
            try {
                for (int i = 0; i < splits.size(); ++i) {
                    SplitThread split = splits.get(i);
                    if (!split.isAlive()) continue;
                    this.abort(splits);
                }
            }
            catch (Throwable e) {
                logger.error("Dogged Cube Build error", e);
            }
        }

        private void checkException(List<SplitThread> splits) throws IOException {
            for (int i = 0; i < splits.size(); ++i) {
                SplitThread split = splits.get(i);
                if (split.exception == null) continue;
                this.abort(splits);
            }
        }

        private void abort(List<SplitThread> splits) throws IOException {
            for (SplitThread splitThread : splits) {
                splitThread.builder.abort();
            }
            ArrayList<Exception> errors = new ArrayList<Exception>();
            for (SplitThread splitThread : splits) {
                try {
                    splitThread.join();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    errors.add(e);
                }
                if (splitThread.exception == null) continue;
                errors.add(splitThread.exception);
            }
            if (errors.isEmpty()) {
                return;
            }
            if (errors.size() == 1) {
                Throwable throwable = (Throwable)errors.get(0);
                if (throwable instanceof IOException) {
                    throw (IOException)throwable;
                }
                throw new IOException(throwable);
            }
            for (Throwable throwable : errors) {
                logger.error("Exception during in-mem cube build", throwable);
            }
            throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", (Throwable)errors.get(0));
        }
    }
}

