/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;

import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.cube.kv.RowKeyColumnIO;
import org.apache.kylin.dict.TrieDictionary;
import org.apache.kylin.dimension.Dictionary;
import org.apache.kylin.invertedindex.index.RawTableRecord;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.BitMapFilterEvaluator;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionary;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregationCache;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.HbaseServerKVIterator;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.LocalDictionary;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.SliceBitMapProvider;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IIEndpoint
extends IIProtos.RowsService
implements Coprocessor,
CoprocessorService {
    /** SLF4J logger shared by every instance of this endpoint. */
    private static final Logger logger = LoggerFactory.getLogger(IIEndpoint.class);
    /** 524288000 bytes (500 * 1024 * 1024): the byte budget getResponseInternal enforces for non-aggregated rows. */
    private static final int MEMORY_LIMIT = 524288000;
    /** Region environment captured by start(); getRows obtains its region from it. */
    private RegionCoprocessorEnvironment env;
    /** Wall-clock milliseconds recorded at the top of getRows and reported back in the response stats. */
    private long serviceStartTime;
    /** Value read from the first two bytes of the region start key (0 when that key is empty); assigned in prepareScan only when the request carries a timestamp range. */
    private int shard;

    /**
     * Builds the region scan used to serve one request.
     *
     * <p>The scan always selects the {@code HBASE_QUALIFIER_BYTES} and
     * {@code HBASE_DICTIONARY_BYTES} columns of {@code HBASE_FAMILY_BYTES}. When the request
     * carries a timestamp range, the shard is read from the first two bytes of the region start
     * key (0 for an empty key) and the range bounds are turned into 10-byte row keys:
     * the start row is the closest existing row at or before {@code shard + lowerEndpoint}
     * (left unset if the region has no such row), and the stop row is
     * {@code shard + (upperEndpoint + 1)}.
     *
     * @param request request whose optional timestamp range narrows the scan
     * @param region  region being scanned; supplies the start key and the closest-row lookup
     * @return the configured scan
     * @throws IOException if the closest-row lookup on the region fails
     */
    private Scan prepareScan(IIProtos.IIRequest request, HRegion region) throws IOException {
        Scan scan = new Scan();
        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
        if (!request.hasTsRange()) {
            return scan;
        }

        // NOTE(review): this is Java-native deserialization of bytes taken straight from the
        // request - confirm that callers of this endpoint are trusted.
        byte[] serializedRange = HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange());
        @SuppressWarnings("unchecked")
        Range<Long> tsRange = (Range<Long>)SerializationUtils.deserialize(serializedRange);

        byte[] regionStartKey = region.getStartKey();
        if (ArrayUtils.isEmpty(regionStartKey)) {
            this.shard = 0;
        } else {
            this.shard = BytesUtil.readUnsigned(regionStartKey, 0, 2);
        }
        logger.info("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey) + ", making shard to be :" + this.shard);

        if (tsRange.hasLowerBound()) {
            Preconditions.checkArgument(this.shard != -1, "Shard is -1!");
            long tsStart = tsRange.lowerEndpoint();
            logger.info("ts start is " + tsStart);
            byte[] idealStartKey = this.shardTimestampKey(tsStart);
            logger.info("ideaStartKey is(readable) :" + BytesUtil.toReadableText(idealStartKey));
            Result closestRow = region.getClosestRowBefore(idealStartKey, IIDesc.HBASE_FAMILY_BYTES);
            if (closestRow == null) {
                logger.info("There is no key before ideaStartKey so ignore tsStart");
            } else {
                // Only the leading shard + timestamp bytes of the found row are used as the start row.
                byte[] actualStartKey = Arrays.copyOf(closestRow.getRow(), 10);
                scan.setStartRow(actualStartKey);
                logger.info("The start key is set to " + BytesUtil.toReadableText(actualStartKey));
            }
        }

        if (tsRange.hasUpperBound()) {
            Preconditions.checkArgument(this.shard != -1, "Shard is -1");
            long tsEnd = tsRange.upperEndpoint();
            logger.info("ts end is " + tsEnd);
            // The key is built from tsEnd + 1 before being handed to setStopRow.
            byte[] actualEndKey = this.shardTimestampKey(tsEnd + 1L);
            scan.setStopRow(actualEndKey);
            logger.info("The stop key is set to " + BytesUtil.toReadableText(actualEndKey));
        }
        return scan;
    }

    /** Builds a 10-byte row key: the current shard in bytes 0-1 followed by {@code timestamp} in bytes 2-9. */
    private byte[] shardTimestampKey(long timestamp) {
        byte[] key = new byte[10];
        BytesUtil.writeUnsigned(this.shard, key, 0, 2);
        BytesUtil.writeLong(timestamp, key, 2, 8);
        return key;
    }

    /**
     * RPC entry point: scans the region, builds the response and hands it, compressed, to
     * {@code done}.
     *
     * <p>The row type, projector, aggregators and filter are deserialized from the request; the
     * internal response produced by {@link #getResponse} is serialized, compressed and wrapped
     * in an {@code IIResponse} blob.
     *
     * <p>An {@link IOException} raised while serving the request is logged and reported through
     * {@code controller}; {@code done} is not invoked in that case. The scanner and the region
     * operation are released on every exit path, and the region operation is closed exactly once
     * and only if it was successfully started.
     *
     * @param controller RPC controller that receives any {@code IOException}
     * @param request    request carrying the serialized type, projector, aggregators, filter
     *                   and optional timestamp range
     * @param done       callback invoked with the compressed response on success
     * @throws RuntimeException if closing the region operation fails
     */
    @Override
    public void getRows(RpcController controller, IIProtos.IIRequest request, RpcCallback<IIProtos.IIResponse> done) {
        this.serviceStartTime = System.currentTimeMillis();
        RegionScanner innerScanner = null;
        HRegion region = null;
        // Tracks whether startRegionOperation() succeeded, so the finally block never tries to
        // close an operation that was never opened.
        boolean regionOperationStarted = false;
        try {
            region = this.env.getRegion();
            region.startRegionOperation();
            regionOperationStarted = true;
            innerScanner = region.getScanner(this.prepareScan(request, region));
            CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType()));
            CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector()));
            EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator()));
            CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter()));
            IIProtos.IIResponseInternal response = this.getResponse(innerScanner, type, projector, aggregators, filter);
            byte[] compressed = CompressionUtils.compress(response.toByteArray());
            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build();
            done.run(compressedR);
        } catch (IOException ioe) {
            // Same message as before, now with the stack trace attached.
            logger.error(ioe.toString(), ioe);
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            IOUtils.closeQuietly(innerScanner);
            if (regionOperationStarted) {
                try {
                    region.closeRegionOperation();
                } catch (IOException e) {
                    logger.error(e.toString(), e);
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Decodes the scanner's key-values into slices and builds the internal response from them.
     *
     * <p>Decoding and response building both run while holding the monitor of
     * {@code innerScanner}.
     *
     * @param innerScanner scanner supplying the raw key-values; also used as the lock object
     * @param type         row type describing the columns
     * @param projector    projector used to derive aggregation keys
     * @param aggregators  aggregators; also the source of the table record digest
     * @param filter       filter applied to each slice
     * @return the uncompressed internal response
     */
    public IIProtos.IIResponseInternal getResponse(RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators, CoprocessorFilter filter) {
        TableRecordInfoDigest digest = aggregators.getTableRecordInfoDigest();
        synchronized (innerScanner) {
            IIKeyValueCodec keyValueCodec = new IIKeyValueCodec(digest);
            HbaseServerKVIterator keyValues = new HbaseServerKVIterator(innerScanner);
            Iterable<Slice> decodedSlices = keyValueCodec.decodeKeyValue(keyValues);
            return this.getResponseInternal(decodedSlices, digest, filter, type, projector, aggregators);
        }
    }

    /**
     * Turns decoded slices into the internal response.
     *
     * <p>For every slice the filter (when present) is evaluated into a bitmap, the matching
     * records are decoded into a fixed-width record buffer, and each record is either fed to the
     * aggregation cache (when there is a group-by or at least one aggregator) or appended to the
     * response as a raw row. After all slices are consumed, the aggregated entries are
     * serialized into rows, and scan statistics are attached.
     *
     * @param slices      decoded slices to iterate
     * @param recordInfo  digest describing record layout (byte-form length, offsets, metric flags)
     * @param filter      filter to evaluate per slice; {@code null} means no filtering
     * @param type        row type describing the columns
     * @param projector   projector used to derive aggregation keys
     * @param aggregators aggregators applied when aggregation is needed
     * @return the built internal response, including stats
     * @throws RuntimeException if the non-aggregated rows exceed {@code MEMORY_LIMIT} bytes
     */
    private IIProtos.IIResponseInternal getResponseInternal(Iterable<Slice> slices, TableRecordInfoDigest recordInfo, CoprocessorFilter filter, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators) {
        boolean needAgg = projector.hasGroupby() || !aggregators.isEmpty();
        EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
        int byteFormLen = recordInfo.getByteFormLen();
        int totalByteFormLen = 0;
        IIProtos.IIResponseInternal.Builder responseBuilder = IIProtos.IIResponseInternal.newBuilder();
        ClearTextDictionary clearTextDictionary = new ClearTextDictionary(recordInfo, type);
        RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary);
        // Scratch buffer that is overwritten for every record.
        byte[] recordBuffer = new byte[byteFormLen];
        int iteratedSliceCount = 0;
        long latestSliceTs = Long.MIN_VALUE;
        for (Slice slice : slices) {
            latestSliceTs = slice.getTimestamp();
            ++iteratedSliceCount;
            Dictionary<?>[] localDictionaries = (Dictionary<?>[])slice.getLocalDictionaries();
            boolean emptyDictionary = Array.isEmpty(localDictionaries);
            if (!emptyDictionary) {
                for (Object localDictionary : localDictionaries) {
                    if (localDictionary instanceof TrieDictionary) {
                        ((TrieDictionary)localDictionary).enableIdToValueBytesCache();
                    }
                }
            }

            // Only touch the filter when there is one. A null filter leaves the bitmap null,
            // and it is passed to iterateWithBitmap that way.
            ConciseSet result = null;
            if (filter != null) {
                CoprocessorFilter sliceFilter = emptyDictionary
                        ? filter
                        : CoprocessorFilter.fromFilter(new LocalDictionary(localDictionaries, type, slice.getInfo()), filter.getFilter(), FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_LOCAL_DICT);
                result = new BitMapFilterEvaluator(new SliceBitMapProvider(slice, type)).evaluate(sliceFilter.getFilter());
            }

            Iterator<RawTableRecord> iterator = slice.iterateWithBitmap(result);
            TblColRef[] columns = type.columns;
            int[] finalColumnLength = new int[columns.length];
            for (int i = 0; i < columns.length; ++i) {
                finalColumnLength[i] = rowKeyColumnIO.getColumnLength(columns[i]);
            }
            while (iterator.hasNext()) {
                RawTableRecord rawTableRecord = iterator.next();
                this.decodeWithDictionary(recordBuffer, rawTableRecord, localDictionaries, recordInfo, rowKeyColumnIO, finalColumnLength);
                if (needAgg) {
                    AggrKey aggKey = projector.getAggrKey(recordBuffer);
                    MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
                    aggregators.aggregate(bufs, recordBuffer);
                    aggCache.checkMemoryUsage();
                    continue;
                }
                if (totalByteFormLen >= MEMORY_LIMIT) {
                    throw new RuntimeException("the query has exceeded the memory limit, please check the query");
                }
                // recordBuffer is reused for the next record, so the row must own a copy.
                // A zero-copy wrap would make every row alias the same array and end up
                // holding the bytes of the last decoded record.
                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(recordBuffer));
                responseBuilder.addRows(rowBuilder.build());
                totalByteFormLen += byteFormLen;
            }
        }
        logger.info("Iterated Slices count: " + iteratedSliceCount);
        if (needAgg) {
            int measureLength = aggregators.getMeasureSerializeLength();
            // Each chunk must be able to hold at least one serialized measure set.
            int chunkSize = Math.max(65536, measureLength);
            byte[] buffer = new byte[chunkSize];
            int offset = 0;
            for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
                AggrKey aggrKey = entry.getKey();
                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
                // Rows wrap disjoint regions of the chunk without copying, so start a fresh
                // chunk instead of overwriting when the current one is full.
                if (offset + measureLength > buffer.length) {
                    buffer = new byte[chunkSize];
                    offset = 0;
                }
                int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset);
                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, offset, length));
                offset += length;
                responseBuilder.addRows(rowBuilder.build());
            }
        }
        responseBuilder.setStats(IIProtos.IIResponseInternal.Stats.newBuilder().setLatestDataTime(latestSliceTs).setServiceStartTime(this.serviceStartTime).setServiceEndTime(System.currentTimeMillis()).setScannedSlices(iteratedSliceCount));
        return responseBuilder.build();
    }

    /**
     * Expands one encoded record into {@code recordBuffer}, column by column.
     *
     * <p>Metric columns, and every column when there are no local dictionaries, are copied
     * from the encoded record's raw bytes. All other columns are resolved through the column's
     * local dictionary using the record's value id. Each column is written at the digest's
     * offset and fitted to its final column length.
     *
     * @param recordBuffer       destination buffer
     * @param encodedRecord      record to decode
     * @param localDictionaries  per-column dictionaries; may be empty
     * @param digest             supplies metric flags and destination offsets
     * @param rowKeyColumnIO     not used by this method
     * @param finalColumnLengths destination width of each column
     */
    private void decodeWithDictionary(byte[] recordBuffer, RawTableRecord encodedRecord, Dictionary<?>[] localDictionaries, TableRecordInfoDigest digest, RowKeyColumnIO rowKeyColumnIO, int[] finalColumnLengths) {
        boolean[] metricFlags = digest.isMetrics();
        boolean noDictionaries = Array.isEmpty(localDictionaries);
        int columnCount = finalColumnLengths.length;
        for (int col = 0; col < columnCount; ++col) {
            int targetOffset = digest.offset(col);
            int targetLength = finalColumnLengths[col];
            if (metricFlags[col] || noDictionaries) {
                // Raw copy straight from the encoded record.
                this.writeColumnWithoutDictionary(encodedRecord.getBytes(), encodedRecord.offset(col), encodedRecord.length(col), recordBuffer, targetOffset, targetLength);
            } else {
                byte[] decodedValue = localDictionaries[col].getValueBytesFromId(encodedRecord.getValueID(col));
                this.writeColumnWithoutDictionary(decodedValue, 0, decodedValue.length, recordBuffer, targetOffset, targetLength);
            }
        }
    }

    /**
     * Copies a column value into a fixed-width destination slot.
     *
     * <p>A source at least as long as the slot is truncated to {@code dstLength} bytes; a
     * shorter source is copied whole and the rest of the slot is filled with the byte value 9.
     *
     * @param src       source bytes
     * @param srcOffset offset of the value in {@code src}
     * @param srcLength length of the value in {@code src}
     * @param dst       destination buffer
     * @param dstOffset offset of the slot in {@code dst}
     * @param dstLength width of the slot
     */
    private void writeColumnWithoutDictionary(byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength) {
        int copyLength = Math.min(srcLength, dstLength);
        System.arraycopy(src, srcOffset, dst, dstOffset, copyLength);
        if (copyLength < dstLength) {
            Arrays.fill(dst, dstOffset + copyLength, dstOffset + dstLength, (byte)9);
        }
    }

    /**
     * Captures the region environment this endpoint runs in.
     *
     * @param env environment supplied by the coprocessor host
     * @throws CoprocessorException if {@code env} is not a {@link RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (!(env instanceof RegionCoprocessorEnvironment)) {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
        this.env = (RegionCoprocessorEnvironment)env;
    }

    /**
     * Lifecycle hook invoked on shutdown; this endpoint has nothing to release here.
     *
     * @param env environment supplied by the coprocessor host (unused)
     */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // Intentionally empty.
    }

    /**
     * Exposes this endpoint as the protobuf service to register.
     *
     * @return this instance
     */
    @Override
    public Service getService() {
        return this;
    }
}

