/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.cube.v2;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import javax.annotation.Nullable;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.RawScan;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CubeHBaseEndpointRPC
extends CubeHBaseRPC {
    public static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);
    private static ExecutorService executorService = new LoggableCachedThreadPool();

    public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
        super(cubeSeg, cuboid, fullGTInfo);
    }

    /**
     * Encodes a shard id into a freshly allocated two-byte array by delegating
     * to {@link BytesUtil#writeUnsigned}.
     *
     * @param v the shard id to encode
     * @return a new two-byte array holding the encoded value
     */
    private byte[] getByteArrayForShort(short v) {
        final int width = 2;
        byte[] encoded = new byte[width];
        BytesUtil.writeUnsigned(v, encoded, 0, width);
        return encoded;
    }

    /**
     * Computes the inclusive [start, end] shard-key ranges that the endpoint
     * coprocessor must be invoked on for a cuboid.
     *
     * <ul>
     * <li>No shards: an empty list.</li>
     * <li>The cuboid occupies every shard: one range {@code [0, shardNum - 1]}.</li>
     * <li>The shards fit without wrapping: one range
     * {@code [baseShard, baseShard + shardNum - 1]}.</li>
     * <li>Otherwise the shards wrap past {@code totalShards}: two ranges,
     * {@code [baseShard, totalShards - 1]} followed by
     * {@code [0, baseShard + shardNum - totalShards - 1]}.</li>
     * </ul>
     *
     * The list is built with its real element type rather than through a raw
     * {@code Pair[]} cast to {@code Object[]}, which produced a list of
     * {@code Object} that does not match the declared return type.
     *
     * @param baseShard   first shard used by the cuboid
     * @param shardNum    number of shards used by the cuboid
     * @param totalShards total number of shards in the segment
     * @return a mutable list of (startKey, endKey) pairs, each key being the
     *         two-byte encoding of a shard id
     */
    private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) {
        List<Pair<byte[], byte[]>> ranges = Lists.newArrayList();
        if (shardNum == 0) {
            return ranges;
        }
        if (shardNum == totalShards) {
            ranges.add(this.newShardRange(0, shardNum - 1));
        } else if (baseShard + shardNum <= totalShards) {
            ranges.add(this.newShardRange(baseShard, baseShard + shardNum - 1));
        } else {
            // The shard window wraps around the end of the shard space.
            ranges.add(this.newShardRange(baseShard, totalShards - 1));
            ranges.add(this.newShardRange(0, baseShard + shardNum - totalShards - 1));
        }
        return ranges;
    }

    /** Builds one inclusive shard range, narrowing both bounds to {@code short} before encoding. */
    private Pair<byte[], byte[]> newShardRange(int firstShard, int lastShard) {
        return Pair.newPair(this.getByteArrayForShort((short) firstShard), this.getByteArrayForShort((short) lastShard));
    }

    /**
     * Looks up, for this RPC's cuboid, how many shards it uses and which shard
     * it starts on.
     *
     * @return a pair whose first element is the cuboid's shard count and whose
     *         second element is the cuboid's base shard
     */
    protected Pair<Short, Short> getShardNumAndBaseShard() {
        Short shardCount = this.cubeSeg.getCuboidShardNum(this.cuboid.getId());
        Short firstShard = this.cubeSeg.getCuboidBaseShard(this.cuboid.getId());
        return Pair.newPair(shardCount, firstShard);
    }

    /**
     * Creates a scanner that visits the cube segment through the HBase endpoint
     * coprocessor.
     *
     * <p>The method serializes the raw HBase scans and the scan request (after
     * clearing the request's GT scan ranges) into a single
     * {@code CubeVisitRequest}, then submits one asynchronous task per shard key
     * range to the shared executor. Each task invokes the coprocessor,
     * decompresses every normally completed response and appends it to an
     * {@link ExpectedSizeIterator}, which the returned scanner consumes.
     *
     * <p>Differences from a naive implementation:
     * <ul>
     * <li>The request is built once, before any task is submitted, so worker
     * threads share an immutable message instead of calling {@code build()}
     * concurrently on one builder.</li>
     * <li>The {@link HTableInterface} obtained for each task is closed once the
     * coprocessor call returns.</li>
     * <li>The returned scanner reports the live scanned-row counter. A value
     * read here would be taken before any task has reported its statistics.</li>
     * </ul>
     *
     * <p>NOTE(review): a task that fails throws from inside the executor and
     * never appends a block, so the consumer only notices the failure when
     * {@link ExpectedSizeIterator#next()} times out.
     *
     * @param scanRequest the scan to execute; its GT scan ranges are replaced
     *                    with an empty list as a side effect
     * @return a scanner over the records returned by the endpoint
     * @throws IOException declared by the overridden method
     */
    @Override
    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
        String backdoorBehavior = BackdoorToggles.getCoprocessorBehavior();
        String toggle = backdoorBehavior == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : backdoorBehavior;
        logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", this.cubeSeg, toggle);

        Pair<Short, Short> shardNumAndBaseShard = this.getShardNumAndBaseShard();
        short shardNum = shardNumAndBaseShard.getFirst();
        short cuboidBaseShard = shardNumAndBaseShard.getSecond();
        int totalShards = this.cubeSeg.getTotalShards();

        ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
        final HConnection conn = HBaseConnection.get(this.cubeSeg.getCubeInstance().getConfig().getStorageUrl());

        List<CubeVisitProtos.CubeVisitRequest.IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
        for (List<Integer> gtColumns : this.getHBaseColumnsGTMapping(selectedColBlocks)) {
            hbaseColumnsToGTIntList.add(CubeVisitProtos.CubeVisitRequest.IntList.newBuilder().addAllInts(gtColumns).build());
        }

        // Serialize the raw scans, growing the buffer fourfold whenever it overflows.
        List<RawScan> rawScans = this.preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
        ByteString rawScanByteString;
        int rawScanBufferSize = 65536;
        while (true) {
            try {
                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
                for (RawScan rs : rawScans) {
                    RawScan.serializer.serialize(rs, rawScanBuffer);
                }
                rawScanBuffer.flip();
                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
                break;
            } catch (BufferOverflowException boe) {
                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
                rawScanBufferSize *= 4;
            }
        }

        // The ranges were serialized above as raw scans, so clear them on the request before serializing it.
        scanRequest.setGTScanRanges(Lists.newArrayList());
        ByteString scanRequestByteString;
        int scanRequestBufferSize = 65536;
        while (true) {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
                GTScanRequest.serializer.serialize(scanRequest, buffer);
                buffer.flip();
                scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
                break;
            } catch (BufferOverflowException boe) {
                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
                scanRequestBufferSize *= 4;
            }
        }

        logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", new Object[]{Integer.toHexString(System.identityHashCode(scanRequest)), this.cubeSeg, rawScans.size()});
        for (RawScan rs : rawScans) {
            this.logScan(rs, this.cubeSeg.getStorageLocationIdentifier());
        }
        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[]{shardNum, cuboidBaseShard, rawScans.size()});

        final AtomicInteger totalScannedCount = new AtomicInteger(0);
        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);

        CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
        builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
        for (CubeVisitProtos.CubeVisitRequest.IntList intList : hbaseColumnsToGTIntList) {
            builder.addHbaseColumnsToGT(intList);
        }
        builder.setRowkeyPreambleSize(this.cubeSeg.getRowKeyPreambleSize());
        builder.setBehavior(toggle);
        builder.setStartTime(System.currentTimeMillis());
        builder.setTimeout(epResultItr.getTimeout());
        // Build once: the immutable message is safe to share across worker threads, the builder is not.
        final CubeVisitProtos.CubeVisitRequest request = builder.build();

        for (final Pair<byte[], byte[]> epRange : this.getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
            executorService.submit(new Runnable() {

                @Override
                public void run() {
                    final String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
                    Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                    try {
                        HTableInterface table = conn.getTable(CubeHBaseEndpointRPC.this.cubeSeg.getStorageLocationIdentifier());
                        try {
                            results = CubeHBaseEndpointRPC.this.getResults(request, table, epRange.getFirst(), epRange.getSecond());
                        } finally {
                            // Release the table handle; a close failure must not discard results already fetched.
                            try {
                                table.close();
                            } catch (IOException closeFailure) {
                                logger.warn(logHeader + "Failed to close HTable after endpoint visit", closeFailure);
                            }
                        }
                    } catch (Throwable throwable) {
                        throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
                    }

                    boolean abnormalFinish = false;
                    for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
                        CubeVisitProtos.CubeVisitResponse response = result.getValue();
                        totalScannedCount.addAndGet(response.getStats().getScannedRowCount());
                        logger.info(logHeader + CubeHBaseEndpointRPC.this.getStatsString(result));
                        if (response.getStats().getNormalComplete() != 1) {
                            // Keep going so the stats of the remaining responses are still logged.
                            abnormalFinish = true;
                            continue;
                        }
                        try {
                            epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(response.getCompressedRows())));
                        } catch (IOException | DataFormatException e) {
                            throw new RuntimeException(logHeader + "Error when decompressing", e);
                        }
                    }
                    if (abnormalFinish) {
                        throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
                    }
                }
            });
        }

        // Report the live counter: a snapshot taken here would precede every worker's update.
        return new EndpointResultsAsGTScanner(this.fullGTInfo, epResultItr, scanRequest.getColumns(), 0) {
            @Override
            public int getScannedRowCount() {
                return totalScannedCount.get();
            }
        };
    }

    /**
     * Formats the statistics carried by one endpoint response into a single
     * log line.
     *
     * @param result a map entry whose key is rendered as the hex shard id and
     *               whose value is the response holding the stats
     * @return a one-line summary of the stats for that shard
     */
    private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
        CubeVisitProtos.CubeVisitResponse.Stats stats = result.getValue().getStats();

        String origin = "Endpoint RPC returned from HTable " + this.cubeSeg.getStorageLocationIdentifier()
                + " Shard " + BytesUtil.toHex(result.getKey())
                + " on host: " + stats.getHostname() + ".";
        String rowCounts = "Total scanned row: " + stats.getScannedRowCount() + ". "
                + "Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ";
        String elapsed = "Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). ";
        String serverLoad = "Server CPU usage: " + stats.getSystemCpuLoad()
                + ", server physical mem left: " + stats.getFreePhysicalMemorySize()
                + ", server swap mem left:" + stats.getFreeSwapSpaceSize() + ".";
        String tail = "Etc message: " + stats.getEtcMsg() + "."
                + "Normal Complete: " + (stats.getNormalComplete() == 1) + ".";

        return origin + rowCounts + elapsed + serverLoad + tail;
    }

    /**
     * Invokes the cube-visit coprocessor service on the given table for the
     * key range {@code startKey}..{@code endKey} and returns the responses as
     * handed back by {@code HTableInterface.coprocessorService}.
     *
     * @param request  the request sent with every service call
     * @param table    the table whose coprocessor service is called
     * @param startKey start of the row-key range to cover
     * @param endKey   end of the row-key range to cover
     * @return the responses keyed by the byte[] key supplied by HBase
     * @throws Throwable whatever the coprocessor call raises, including the
     *                   exception recorded on the RPC controller
     */
    private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
        Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse> visitCall = new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {

            @Override
            public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
                ServerRpcController rpcController = new ServerRpcController();
                BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> done = new BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse>();
                rowsService.visitCube(rpcController, request, done);
                // Fetch the response first, then surface any failure the controller recorded.
                CubeVisitProtos.CubeVisitResponse response = done.get();
                if (rpcController.failedOnException()) {
                    throw rpcController.getFailedOn();
                }
                return response;
            }
        };
        return table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, visitCall);
    }

    /**
     * Adapts a stream of serialized record blocks, as returned by the endpoint
     * coprocessor, to the {@link IGTScanner} interface.
     *
     * <p>{@link #iterator()} consumes the single underlying {@code blocks}
     * iterator, so every iterator it returns draws from that same source.
     */
    static class EndpointResultsAsGTScanner
    implements IGTScanner {
        private final GTInfo info;
        private final Iterator<byte[]> blocks;
        private final ImmutableBitSet columns;
        private final int totalScannedCount;

        /**
         * @param info              table info used to instantiate the decoded records
         * @param blocks            source of serialized record blocks
         * @param columns           the columns to load from each serialized record
         * @param totalScannedCount value reported by {@link #getScannedRowCount()}
         */
        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, int totalScannedCount) {
            this.info = info;
            this.blocks = blocks;
            this.columns = columns;
            this.totalScannedCount = totalScannedCount;
        }

        @Override
        public GTInfo getInfo() {
            return this.info;
        }

        @Override
        public int getScannedRowCount() {
            return this.totalScannedCount;
        }

        /** Nothing to release; this scanner holds no resources of its own. */
        @Override
        public void close() throws IOException {
        }

        /**
         * Returns an iterator that decodes the blocks lazily, one record at a
         * time.
         *
         * <p>Within a block the same {@link GTRecord} instance is reloaded and
         * returned by every {@code next()} call, so callers must copy a record
         * if they need to keep it past the following call.
         */
        @Override
        public Iterator<GTRecord> iterator() {
            return Iterators.concat(Iterators.transform(this.blocks, new Function<byte[], Iterator<GTRecord>>() {

                @Nullable
                @Override
                public Iterator<GTRecord> apply(@Nullable final byte[] input) {
                    return new Iterator<GTRecord>() {
                        private ByteBuffer inputBuffer = null;
                        private GTRecord oneRecord = null;

                        /** Wraps the block on first use; shared by hasNext() and next(). */
                        private void ensureInitialized() {
                            if (this.inputBuffer == null) {
                                this.inputBuffer = ByteBuffer.wrap(input);
                                this.oneRecord = new GTRecord(EndpointResultsAsGTScanner.this.info);
                            }
                        }

                        @Override
                        public boolean hasNext() {
                            this.ensureInitialized();
                            return this.inputBuffer.position() < this.inputBuffer.limit();
                        }

                        @Override
                        public GTRecord next() {
                            // Safe without a prior hasNext() call, and signals exhaustion per the Iterator contract.
                            if (!this.hasNext()) {
                                throw new NoSuchElementException();
                            }
                            this.oneRecord.loadColumns(EndpointResultsAsGTScanner.this.columns, this.inputBuffer);
                            return this.oneRecord;
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
            }));
        }
    }

    /**
     * A blocking iterator that yields exactly {@code expectedSize} byte blocks.
     * Producers hand blocks over with {@link #append(byte[])}; {@link #next()}
     * waits for the next block until an overall deadline, fixed at
     * construction time, has passed.
     */
    static class ExpectedSizeIterator
    implements Iterator<byte[]> {
        BlockingQueue<byte[]> queue;
        int expectedSize;
        int current = 0;
        long timeout;
        long timeoutTS;

        /**
         * Computes the timeout and the absolute deadline.
         *
         * <p>The timeout is the HBase RPC timeout from the current Hadoop
         * configuration multiplied by the configured cube-visit timeout
         * factor, unless {@code BackdoorToggles.getQueryTimeout()} returns
         * something other than -1, in which case that value is used. Either
         * way the result is then scaled by 1.1.
         *
         * @param expectedSize number of blocks this iterator will yield; may be 0
         */
        public ExpectedSizeIterator(int expectedSize) {
            this.expectedSize = expectedSize;
            // ArrayBlockingQueue rejects a capacity below 1, so clamp it to keep expectedSize == 0 constructible.
            this.queue = new ArrayBlockingQueue<byte[]>(Math.max(1, expectedSize));
            this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
            this.timeout *= (long)KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
            if (BackdoorToggles.getQueryTimeout() != -1) {
                this.timeout = BackdoorToggles.getQueryTimeout();
            }
            this.timeout = (long)((double)this.timeout * 1.1);
            logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout);
            this.timeoutTS = System.currentTimeMillis() + this.timeout;
        }

        /** @return true while fewer than {@code expectedSize} blocks have been requested */
        @Override
        public boolean hasNext() {
            return this.current < this.expectedSize;
        }

        /**
         * Waits for the next block.
         *
         * @return the next block appended by a producer
         * @throws IllegalStateException if all expected blocks were already requested
         * @throws RuntimeException      if the deadline passes before a block arrives,
         *                               or the waiting thread is interrupted
         */
        @Override
        public byte[] next() {
            if (this.current >= this.expectedSize) {
                throw new IllegalStateException("Won't have more data");
            }
            try {
                // Counted before polling, so a timed-out request still consumes one slot.
                ++this.current;
                long tsRemaining = this.timeoutTS - System.currentTimeMillis();
                if (tsRemaining < 0L) {
                    throw new RuntimeException("Timeout visiting cube!");
                }
                byte[] ret = this.queue.poll(tsRemaining, TimeUnit.MILLISECONDS);
                if (ret == null) {
                    throw new RuntimeException("Timeout visiting cube!");
                }
                return ret;
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still see the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException("error when waiting queue", e);
            }
        }

        @Override
        public void remove() {
            throw new NotImplementedException();
        }

        /**
         * Hands one block to the consumer, blocking while the queue is full.
         *
         * @param data the block to enqueue
         * @throws RuntimeException if the thread is interrupted while waiting
         */
        public void append(byte[] data) {
            try {
                this.queue.put(data);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so the executor thread can react to it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("error when waiting queue", e);
            }
        }

        /** @return the timeout computed in the constructor, in the same unit as the HBase RPC timeout setting */
        public long getTimeout() {
            return this.timeout;
        }
    }
}

