/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.gridtable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.SortedMap;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.BufferedMeasureEncoder;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.metadata.datatype.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GTAggregateScanner
implements IGTScanner {
    private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
    /** Table info obtained from the input scanner; returned by {@link #getInfo()}. */
    final GTInfo info;
    /** Columns (from the scan request) whose encoded bytes are concatenated to form the aggregation key. */
    final ImmutableBitSet dimensions;
    /** Group-by columns (from the scan request); only key bytes of these columns take part in key comparison. */
    final ImmutableBitSet groupBy;
    /** Metric columns (from the scan request) that are decoded and fed to the aggregators. */
    final ImmutableBitSet metrics;
    /** Aggregation function names handed to the code system when creating aggregators; one mask slot exists per entry. */
    final String[] metricsAggrFuncs;
    /** Upstream scanner whose records are aggregated. */
    final IGTScanner inputScanner;
    /** Holds the aggregation state and any spill files written to disk. */
    final AggregationCache aggrCache;
    /** Estimated cache size above which the cache is spilled to disk; a value <= 0 disables spilling. */
    final long spillThreshold;
    /** Maximum number of distinct keys accepted while nothing has been spilled; reaching it stops input consumption. */
    final int storagePushDownLimit;
    /** Point in time, compared against {@code System.currentTimeMillis()}, after which scanning times out. */
    final long deadline;
    /** Number of records passed to the cache so far; every 100000th record triggers the memory check. */
    private int aggregatedRowCount = 0;
    /** Optional tracker notified during the periodic memory check; {@code null} until {@link #trackMemoryLevel} is called. */
    private MemoryBudgetController.MemoryWaterLevel memTracker;
    /** Per-metric switch; a {@code false} slot makes aggregation skip that metric. All slots start as {@code true}. */
    private boolean[] aggrMask;

    /**
     * Creates a scanner that aggregates the records produced by {@code inputScanner}
     * according to the dimensions, group-by columns and metrics of {@code req}.
     *
     * @param inputScanner source of the records to aggregate; also supplies the table info
     * @param req scan request; must have aggregation enabled
     * @param deadline time, compared against {@code System.currentTimeMillis()}, after which
     *        {@link #iterator()} aborts with a {@link GTScanTimeoutException}
     * @throws IllegalStateException if {@code req} carries no aggregation
     */
    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline) {
        if (!req.hasAggregation()) {
            throw new IllegalStateException("GTAggregateScanner requires a scan request with aggregation");
        }
        this.info = inputScanner.getInfo();
        this.dimensions = req.getDimensions();
        this.groupBy = req.getAggrGroupBy();
        this.metrics = req.getAggrMetrics();
        this.metricsAggrFuncs = req.getAggrMetricsFuncs();
        this.inputScanner = inputScanner;
        // The cache reads info/dimensions/groupBy/metrics while it is constructed,
        // so it must be created only after those fields are assigned.
        this.aggrCache = new AggregationCache();
        // 1.073741824E9 == 1024^3; presumably the request threshold is expressed in GB -- confirm against GTScanRequest.
        this.spillThreshold = (long)(req.getAggCacheMemThreshold() * 1.073741824E9);
        this.storagePushDownLimit = req.getStoragePushDownLimit();
        this.deadline = deadline;
        // Every metric is aggregated unless a caller installs a different mask later.
        this.aggrMask = new boolean[this.metricsAggrFuncs.length];
        Arrays.fill(this.aggrMask, true);
    }

    /**
     * Estimates the memory taken by an aggregation cache of {@code size} entries,
     * extrapolating from one sample key and one sample aggregator array.
     *
     * @param keySample a representative key
     * @param aggrSample a representative aggregator array
     * @param size number of entries in the cache
     * @return (key estimate + aggregator estimate + 64) multiplied by {@code size}
     */
    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
        long keyBytes = GTAggregateScanner.estimateSizeOf(keySample);
        long aggrBytes = GTAggregateScanner.estimateSizeOf(aggrSample);
        // The fixed 64 is a per-entry allowance; presumably map-entry overhead -- not stated in this file.
        long perEntry = keyBytes + aggrBytes + 64L;
        return perEntry * (long)size;
    }

    /**
     * Estimates the memory taken by an aggregator array: a fixed part derived from the
     * array length plus each non-null aggregator's own {@code getMemBytesEstimate()}.
     *
     * @param aggrs aggregators to size; {@code null} elements are skipped
     * @return the estimated size
     */
    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
        // Fixed part: the length rounded up to an even count, 4 units per slot, plus 4 + 4.
        // Presumably reference slots plus array header -- not stated in this file.
        int slotBytes = (aggrs.length + 1) / 2 * 8;
        long total = slotBytes + 4 + 4;
        for (int i = 0; i < aggrs.length; i++) {
            MeasureAggregator current = aggrs[i];
            if (current != null) {
                total += (long)current.getMemBytesEstimate();
            }
        }
        return total;
    }

    /**
     * Estimates the memory taken by a byte array: its length rounded up to the next
     * multiple of 8, plus a fixed 8.
     *
     * @param bytes array to size
     * @return the estimated size
     */
    public static long estimateSizeOf(byte[] bytes) {
        int paddedLength = (bytes.length + 7) / 8 * 8;
        int fixedPart = 4 + 4;
        return paddedLength + fixedPart;
    }

    /**
     * Installs the memory tracker whose {@code markHigh()} is invoked during the
     * periodic memory check of the aggregation cache.
     *
     * @param tracker tracker to notify; {@code null} turns the notification off
     */
    public void trackMemoryLevel(MemoryBudgetController.MemoryWaterLevel tracker) {
        memTracker = tracker;
    }

    /** Returns the table info that was obtained from the input scanner at construction. */
    @Override
    public GTInfo getInfo() {
        return info;
    }

    /** Returns the scanned row count as reported by the input scanner. */
    @Override
    public long getScannedRowCount() {
        long scannedByInput = inputScanner.getScannedRowCount();
        return scannedByInput;
    }

    /**
     * Closes the input scanner and then the aggregation cache.
     *
     * <p>The cache is closed even when closing the input scanner fails, so that
     * its spill files are still cleaned up. If both steps fail, the input
     * scanner's exception is thrown and the cache's failure is attached to it as
     * a suppressed exception.
     *
     * @throws IOException if closing the input scanner fails
     */
    @Override
    public void close() throws IOException {
        // try-with-resources guarantees cache.close() runs after the body, whatever the body does.
        try (AggregationCache cache = this.aggrCache) {
            this.inputScanner.close();
        }
    }

    /**
     * Consumes the input scanner eagerly, aggregating every record into the cache,
     * and returns an iterator over the aggregated records.
     *
     * <p>While nothing has been spilled, the storage push-down limit caps the number
     * of distinct keys; once the cache refuses a record for that reason, reading
     * stops. After a spill the limit is no longer applied.
     *
     * @return iterator over the aggregated records
     * @throws GTScanTimeoutException if the deadline has passed; the clock is
     *         consulted once every 100 records
     */
    @Override
    public Iterator<GTRecord> iterator() {
        long rowsConsumed = 0L;
        Iterator<GTRecord> input = inputScanner.iterator();
        while (input.hasNext()) {
            GTRecord row = input.next();
            boolean clockCheckDue = rowsConsumed % 100L == 1L;
            if (clockCheckDue && System.currentTimeMillis() > deadline) {
                throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + rowsConsumed);
            }
            // Sampled before aggregating: the aggregate call itself may trigger a spill.
            boolean nothingSpilledYet = getNumOfSpills() == 0;
            int keyLimit = nothingSpilledYet ? storagePushDownLimit : Integer.MAX_VALUE;
            boolean accepted = aggrCache.aggregate(row, keyLimit);
            if (nothingSpilledYet && !accepted) {
                logger.info("abort reading inputScanner because storage push down limit is hit");
                break;
            }
            rowsConsumed++;
        }
        logger.info("GTAggregateScanner input rows: " + rowsConsumed);
        return aggrCache.iterator();
    }

    /** Returns how many spill dumps the aggregation cache currently holds. */
    public int getNumOfSpills() {
        List<AggregationCache.Dump> spilled = aggrCache.dumps;
        return spilled.size();
    }

    /**
     * Replaces the per-metric aggregation mask. The array is stored as given, not copied.
     *
     * @param aggrMask one flag per metric; a {@code false} slot makes aggregation skip that metric
     */
    public void setAggrMask(boolean[] aggrMask) {
        boolean[] replacement = aggrMask;
        this.aggrMask = replacement;
    }

    /** Returns the aggregation cache's current in-memory size estimate. */
    public long getEstimateSizeOfAggrCache() {
        long estimate = aggrCache.estimatedMemSize();
        return estimate;
    }

    class AggregationCache
    implements Closeable {
        /** Dumps written to disk so far, in creation order. */
        final List<Dump> dumps;
        /** Length in bytes of every aggregation key; equals {@code compareMask.length}. */
        final int keyLength;
        /** One flag per key byte; {@code true} means the byte takes part in key comparison. */
        final boolean[] compareMask;
        /** Codec used to encode aggregator states and to decode them back from dumps. */
        final BufferedMeasureEncoder measureCodec;
        /**
         * Orders keys by comparing, as unsigned values, only those bytes that
         * {@code compareMask} marks; keys that differ solely in unmarked bytes
         * compare as equal.
         */
        final Comparator<byte[]> bytesComparator = new Comparator<byte[]>(){

            @Override
            public int compare(byte[] left, byte[] right) {
                for (int pos = 0; pos < AggregationCache.this.keyLength; pos++) {
                    if (AggregationCache.this.compareMask[pos]) {
                        int diff = (left[pos] & 0xFF) - (right[pos] & 0xFF);
                        if (diff != 0) {
                            return diff;
                        }
                    }
                }
                return 0;
            }
        };
        /** In-memory aggregation state, sorted by {@code bytesComparator}; replaced by a fresh map after each spill. */
        SortedMap<byte[], MeasureAggregator[]> aggBufMap;

        /**
         * Builds an empty cache: derives the key layout from the enclosing scanner's
         * dimensions, then creates the empty dump list, the sorted buffer map and the
         * measure codec.
         */
        public AggregationCache() {
            // The mask fixes the key length, so it has to be computed first.
            compareMask = createCompareMask();
            keyLength = compareMask.length;
            dumps = Lists.newArrayList();
            aggBufMap = createBuffMap();
            measureCodec = createMeasureCodec();
        }

        /**
         * Creates the measure codec for the scanner's metric columns, with its buffer
         * size set to the table info's maximum column length for those metrics.
         */
        private BufferedMeasureEncoder createMeasureCodec() {
            int metricCount = metrics.trueBitCount();
            DataType[] metricTypes = new DataType[metricCount];
            for (int m = 0; m < metricCount; m++) {
                int column = metrics.trueBitAt(m);
                metricTypes[m] = info.getColumnType(column);
            }
            BufferedMeasureEncoder codec = new BufferedMeasureEncoder(metricTypes);
            codec.setBufferSize(info.getMaxColumnLength(metrics));
            return codec;
        }

        /**
         * Builds the byte-level comparison mask for aggregation keys. Each dimension
         * contributes {@code maxCodeLength} consecutive slots, all set to whether that
         * dimension is one of the group-by columns.
         *
         * @return the mask; its length is the total key length
         */
        private boolean[] createCompareMask() {
            int dimCount = dimensions.trueBitCount();
            int[] codeLengths = new int[dimCount];
            int totalLength = 0;
            for (int d = 0; d < dimCount; d++) {
                codeLengths[d] = info.codeSystem.maxCodeLength(dimensions.trueBitAt(d));
                totalLength += codeLengths[d];
            }
            boolean[] mask = new boolean[totalLength];
            int from = 0;
            for (int d = 0; d < dimCount; d++) {
                int to = from + codeLengths[d];
                boolean grouped = groupBy.get(dimensions.trueBitAt(d));
                Arrays.fill(mask, from, to, grouped);
                from = to;
            }
            return mask;
        }

        /** Creates an empty buffer map ordered by {@code bytesComparator}. */
        private SortedMap<byte[], MeasureAggregator[]> createBuffMap() {
            return Maps.newTreeMap(bytesComparator);
        }

        /**
         * Builds the aggregation key of a record by copying each dimension's bytes into
         * a fixed-width slot of {@code maxCodeLength} bytes. Slot bytes not covered by the
         * column's actual length keep the array's initial zero value.
         *
         * @param record record to derive the key from
         * @return a new key of {@code keyLength} bytes
         */
        private byte[] createKey(GTRecord record) {
            byte[] key = new byte[keyLength];
            int slotStart = 0;
            int dimCount = dimensions.trueBitCount();
            for (int d = 0; d < dimCount; d++) {
                int column = dimensions.trueBitAt(d);
                ByteArray cell = record.cols[column];
                int slotWidth = info.codeSystem.maxCodeLength(column);
                System.arraycopy(cell.array(), cell.offset(), key, slotStart, cell.length());
                slotStart += slotWidth;
            }
            assert (slotStart == key.length);
            return key;
        }

        /**
         * Folds one record into the cache.
         *
         * <p>Every 100000th call notifies the memory tracker (if one is set) and spills
         * the buffer map to disk when spilling is enabled and the size estimate exceeds
         * the threshold.
         *
         * @param r record to aggregate
         * @param stopForLimit maximum number of distinct keys the buffer map may hold
         *        before a record with a new key is refused
         * @return {@code false} if the record has a new key and the buffer map already
         *         holds {@code stopForLimit} or more keys (the record is not aggregated);
         *         {@code true} otherwise
         */
        boolean aggregate(GTRecord r, int stopForLimit) {
            aggregatedRowCount++;
            if (aggregatedRowCount % 100000 == 0) {
                if (memTracker != null) {
                    memTracker.markHigh();
                }
                boolean spillEnabled = spillThreshold > 0L;
                if (spillEnabled && estimatedMemSize() > spillThreshold) {
                    spillBuffMap();
                }
            }
            byte[] key = createKey(r);
            MeasureAggregator[] aggrs = aggBufMap.get(key);
            if (aggrs == null) {
                if (aggBufMap.size() >= stopForLimit) {
                    return false;
                }
                aggrs = newAggregators();
                aggBufMap.put(key, aggrs);
            }
            for (int m = 0; m < aggrs.length; m++) {
                if (aggrMask[m]) {
                    int column = metrics.trueBitAt(m);
                    Object decoded = info.codeSystem.decodeColumnValue(column, r.cols[column].asBuffer());
                    aggrs[m].aggregate(decoded);
                }
            }
            return true;
        }

        /**
         * Writes the current buffer map to a new dump file, registers the dump and
         * starts over with an empty buffer map. Does nothing when the map is empty.
         *
         * <p>If writing fails, the partially written dump is terminated so that its
         * temp file does not linger (it would otherwise never be registered for
         * cleanup), and the original exception is kept as the cause.
         *
         * @throws RuntimeException if the dump cannot be written
         */
        private void spillBuffMap() throws RuntimeException {
            if (this.aggBufMap.isEmpty()) {
                return;
            }
            Dump dump = null;
            try {
                dump = new Dump(this.aggBufMap);
                dump.flush();
                this.dumps.add(dump);
                this.aggBufMap = this.createBuffMap();
            }
            catch (Exception e) {
                if (dump != null) {
                    try {
                        dump.terminate();
                    }
                    catch (Exception cleanupFailure) {
                        e.addSuppressed(cleanupFailure);
                    }
                }
                throw new RuntimeException("AggregationCache spill failed: " + e.getMessage(), e);
            }
        }

        /**
         * Terminates every dump, releasing its stream and deleting its file.
         *
         * <p>All dumps are attempted even if some fail. The first failure is reported
         * as the cause of the thrown exception; later failures are attached to it as
         * suppressed exceptions.
         *
         * @throws RuntimeException if terminating at least one dump failed
         */
        @Override
        public void close() throws RuntimeException {
            Exception firstFailure = null;
            for (Dump dump : this.dumps) {
                try {
                    dump.terminate();
                }
                catch (Exception e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw new RuntimeException("AggregationCache close failed: " + firstFailure.getMessage(), firstFailure);
            }
        }

        /** Asks the code system for a fresh aggregator array for the scanner's metrics and aggregation functions. */
        private MeasureAggregator[] newAggregators() {
            MeasureAggregator[] fresh = info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
            return fresh;
        }

        /**
         * Estimates the size of the in-memory buffer map by sampling its first entry
         * and extrapolating to the number of entries.
         *
         * @return the estimate, or 0 when the buffer map is empty
         */
        public long estimatedMemSize() {
            int entryCount = aggBufMap.size();
            if (entryCount == 0) {
                return 0L;
            }
            byte[] firstKey = aggBufMap.firstKey();
            MeasureAggregator[] firstValue = aggBufMap.get(firstKey);
            return GTAggregateScanner.estimateSizeOfAggrCache(firstKey, firstValue, entryCount);
        }

        /**
         * Returns an iterator over the aggregated records.
         *
         * <p>If nothing was spilled, the records come straight from the in-memory
         * buffer map. Otherwise the remaining buffer map is spilled as well and the
         * records are produced by merging all dumps.
         *
         * <p>Either way each iterator hands out one and the same {@link GTRecord}
         * instance, reloaded on every {@code next()} call.
         */
        public Iterator<GTRecord> iterator() {
            if (!dumps.isEmpty()) {
                logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache());
                spillBuffMap();
                return new Iterator<GTRecord>(){
                    private final Iterator<Pair<byte[], MeasureAggregator[]>> merged = new DumpMerger(dumps).iterator();
                    private final ReturningRecord reusable = new ReturningRecord();

                    @Override
                    public boolean hasNext() {
                        return merged.hasNext();
                    }

                    @Override
                    public GTRecord next() {
                        Pair<byte[], MeasureAggregator[]> current = merged.next();
                        reusable.load(current.getKey(), current.getValue());
                        return reusable.record;
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
            return new Iterator<GTRecord>(){
                private final Iterator<Map.Entry<byte[], MeasureAggregator[]>> entries = aggBufMap.entrySet().iterator();
                private final ReturningRecord reusable = new ReturningRecord();

                @Override
                public boolean hasNext() {
                    return entries.hasNext();
                }

                @Override
                public GTRecord next() {
                    Map.Entry<byte[], MeasureAggregator[]> current = entries.next();
                    reusable.load(current.getKey(), current.getValue());
                    return reusable.record;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        /**
         * Merges several dumps into one stream of (key, aggregators) pairs using a
         * min-heap keyed by each dump's current key. Entries from different dumps
         * whose keys compare equal under {@code bytesComparator} are aggregated
         * into a single result.
         *
         * <p>Heap and dump cursors live in this object, so all iterators obtained from
         * {@link #iterator()} share one position. Every returned pair carries the same
         * aggregator array, which is reset and refilled on each {@code next()} call.
         */
        class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> {
            /** Current key of every non-exhausted dump, paired with that dump's index. */
            final PriorityQueue<Pair<byte[], Integer>> minHeap;
            /** Per-dump entry iterator; {@code null} for a dump that had no entries. */
            final List<Iterator<Pair<byte[], byte[]>>> dumpIterators;
            /** Per-dump decoded metric values belonging to the key currently in the heap. */
            final List<Object[]> dumpCurrentValues;
            /** Aggregators reused for every merged result. */
            final MeasureAggregator[] resultMeasureAggregators;
            final MeasureAggregators resultAggrs;

            /**
             * Opens every dump and primes the heap with each dump's first entry.
             *
             * @param dumps dumps to merge
             */
            public DumpMerger(List<Dump> dumps) {
                int dumpCount = dumps.size();
                this.resultMeasureAggregators = AggregationCache.this.newAggregators();
                this.resultAggrs = new MeasureAggregators(this.resultMeasureAggregators);
                // PriorityQueue rejects an initial capacity below 1.
                this.minHeap = new PriorityQueue<Pair<byte[], Integer>>(Math.max(1, dumpCount), new Comparator<Pair<byte[], Integer>>(){

                    @Override
                    public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) {
                        return AggregationCache.this.bytesComparator.compare(o1.getFirst(), o2.getFirst());
                    }
                });
                this.dumpIterators = Lists.newArrayListWithCapacity(dumpCount);
                this.dumpCurrentValues = Lists.newArrayListWithCapacity(dumpCount);
                for (int i = 0; i < dumpCount; ++i) {
                    Iterator<Pair<byte[], byte[]>> it = dumps.get(i).iterator();
                    this.dumpCurrentValues.add(null);
                    this.dumpIterators.add(it.hasNext() ? it : null);
                    // No-op for a dump without entries.
                    this.enqueueFromDump(i);
                }
            }

            /**
             * Reads the next entry of dump {@code index}, if there is one, stores its
             * decoded metric values and puts its key on the heap.
             */
            private void enqueueFromDump(int index) {
                Iterator<Pair<byte[], byte[]>> source = this.dumpIterators.get(index);
                if (source == null || !source.hasNext()) {
                    return;
                }
                Pair<byte[], byte[]> pair = source.next();
                Object[] metricValues = new Object[GTAggregateScanner.this.metrics.trueBitCount()];
                AggregationCache.this.measureCodec.decode(ByteBuffer.wrap(pair.getValue()), metricValues);
                // Publish the values before the key, so the heap never refers to a
                // dump whose current values are stale.
                this.dumpCurrentValues.set(index, metricValues);
                this.minHeap.offer(new Pair<byte[], Integer>(pair.getKey(), index));
            }

            @Override
            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
                return new Iterator<Pair<byte[], MeasureAggregator[]>>(){

                    @Override
                    public boolean hasNext() {
                        return !DumpMerger.this.minHeap.isEmpty();
                    }

                    /** Folds the heap's smallest entry into the result and advances the dump it came from. */
                    private void internalAggregate() {
                        Pair<byte[], Integer> smallest = DumpMerger.this.minHeap.poll();
                        int dumpIndex = smallest.getValue();
                        DumpMerger.this.resultAggrs.aggregate(DumpMerger.this.dumpCurrentValues.get(dumpIndex));
                        DumpMerger.this.enqueueFromDump(dumpIndex);
                    }

                    /**
                     * Aggregates all heap entries sharing the smallest key.
                     *
                     * @throws NoSuchElementException if all dumps are exhausted
                     */
                    @Override
                    public Pair<byte[], MeasureAggregator[]> next() {
                        if (DumpMerger.this.minHeap.isEmpty()) {
                            throw new NoSuchElementException();
                        }
                        DumpMerger.this.resultAggrs.reset();
                        byte[] peekKey = DumpMerger.this.minHeap.peek().getKey();
                        do {
                            this.internalAggregate();
                        } while (!DumpMerger.this.minHeap.isEmpty() && AggregationCache.this.bytesComparator.compare(peekKey, DumpMerger.this.minHeap.peek().getKey()) == 0);
                        return new Pair<byte[], MeasureAggregator[]>(peekKey, DumpMerger.this.resultMeasureAggregators);
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        }

        /**
         * One spilled buffer map, stored in a temp file.
         *
         * <p>File layout, as written by {@link #flush()}: an int entry count, then per
         * entry an int key length, the key bytes, an int value length and the encoded
         * metric bytes. Entries are written in the buffer map's iteration order.
         */
        class Dump implements Iterable<Pair<byte[], byte[]>> {
            /** Temp file holding the dump; {@code null} until {@link #flush()} creates it. */
            File dumpedFile;
            /** Stream opened by the most recent {@link #iterator()} call; closed by {@link #terminate()}. */
            DataInputStream dis;
            /** Map awaiting {@link #flush()}; dropped once flushed or terminated. */
            SortedMap<byte[], MeasureAggregator[]> buffMap;

            public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException {
                this.buffMap = buffMap;
            }

            /**
             * Opens the dump file and returns an iterator over its (key, encoded
             * metrics) pairs. A stream left open by an earlier call is closed first,
             * so only the most recently obtained iterator is usable.
             *
             * @throws RuntimeException if the file is missing or cannot be opened
             */
            @Override
            public Iterator<Pair<byte[], byte[]>> iterator() {
                try {
                    if (this.dumpedFile == null || !this.dumpedFile.exists()) {
                        throw new RuntimeException("Dumped file cannot be found at: " + (this.dumpedFile == null ? "<null>" : this.dumpedFile.getAbsolutePath()));
                    }
                    // Do not leak the file handle of a previous iteration.
                    IOUtils.closeQuietly(this.dis);
                    this.dis = new DataInputStream(new BufferedInputStream(new FileInputStream(this.dumpedFile)));
                    // The iterator keeps its own reference, so it can never read from a
                    // stream opened by a later iterator() call.
                    final DataInputStream in = this.dis;
                    final int count = in.readInt();
                    return new Iterator<Pair<byte[], byte[]>>(){
                        int cursorIdx = 0;

                        @Override
                        public boolean hasNext() {
                            return this.cursorIdx < count;
                        }

                        @Override
                        public Pair<byte[], byte[]> next() {
                            if (!this.hasNext()) {
                                throw new NoSuchElementException();
                            }
                            try {
                                ++this.cursorIdx;
                                byte[] key = new byte[in.readInt()];
                                // readFully, not read: read may legally return fewer bytes
                                // than requested, leaving the tail of the array zeroed.
                                in.readFully(key);
                                byte[] value = new byte[in.readInt()];
                                in.readFully(value);
                                return new Pair<byte[], byte[]>(key, value);
                            }
                            catch (Exception e) {
                                throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage(), e);
                            }
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to read dumped file: " + e.getMessage(), e);
                }
            }

            /**
             * Writes the buffer map to a new temp file. Does nothing if the map was
             * already flushed or dropped. The map reference is released afterwards,
             * whether or not writing succeeded.
             *
             * @throws IOException if the file cannot be created, written or closed
             */
            public void flush() throws IOException {
                if (this.buffMap == null) {
                    return;
                }
                try {
                    this.dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
                    logger.info("AggregationCache will dump to file: " + this.dumpedFile.getAbsolutePath());
                    // try-with-resources: a failure while flushing or closing the buffered
                    // stream is reported instead of silently losing data.
                    try (DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(this.dumpedFile)))) {
                        int metricCount = GTAggregateScanner.this.metrics.trueBitCount();
                        dos.writeInt(this.buffMap.size());
                        for (Map.Entry<byte[], MeasureAggregator[]> entry : this.buffMap.entrySet()) {
                            MeasureAggregators aggs = new MeasureAggregators(entry.getValue());
                            Object[] aggrResult = new Object[metricCount];
                            aggs.collectStates(aggrResult);
                            ByteBuffer metricsBuf = AggregationCache.this.measureCodec.encode(aggrResult);
                            byte[] key = entry.getKey();
                            dos.writeInt(key.length);
                            dos.write(key);
                            dos.writeInt(metricsBuf.position());
                            dos.write(metricsBuf.array(), 0, metricsBuf.position());
                        }
                    }
                }
                finally {
                    this.buffMap = null;
                }
            }

            /**
             * Releases everything this dump holds: drops the map, closes the read
             * stream and deletes the temp file. The file is deleted even when closing
             * the stream fails.
             *
             * @throws IOException if closing the read stream fails
             */
            public void terminate() throws IOException {
                this.buffMap = null;
                try {
                    if (this.dis != null) {
                        this.dis.close();
                    }
                }
                finally {
                    if (this.dumpedFile != null && this.dumpedFile.exists() && !this.dumpedFile.delete()) {
                        logger.warn("Failed to delete AggregationCache dump file: " + this.dumpedFile.getAbsolutePath());
                    }
                }
            }
        }

        class ReturningRecord {
            final GTRecord record;
            final Object[] tmpValues;

            ReturningRecord() {
                this.record = new GTRecord(GTAggregateScanner.this.info);
                this.tmpValues = new Object[GTAggregateScanner.this.metrics.trueBitCount()];
            }

            void load(byte[] key, MeasureAggregator[] value) {
                int i;
                int offset = 0;
                for (i = 0; i < GTAggregateScanner.this.dimensions.trueBitCount(); ++i) {
                    int c = GTAggregateScanner.this.dimensions.trueBitAt(i);
                    int columnLength = GTAggregateScanner.this.info.codeSystem.maxCodeLength(c);
                    this.record.cols[c].set(key, offset, columnLength);
                    offset += columnLength;
                }
                for (i = 0; i < value.length; ++i) {
                    this.tmpValues[i] = value[i].getState();
                }
                byte[] bytes = AggregationCache.this.measureCodec.encode(this.tmpValues).array();
                int[] sizes = AggregationCache.this.measureCodec.getMeasureSizes();
                offset = 0;
                for (int i2 = 0; i2 < value.length; ++i2) {
                    int col = GTAggregateScanner.this.metrics.trueBitAt(i2);
                    this.record.cols[col].set(bytes, offset, sizes[i2]);
                    offset += sizes[i2];
                }
            }
        }
    }
}

