/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.measure.topn;

import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.dimension.Dictionary;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureTypeFactory;
import org.apache.kylin.measure.topn.Counter;
import org.apache.kylin.measure.topn.TopNAggregator;
import org.apache.kylin.measure.topn.TopNCounter;
import org.apache.kylin.measure.topn.TopNCounterSerializer;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.CapabilityResult;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopNMeasureType
extends MeasureType<TopNCounter<ByteArray>> {
    private static final Logger logger = LoggerFactory.getLogger(TopNMeasureType.class);
    public static final String FUNC_TOP_N = "TOP_N";
    public static final String DATATYPE_TOPN = "topn";
    private final DataType dataType;

    public TopNMeasureType(String funcName, DataType dataType) {
        this.dataType = dataType;
    }

    @Override
    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
        this.validate(functionDesc.getExpression(), functionDesc.getReturnDataType(), true);
    }

    private void validate(String funcName, DataType dataType, boolean checkDataType) {
        if (!FUNC_TOP_N.equals(funcName)) {
            throw new IllegalArgumentException();
        }
        if (!DATATYPE_TOPN.equals(dataType.getName())) {
            throw new IllegalArgumentException();
        }
        if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000) {
            throw new IllegalArgumentException();
        }
    }

    @Override
    public boolean isMemoryHungry() {
        return true;
    }

    @Override
    public MeasureIngester<TopNCounter<ByteArray>> newIngester() {
        return new MeasureIngester<TopNCounter<ByteArray>>(){

            @Override
            public TopNCounter<ByteArray> valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
                if (values.length != 2) {
                    throw new IllegalArgumentException();
                }
                double counter = values[0] == null ? 0.0 : Double.parseDouble(values[0]);
                String literal = values[1];
                TblColRef literalCol = TopNMeasureType.this.getTopNLiteralColumn(measureDesc.getFunction());
                Dictionary<String> dictionary = dictionaryMap.get(literalCol);
                int keyEncodedValue = dictionary.getIdFromValue(literal);
                ByteArray key = new ByteArray(dictionary.getSizeOfId());
                BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, dictionary.getSizeOfId());
                TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(TopNMeasureType.this.dataType.getPrecision() * 50);
                topNCounter.offer(key, counter);
                return topNCounter;
            }

            @Override
            public TopNCounter<ByteArray> reEncodeDictionary(TopNCounter<ByteArray> value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
                TopNCounter<ByteArray> topNCounter = value;
                TblColRef colRef = TopNMeasureType.this.getTopNLiteralColumn(measureDesc.getFunction());
                Dictionary<String> sourceDict = oldDicts.get(colRef);
                Dictionary<String> mergedDict = newDicts.get(colRef);
                int topNSize = topNCounter.size();
                byte[] newIdBuf = new byte[topNSize * mergedDict.getSizeOfId()];
                byte[] literal = new byte[sourceDict.getSizeOfValue()];
                int bufOffset = 0;
                for (Counter<ByteArray> counter : topNCounter) {
                    int oldId = BytesUtil.readUnsigned(counter.getItem().array(), counter.getItem().offset(), counter.getItem().length());
                    int size = sourceDict.getValueBytesFromId(oldId, literal, 0);
                    int newId = size < 0 ? mergedDict.nullId() : mergedDict.getIdFromValueBytes(literal, 0, size);
                    BytesUtil.writeUnsigned(newId, newIdBuf, bufOffset, mergedDict.getSizeOfId());
                    counter.getItem().set(newIdBuf, bufOffset, mergedDict.getSizeOfId());
                    bufOffset += mergedDict.getSizeOfId();
                }
                return value;
            }
        };
    }

    @Override
    public MeasureAggregator<TopNCounter<ByteArray>> newAggregator() {
        return new TopNAggregator();
    }

    @Override
    public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) {
        TblColRef literalCol = functionDesc.getParameter().getColRefs().get(1);
        return Collections.singletonList(literalCol);
    }

    @Override
    public CapabilityResult.CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc topN) {
        if (digest.aggregations.size() != 1) {
            return null;
        }
        FunctionDesc onlyFunction = digest.aggregations.iterator().next();
        if (!this.isTopNCompatibleSum(topN.getFunction(), onlyFunction)) {
            return null;
        }
        TblColRef literalCol = this.getTopNLiteralColumn(topN.getFunction());
        if (!unmatchedDimensions.contains(literalCol)) {
            return null;
        }
        if (!digest.groupbyColumns.contains(literalCol)) {
            return null;
        }
        unmatchedDimensions.remove(literalCol);
        unmatchedAggregations.remove(onlyFunction);
        return new CapabilityResult.CapabilityInfluence(){

            @Override
            public double suggestCostMultiplier() {
                return 0.3;
            }
        };
    }

    private boolean isTopNCompatibleSum(FunctionDesc topN, FunctionDesc sum) {
        if (sum == null) {
            return false;
        }
        if (!this.isTopN(topN) || !sum.isSum()) {
            return false;
        }
        if (sum.getParameter().getColRefs().isEmpty()) {
            return false;
        }
        TblColRef sumCol = sum.getParameter().getColRefs().get(0);
        TblColRef topnNumCol = this.getTopNNumericColumn(topN);
        return sumCol.equals(topnNumCol);
    }

    @Override
    public boolean needRewrite() {
        return false;
    }

    @Override
    public Class<?> getRewriteCalciteAggrFunctionClass() {
        return null;
    }

    @Override
    public void adjustSqlDigest(MeasureDesc measureDesc, SQLDigest sqlDigest) {
        FunctionDesc topnFunc = measureDesc.getFunction();
        TblColRef topnLiteralCol = this.getTopNLiteralColumn(topnFunc);
        if (!sqlDigest.groupbyColumns.contains(topnLiteralCol)) {
            return;
        }
        if (sqlDigest.aggregations.size() > 1) {
            throw new IllegalStateException("When query with topN, only one metrics is allowed.");
        }
        if (sqlDigest.aggregations.size() > 0) {
            FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
            if (!origFunc.isSum()) {
                throw new IllegalStateException("When query with topN, only SUM function is allowed.");
            }
            logger.info("Rewrite function " + origFunc + " to " + topnFunc);
        }
        sqlDigest.aggregations = Lists.newArrayList((Object[])new FunctionDesc[]{topnFunc});
        sqlDigest.groupbyColumns.remove(topnLiteralCol);
        sqlDigest.metricColumns.add(topnLiteralCol);
    }

    @Override
    public boolean needAdvancedTupleFilling() {
        return true;
    }

    @Override
    public void fillTupleSimply(Tuple tuple, int indexInTuple, Object measureValue) {
        throw new UnsupportedOperationException();
    }

    @Override
    public MeasureType.IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo tupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) {
        TblColRef literalCol = this.getTopNLiteralColumn(function);
        TblColRef numericCol = this.getTopNNumericColumn(function);
        final Dictionary<String> topNColDict = dictionaryMap.get(literalCol);
        final int literalTupleIdx = tupleInfo.hasColumn(literalCol) ? tupleInfo.getColumnIndex(literalCol) : -1;
        final int numericTupleIdx = tupleInfo.hasColumn(numericCol) ? tupleInfo.getColumnIndex(numericCol) : -1;
        return new MeasureType.IAdvMeasureFiller(){
            private TopNCounter<ByteArray> topNCounter;
            private Iterator<Counter<ByteArray>> topNCounterIterator;
            private int expectRow = 0;

            @Override
            public void reload(Object measureValue) {
                this.topNCounter = (TopNCounter)measureValue;
                this.topNCounterIterator = this.topNCounter.iterator();
                this.expectRow = 0;
            }

            @Override
            public int getNumOfRows() {
                return this.topNCounter.size();
            }

            @Override
            public void fillTuple(Tuple tuple, int row) {
                if (this.expectRow++ != row) {
                    throw new IllegalStateException();
                }
                Counter<ByteArray> counter = this.topNCounterIterator.next();
                int key = BytesUtil.readUnsigned(counter.getItem().array(), counter.getItem().offset(), counter.getItem().length());
                String colValue = (String)topNColDict.getValueFromId(key);
                tuple.setDimensionValue(literalTupleIdx, colValue);
                tuple.setMeasureValue(numericTupleIdx, (Object)counter.getCount());
            }
        };
    }

    private TblColRef getTopNNumericColumn(FunctionDesc functionDesc) {
        return functionDesc.getParameter().getColRefs().get(0);
    }

    private TblColRef getTopNLiteralColumn(FunctionDesc functionDesc) {
        return functionDesc.getParameter().getColRefs().get(1);
    }

    private boolean isTopN(FunctionDesc functionDesc) {
        return FUNC_TOP_N.equalsIgnoreCase(functionDesc.getExpression());
    }

    public static class Factory
    extends MeasureTypeFactory<TopNCounter<ByteArray>> {
        @Override
        public MeasureType<TopNCounter<ByteArray>> createMeasureType(String funcName, DataType dataType) {
            return new TopNMeasureType(funcName, dataType);
        }

        @Override
        public String getAggrFunctionName() {
            return TopNMeasureType.FUNC_TOP_N;
        }

        @Override
        public String getAggrDataTypeName() {
            return TopNMeasureType.DATATYPE_TOPN;
        }

        @Override
        public Class<? extends DataTypeSerializer<TopNCounter<ByteArray>>> getAggrDataTypeSerializer() {
            return TopNCounterSerializer.class;
        }
    }
}

