/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.cube.v2;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.DataFormatException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.ExpectedSizeIterator;
import org.apache.kylin.storage.hbase.cube.v2.GTBlobScatter;
import org.apache.kylin.storage.hbase.cube.v2.RawScan;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a cube scan through the HBase coprocessor endpoint ({@code CubeVisitService}).
 *
 * <p>For every shard key range of the cuboid a task is submitted to a shared thread pool. Each
 * task invokes the endpoint, collects the per-region responses and appends their row blobs to an
 * {@link ExpectedSizeIterator}, which backs the {@link IGTScanner} handed back to the caller.
 */
public class CubeHBaseEndpointRPC
extends CubeHBaseRPC {
    private static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);
    /** Pool shared by all instances; it runs one task per shard key range. */
    private static final ExecutorService executorService = new LoggableCachedThreadPool();
    /** First buffer size tried when serializing a request. */
    private static final int INITIAL_SERIALIZE_BUFFER_SIZE = 65536;
    /** Factor by which the serialization buffer grows after an overflow. */
    private static final int SERIALIZE_BUFFER_GROWTH_FACTOR = 4;

    public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
        super(segment, cuboid, fullGTInfo);
    }

    /**
     * Encodes a shard id into a freshly allocated two-byte array.
     *
     * @param v the shard id
     * @return a new 2-byte array written by {@code BytesUtil.writeUnsigned}
     */
    private byte[] getByteArrayForShort(short v) {
        byte[] split = new byte[2];
        BytesUtil.writeUnsigned(v, split, 0, split.length);
        return split;
    }

    /**
     * Computes the shard key ranges the endpoint has to be invoked on.
     *
     * <p>Shards are numbered {@code 0..totalShards-1} and the cuboid occupies {@code shardNum}
     * consecutive shards starting at {@code baseShard}, wrapping around past the last shard.
     * A wrapping layout therefore yields two ranges; every other non-empty layout yields one.
     *
     * @param baseShard first shard used by the cuboid
     * @param shardNum number of shards used by the cuboid
     * @param totalShards total number of shards available
     * @return a new mutable list of (first shard, last shard) pairs, each encoded as two bytes;
     *         empty when {@code shardNum} is 0
     */
    private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) {
        List<Pair<byte[], byte[]>> ranges = Lists.newArrayList();
        if (shardNum == 0) {
            return ranges;
        }
        if (shardNum == totalShards) {
            // The cuboid covers every shard, so the base shard does not matter.
            ranges.add(this.shardRange(0, shardNum - 1));
        } else if (baseShard + shardNum <= totalShards) {
            ranges.add(this.shardRange(baseShard, baseShard + shardNum - 1));
        } else {
            // Wrap-around: tail of the shard space first, then the head.
            ranges.add(this.shardRange(baseShard, totalShards - 1));
            ranges.add(this.shardRange(0, baseShard + shardNum - totalShards - 1));
        }
        return ranges;
    }

    /** Builds one (first shard, last shard) pair with both ids narrowed to {@code short} and encoded. */
    private Pair<byte[], byte[]> shardRange(int firstShard, int lastShard) {
        return Pair.newPair(this.getByteArrayForShort((short) firstShard), this.getByteArrayForShort((short) lastShard));
    }

    /**
     * Looks up the shard layout of the current cuboid on the segment.
     *
     * @return a pair whose first element is the cuboid's shard count and whose second element is
     *         its base shard
     */
    protected Pair<Short, Short> getShardNumAndBaseShard() {
        return Pair.newPair(this.cubeSeg.getCuboidShardNum(this.cuboid.getId()), this.cubeSeg.getCuboidBaseShard(this.cuboid.getId()));
    }

    /**
     * Submits the scan to the coprocessor endpoint and returns a scanner over its results.
     *
     * <p>The method returns right after the endpoint tasks have been submitted; results (or a
     * failure) are delivered asynchronously through the {@link ExpectedSizeIterator} that backs
     * the returned scanner.
     *
     * <p>Side effects on {@code scanRequest}: its scan ranges are cleared (they travel separately
     * as serialized raw scans) and its timeout is set from the result iterator's RPC timeout.
     *
     * @param scanRequest the grid-table scan request; mutated as described above
     * @return a scanner reading the blobs returned by the endpoint
     * @throws IOException declared by the signature; nothing in this method catches or wraps it
     */
    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
        Pair<Short, Short> shardNumAndBaseShard = this.getShardNumAndBaseShard();
        short shardNum = shardNumAndBaseShard.getFirst();
        short cuboidBaseShard = shardNumAndBaseShard.getSecond();
        int totalShards = this.cubeSeg.getTotalShards(this.cuboid.getId());

        // Column block 0 is always selected in addition to what the request asks for.
        ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
        final HConnection conn = HBaseConnection.get(this.cubeSeg.getCubeInstance().getConfig().getStorageUrl());

        List<CubeVisitProtos.CubeVisitRequest.IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
        for (List<Integer> gtColumns : this.getHBaseColumnsGTMapping(selectedColBlocks)) {
            hbaseColumnsToGTIntList.add(CubeVisitProtos.CubeVisitRequest.IntList.newBuilder().addAllInts(gtColumns).build());
        }

        // The raw scans must be derived before the ranges are cleared from the request below.
        List<RawScan> rawScans = this.preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
        ByteString rawScanByteString = this.serializeRawScans(rawScans);
        scanRequest.clearScanRanges();

        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
        scanRequest.setTimeout(epResultItr.getRpcTimeout());
        ByteString scanRequestByteString = this.serializeGTScanReq(scanRequest);

        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", new Object[]{Integer.toHexString(System.identityHashCode(scanRequest)), this.cubeSeg, rawScans.size()});
        for (RawScan rs : rawScans) {
            this.logScan(rs, this.cubeSeg.getStorageLocationIdentifier());
        }
        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[]{shardNum, cuboidBaseShard, rawScans.size()});

        final AtomicLong totalScannedCount = new AtomicLong(0L);
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        final boolean compressionResult = kylinConfig.getCompressionResult();

        CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
        builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
        for (CubeVisitProtos.CubeVisitRequest.IntList intList : hbaseColumnsToGTIntList) {
            builder.addHbaseColumnsToGT(intList);
        }
        builder.setRowkeyPreambleSize(this.cubeSeg.getRowKeyPreambleSize());
        builder.setKylinProperties(kylinConfig.getConfigAsString());
        // Build once and share the resulting message: the request is identical for every range,
        // and this keeps the builder from being used by several worker threads at the same time.
        final CubeVisitProtos.CubeVisitRequest request = builder.build();

        for (final Pair<byte[], byte[]> epRange : this.getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    final String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
                    // Single-element array so the anonymous callback below can flip the flag.
                    final boolean[] abnormalFinish = new boolean[1];
                    HTableInterface table = null;
                    try {
                        table = conn.getTable(CubeHBaseEndpointRPC.this.cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
                        table.coprocessorService(CubeVisitProtos.CubeVisitService.class, epRange.getFirst(), epRange.getSecond(), new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>(){

                            @Override
                            public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
                                ServerRpcController controller = new ServerRpcController();
                                BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> rpcCallback = new BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse>();
                                rowsService.visitCube(controller, request, rpcCallback);
                                CubeVisitProtos.CubeVisitResponse response = rpcCallback.get();
                                if (controller.failedOnException()) {
                                    throw controller.getFailedOn();
                                }
                                return response;
                            }
                        }, new Batch.Callback<CubeVisitProtos.CubeVisitResponse>(){

                            @Override
                            public void update(byte[] region, byte[] row, CubeVisitProtos.CubeVisitResponse result) {
                                if (region == null) {
                                    return;
                                }
                                totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
                                logger.info(logHeader + CubeHBaseEndpointRPC.this.getStatsString(region, result));
                                if (result.getStats().getNormalComplete() != 1) {
                                    // The region stopped early; its rows are not appended.
                                    abnormalFinish[0] = true;
                                    return;
                                }
                                try {
                                    byte[] rows = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
                                    epResultItr.append(compressionResult ? CompressionUtils.decompress(rows) : rows);
                                }
                                catch (IOException | DataFormatException e) {
                                    throw new RuntimeException(logHeader + "Error when decompressing", e);
                                }
                            }
                        });
                    }
                    catch (Throwable ex) {
                        // Every failure is forwarded to the result iterator, which is the only
                        // channel back to the consumer of the scanner.
                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex);
                        epResultItr.notifyCoprocException(ex);
                        return;
                    }
                    finally {
                        // Release the table handle on success and failure alike.
                        CubeHBaseEndpointRPC.closeTable(table, logHeader);
                    }
                    if (abnormalFinish[0]) {
                        GTScanSelfTerminatedException ex = new GTScanSelfTerminatedException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex);
                        epResultItr.notifyCoprocException(ex);
                    }
                }
            });
        }
        // NOTE(review): the counter is read right after the tasks were submitted, so the value
        // passed on is a snapshot that may not include rows reported by RPCs still in flight.
        // Confirm whether GTBlobScatter expects a final count.
        return new GTBlobScatter(this.fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
    }

    /**
     * Closes a table handle without letting a close failure mask the outcome of the RPC.
     *
     * @param table the handle to close; may be {@code null} when it was never obtained
     * @param logHeader prefix identifying the worker in log output
     */
    private static void closeTable(HTableInterface table, String logHeader) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        }
        catch (IOException e) {
            logger.warn(logHeader + "Failed to close HTable after endpoint call", e);
        }
    }

    /**
     * Serializes the scan request, retrying with a four times larger buffer on overflow.
     *
     * @param scanRequest the request to serialize
     * @return the serialized bytes wrapped as a {@link ByteString}
     * @throws BufferOverflowException if the request still does not fit once the buffer size can
     *         no longer be multiplied without overflowing {@code int}
     */
    private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
        int scanRequestBufferSize = INITIAL_SERIALIZE_BUFFER_SIZE;
        while (true) {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
                GTScanRequest.serializer.serialize(scanRequest, buffer);
                // After flip() the position is 0 and the limit is the number of bytes written.
                buffer.flip();
                return HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
            }
            catch (BufferOverflowException boe) {
                if (scanRequestBufferSize > Integer.MAX_VALUE / SERIALIZE_BUFFER_GROWTH_FACTOR) {
                    // Growing again would overflow int (wrapping to 0) and retry forever.
                    throw boe;
                }
                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
                scanRequestBufferSize *= SERIALIZE_BUFFER_GROWTH_FACTOR;
            }
        }
    }

    /**
     * Serializes the raw scans as a VInt count followed by each scan, retrying with a four times
     * larger buffer on overflow.
     *
     * @param rawScans the scans to serialize
     * @return the serialized bytes wrapped as a {@link ByteString}
     * @throws BufferOverflowException if the scans still do not fit once the buffer size can no
     *         longer be multiplied without overflowing {@code int}
     */
    private ByteString serializeRawScans(List<RawScan> rawScans) {
        int rawScanBufferSize = INITIAL_SERIALIZE_BUFFER_SIZE;
        while (true) {
            try {
                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
                for (RawScan rs : rawScans) {
                    RawScan.serializer.serialize(rs, rawScanBuffer);
                }
                // After flip() the position is 0 and the limit is the number of bytes written.
                rawScanBuffer.flip();
                return HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
            }
            catch (BufferOverflowException boe) {
                if (rawScanBufferSize > Integer.MAX_VALUE / SERIALIZE_BUFFER_GROWTH_FACTOR) {
                    // Growing again would overflow int (wrapping to 0) and retry forever.
                    throw boe;
                }
                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
                rawScanBufferSize *= SERIALIZE_BUFFER_GROWTH_FACTOR;
            }
        }
    }

    /**
     * Formats the statistics of one endpoint response for logging.
     *
     * @param region the region key passed to the batch callback, printed in hex
     * @param result the endpoint response whose stats are reported
     * @return a single-line, human-readable summary
     */
    private String getStatsString(byte[] region, CubeVisitProtos.CubeVisitResponse result) {
        StringBuilder sb = new StringBuilder();
        CubeVisitProtos.CubeVisitResponse.Stats stats = result.getStats();
        sb.append("Endpoint RPC returned from HTable ").append(this.cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
        sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
        sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
        sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
        sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
        sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
        return sb.toString();
    }
}

