/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.cube.v2;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.zip.DataFormatException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.IPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.ExpectedSizeIterator;
import org.apache.kylin.storage.hbase.cube.v2.RawScan;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CubeHBaseEndpointRPC
extends CubeHBaseRPC {
    /** SLF4J logger shared by every instance of this class. */
    private static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);
    /** Shared thread pool; {@code getGTScanner} submits one task per endpoint key range to it. */
    private static ExecutorService executorService = new LoggableCachedThreadPool();
    /**
     * Reflective handle to the {@code row} field of {@link RegionCoprocessorRpcChannel}, assigned by the static
     * initializer of this class. Stays {@code null} when the {@code getDeclaredField} lookup there fails.
     */
    static Field channelRowField = null;

    /**
     * Creates an endpoint-based RPC by forwarding all arguments, unchanged, to the {@link CubeHBaseRPC} constructor.
     *
     * @param segment    segment passed to the superclass
     * @param cuboid     cuboid passed to the superclass
     * @param fullGTInfo grid-table info passed to the superclass
     * @param context    storage context passed to the superclass
     */
    public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
        super(segment, cuboid, fullGTInfo, context);
    }

    /**
     * Encodes a shard number into a fresh two-byte array using {@link BytesUtil#writeUnsigned}.
     *
     * @param v the shard number to encode
     * @return a new array of length 2 holding the encoded value
     */
    private byte[] getByteArrayForShort(short v) {
        final byte[] encoded = new byte[2];
        BytesUtil.writeUnsigned(v, encoded, 0, encoded.length);
        return encoded;
    }

    /**
     * Computes the inclusive [start, end] shard-key ranges that the endpoint must visit.
     *
     * <p>Shards are treated as a ring of {@code totalShards} slots. The cuboid occupies {@code shardNum}
     * consecutive slots starting at {@code baseShard}; when that run passes the last slot it wraps around to
     * slot 0, which yields two ranges instead of one.
     *
     * @param baseShard   first shard occupied by the cuboid
     * @param shardNum    number of shards occupied by the cuboid
     * @param totalShards total number of shards of the segment
     * @return a mutable list with zero ranges ({@code shardNum == 0}), one range (no wrap-around) or two ranges
     *         (wrap-around); each pair holds the two-byte encoded start and end shard
     */
    private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) {
        // A typed list replaces the erased (Object[]) varargs calls, which do not type-check against the return type.
        final List<Pair<byte[], byte[]>> ranges = new ArrayList<>(2);
        if (shardNum == 0) {
            return ranges;
        }

        // Index of the last occupied shard before any wrap-around is applied.
        final int lastShard = baseShard + shardNum - 1;
        if (shardNum == totalShards) {
            // Every shard is occupied: a single range covering 0 .. shardNum - 1.
            ranges.add(Pair.newPair(this.getByteArrayForShort((short) 0), this.getByteArrayForShort((short) (shardNum - 1))));
        } else if (lastShard < totalShards) {
            // The run fits without wrapping; the end key is used as an inclusive bound.
            ranges.add(Pair.newPair(this.getByteArrayForShort(baseShard), this.getByteArrayForShort((short) lastShard)));
        } else {
            // The run wraps: baseShard .. totalShards - 1, then 0 .. remainder.
            ranges.add(Pair.newPair(this.getByteArrayForShort(baseShard), this.getByteArrayForShort((short) (totalShards - 1))));
            ranges.add(Pair.newPair(this.getByteArrayForShort((short) 0), this.getByteArrayForShort((short) (lastShard - totalShards))));
        }
        return ranges;
    }

    /**
     * Looks up the shard layout of the current cuboid on the segment.
     *
     * @return a pair whose first element is the cuboid's shard count and whose second element is its base shard
     */
    protected Pair<Short, Short> getShardNumAndBaseShard() {
        final Long cuboidId = Long.valueOf(this.cuboid.getId());
        final Short shardCount = this.cubeSeg.getCuboidShardNum(cuboidId);
        final Short firstShard = this.cubeSeg.getCuboidBaseShard(cuboidId);
        return Pair.newPair(shardCount, firstShard);
    }

    /**
     * Starts an asynchronous coprocessor-endpoint scan for {@code scanRequest} and returns a scanner over the
     * results as they arrive.
     *
     * <p>The method serializes the raw HBase scans and the scan request, assembles a single
     * {@code CubeVisitRequest}, and submits one task per endpoint key range (see {@code getEPKeyRanges}) to the
     * shared executor. Each task calls {@code runEPRange}, which appends region results to an
     * {@link ExpectedSizeIterator} constructed with the cuboid's shard count. The returned scanner reads from
     * that iterator.
     *
     * <p>Side effects on {@code scanRequest}: its timeout is set to the coprocessor timeout and its scan ranges
     * are cleared before it is serialized; the ranges are shipped separately as raw scans.
     *
     * @param scanRequest the grid-table scan request; mutated as described above
     * @return a scanner backed by the asynchronously filled result iterator
     * @throws IOException declared by the method signature
     */
    public IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException {
        final Pair<Short, Short> shardNumAndBaseShard = this.getShardNumAndBaseShard();
        final short shardNum = shardNumAndBaseShard.getFirst();
        final short cuboidBaseShard = shardNumAndBaseShard.getSecond();
        final int totalShards = this.cubeSeg.getTotalShards(this.cuboid.getId());

        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
        final Connection conn = HBaseConnection.get(this.cubeSeg.getCubeInstance().getConfig().getStorageUrl());

        // Convert each HBase-column -> GT-column mapping into its protobuf IntList form.
        final List<CubeVisitProtos.CubeVisitRequest.IntList> hbaseColumnsToGTIntList = new ArrayList<>();
        for (List<Integer> gtColumns : this.getHBaseColumnsGTMapping(selectedColBlocks)) {
            hbaseColumnsToGTIntList.add(CubeVisitProtos.CubeVisitRequest.IntList.newBuilder().addAllInts(gtColumns).build());
        }

        // The raw scans must be derived from the scan ranges BEFORE clearScanRanges() below removes them.
        final List<RawScan> rawScans = this.preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
        final ByteString rawScanByteString = this.serializeRawScans(rawScans);

        final long coprocessorTimeout = this.getCoprocessorTimeoutMillis();
        scanRequest.setTimeout(coprocessorTimeout);
        scanRequest.clearScanRanges();
        final ByteString scanRequestByteString = this.serializeGTScanReq(scanRequest);

        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(this.queryContext, shardNum, coprocessorTimeout);

        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", new Object[]{Integer.toHexString(System.identityHashCode(scanRequest)), this.cubeSeg, rawScans.size()});
        for (RawScan rs : rawScans) {
            this.logScan(rs, this.cubeSeg.getStorageLocationIdentifier());
        }
        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[]{shardNum, cuboidBaseShard, rawScans.size()});

        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        final boolean compressionResult = kylinConfig.getCompressionResult();

        final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
        builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
        for (CubeVisitProtos.CubeVisitRequest.IntList intList : hbaseColumnsToGTIntList) {
            builder.addHbaseColumnsToGT(intList);
        }
        builder.setRowkeyPreambleSize(this.cubeSeg.getRowKeyPreambleSize());
        builder.setKylinProperties(kylinConfig.exportAllToString());
        final String queryId = this.queryContext.getQueryId();
        if (queryId != null) {
            builder.setQueryId(queryId);
        }
        builder.setSpillEnabled(this.cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
        builder.setMaxScanBytes(this.cubeSeg.getConfig().getPartitionMaxScanBytes());
        builder.setIsExactAggregate(this.storageContext.isExactAggregation());

        // Build the immutable request once, on this thread. Previously every submitted task called
        // builder.build() on the same shared Builder from a different pool thread.
        final CubeVisitProtos.CubeVisitRequest request = builder.build();

        final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
        for (final Pair<byte[], byte[]> epRange : this.getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
            executorService.submit(new Runnable() {

                @Override
                public void run() {
                    CubeHBaseEndpointRPC.this.runEPRange(CubeHBaseEndpointRPC.this.queryContext, logHeader, compressionResult, request, conn, epRange.getFirst(), epRange.getSecond(), epResultItr);
                }
            });
        }
        return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), this.storageContext);
    }

    /**
     * Invokes the cube-visit coprocessor endpoint on the regions between {@code startKey} and {@code endKey} and
     * appends each region's rows to {@code epResultItr}.
     *
     * <p>Nothing is thrown to the caller: any {@link Throwable} raised while opening the table, calling the
     * endpoint, processing a response or closing the table is passed to {@code queryContext.stop(...)}. If the
     * query is in the stopped state when this method finishes, the stored throwable is logged at error level.
     *
     * @param queryContext      query state used for stop checks, statistics and failure reporting
     * @param logHeader         prefix prepended to log and exception messages
     * @param compressionResult whether the returned rows must be decompressed before being appended
     * @param request           the request sent to every region
     * @param conn              HBase connection used to obtain the table and the region locator
     * @param startKey          start key handed to {@code Table.coprocessorService}
     * @param endKey            end key handed to {@code Table.coprocessorService}
     * @param epResultItr       sink that receives the row bytes of each region
     */
    private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult, final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey, final ExpectedSizeIterator epResultItr) {
        final String queryId = queryContext.getQueryId();
        // try-with-resources closes the Table, which the previous code never did. A failure in close() is
        // handled like any other failure by the catch below.
        try (Table table = conn.getTable(TableName.valueOf(this.cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool())) {
            table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {

                /** Sends the request to one region and blocks until its response is available. */
                @Override
                public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
                    if (queryContext.isStopped()) {
                        logger.warn("Query-{}: the query has been stopped, not send request to region server any more.", queryId);
                        return null;
                    }
                    HRegionLocation regionLocation = this.getStartRegionLocation(rowsService);
                    String regionServerName = regionLocation == null ? "UNKNOWN" : regionLocation.getHostname();
                    logger.info("Query-{}: send request to the init region server {} on table {} ", new Object[]{queryId, regionServerName, table.getName()});

                    // When the query is stopped, interrupt the thread that is executing this call.
                    queryContext.addQueryStopListener(new QueryContext.QueryStopListener() {
                        private Thread hConnThread = Thread.currentThread();

                        @Override
                        public void stop(QueryContext query) {
                            try {
                                this.hConnThread.interrupt();
                            } catch (Exception e) {
                                logger.warn("Exception happens during interrupt thread {} due to {}", this.hConnThread.getName(), e);
                            }
                        }
                    });

                    ServerRpcController controller = new ServerRpcController();
                    BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
                    try {
                        rowsService.visitCube(controller, request, rpcCallback);
                        CubeVisitProtos.CubeVisitResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return response;
                    } finally {
                        // Clears this thread's interrupt flag, which the stop listener above may have set.
                        Thread.interrupted();
                    }
                }

                /**
                 * Best-effort lookup of the region that will serve {@code rowsService}; used for logging only.
                 * Returns {@code null} when the location cannot be determined.
                 */
                private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService rowsService) {
                    if (channelRowField == null) {
                        // The reflective field lookup failed at class-initialization time (already logged there).
                        return null;
                    }
                    try {
                        CubeVisitProtos.CubeVisitService.Stub rowsServiceStub = (CubeVisitProtos.CubeVisitService.Stub) rowsService;
                        RegionCoprocessorRpcChannel channel = (RegionCoprocessorRpcChannel) rowsServiceStub.getChannel();
                        byte[] row = (byte[]) channelRowField.get(channel);
                        // RegionLocator is Closeable; release it once the lookup is done.
                        try (RegionLocator locator = conn.getRegionLocator(table.getName())) {
                            return locator.getRegionLocation(row, false);
                        }
                    } catch (Throwable throwable) {
                        logger.warn("error when get region server name", throwable);
                        return null;
                    }
                }
            }, new Batch.Callback<CubeVisitProtos.CubeVisitResponse>() {

                /** Records statistics for one region's response and forwards its rows to the result iterator. */
                @Override
                public void update(byte[] region, byte[] row, CubeVisitProtos.CubeVisitResponse result) {
                    if (result == null) {
                        return;
                    }
                    if (region == null) {
                        return;
                    }
                    if (queryContext.isStopped()) {
                        return;
                    }
                    logger.info(logHeader + CubeHBaseEndpointRPC.this.getStatsString(region, result));

                    CubeVisitProtos.CubeVisitResponse.Stats stats = result.getStats();
                    queryContext.addAndGetScannedRows(stats.getScannedRowCount());
                    queryContext.addAndGetScannedBytes(stats.getScannedBytes());
                    queryContext.addAndGetReturnedRows(stats.getScannedRowCount() - stats.getAggregatedRowCount() - stats.getFilteredRowCount());

                    // A response that did not complete normally carries (or implies) a coprocessor-side error.
                    Exception rpcException = null;
                    if (result.getStats().getNormalComplete() != 1) {
                        rpcException = CubeHBaseEndpointRPC.this.getCoprocessorException(result);
                    }
                    queryContext.addRPCStatistics(CubeHBaseEndpointRPC.this.storageContext.ctxId, stats.getHostname(), CubeHBaseEndpointRPC.this.cubeSeg.getCubeDesc().getName(), CubeHBaseEndpointRPC.this.cubeSeg.getName(), CubeHBaseEndpointRPC.this.cuboid.getInputID(), CubeHBaseEndpointRPC.this.cuboid.getId(), CubeHBaseEndpointRPC.this.storageContext.getFilterMask(), rpcException, stats.getServiceEndTime() - stats.getServiceStartTime(), 0L, stats.getScannedRowCount(), stats.getScannedRowCount() - stats.getAggregatedRowCount() - stats.getFilteredRowCount(), stats.getAggregatedRowCount(), stats.getScannedBytes());

                    // Query-wide limits replace any coprocessor error recorded above, as in the original flow.
                    if (queryContext.getScannedBytes() > CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxScanBytes()) {
                        rpcException = new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxScanBytes());
                    } else if (queryContext.getReturnedRows() > CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxReturnRows()) {
                        rpcException = new ResourceLimitExceededException("Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold " + CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxReturnRows());
                    }
                    if (rpcException != null) {
                        queryContext.stop(rpcException);
                        return;
                    }

                    try {
                        if (compressionResult) {
                            epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
                        } else {
                            epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
                        }
                    } catch (IOException | DataFormatException e) {
                        throw new RuntimeException(logHeader + "Error when decompressing", e);
                    }
                }
            });
        } catch (Throwable ex) {
            queryContext.stop(ex);
        }
        if (queryContext.isStopped()) {
            logger.error(logHeader + "Error when visiting cubes by endpoint", queryContext.getThrowable());
        }
    }

    /**
     * Serializes {@code scanRequest} into a {@link ByteString} that wraps the serialization buffer's backing
     * array without copying it.
     *
     * <p>Serialization starts with a 64 KiB buffer. Each time the serializer overflows it, the buffer size is
     * quadrupled and serialization is retried from scratch.
     *
     * @param scanRequest the request to serialize
     * @return the serialized request
     * @throws IllegalStateException if the request still overflows once the buffer size can no longer be
     *         quadrupled without overflowing {@code int}
     */
    private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
        int scanRequestBufferSize = 65536;
        while (true) {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
                GTScanRequest.serializer.serialize(scanRequest, buffer);
                buffer.flip();
                return HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
            } catch (BufferOverflowException boe) {
                // Without this guard the size wraps to 0 after eight growths and the loop never terminates.
                if (scanRequestBufferSize > Integer.MAX_VALUE / 4) {
                    throw new IllegalStateException("Scan request does not fit into " + scanRequestBufferSize + " bytes and the buffer cannot grow any further", boe);
                }
                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
                scanRequestBufferSize *= 4;
            }
        }
    }

    /**
     * Serializes {@code rawScans} into a {@link ByteString}: a variable-length int holding the number of scans,
     * followed by each scan written with {@code RawScan.serializer}. The result wraps the buffer's backing array
     * without copying it.
     *
     * <p>Serialization starts with a 64 KiB buffer. Each time it overflows, the buffer size is quadrupled and
     * serialization is retried from scratch.
     *
     * @param rawScans the scans to serialize
     * @return the serialized scans
     * @throws IllegalStateException if the scans still overflow once the buffer size can no longer be
     *         quadrupled without overflowing {@code int}
     */
    private ByteString serializeRawScans(List<RawScan> rawScans) {
        int rawScanBufferSize = 65536;
        while (true) {
            try {
                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
                for (RawScan rs : rawScans) {
                    RawScan.serializer.serialize(rs, rawScanBuffer);
                }
                rawScanBuffer.flip();
                return HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
            } catch (BufferOverflowException boe) {
                // Without this guard the size wraps to 0 after eight growths and the loop never terminates.
                if (rawScanBufferSize > Integer.MAX_VALUE / 4) {
                    throw new IllegalStateException("Raw scans do not fit into " + rawScanBufferSize + " bytes and the buffer cannot grow any further", boe);
                }
                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
                rawScanBufferSize *= 4;
            }
        }
    }

    /**
     * Formats a one-line, human-readable summary of a single region's endpoint response for logging.
     *
     * @param region identifier of the responding region; rendered as hex
     * @param result the endpoint response whose statistics are reported
     * @return the summary text
     */
    private String getStatsString(byte[] region, CubeVisitProtos.CubeVisitResponse result) {
        final CubeVisitProtos.CubeVisitResponse.Stats epStats = result.getStats();
        final int compressedRowSize = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()).length;
        final boolean normalComplete = epStats.getNormalComplete() == 1;

        return new StringBuilder()
                .append("Endpoint RPC returned from HTable ").append(this.cubeSeg.getStorageLocationIdentifier())
                .append(" Shard ").append(BytesUtil.toHex(region))
                .append(" on host: ").append(epStats.getHostname()).append(".")
                .append("Total scanned row: ").append(epStats.getScannedRowCount()).append(". ")
                .append("Total scanned bytes: ").append(epStats.getScannedBytes()).append(". ")
                .append("Total filtered row: ").append(epStats.getFilteredRowCount()).append(". ")
                .append("Total aggred row: ").append(epStats.getAggregatedRowCount()).append(". ")
                .append("Time elapsed in EP: ").append(epStats.getServiceEndTime() - epStats.getServiceStartTime()).append("(ms). ")
                .append("Server CPU usage: ").append(epStats.getSystemCpuLoad())
                .append(", server physical mem left: ").append(epStats.getFreePhysicalMemorySize())
                .append(", server swap mem left:").append(epStats.getFreeSwapSpaceSize()).append(".")
                .append("Etc message: ").append(epStats.getEtcMsg()).append(".")
                .append("Normal Complete: ").append(normalComplete).append(".")
                .append("Compressed row size: ").append(compressedRowSize)
                .toString();
    }

    /**
     * Maps the error information of a response to the exception that represents it.
     *
     * @param response a response whose statistics report a non-normal completion
     * @return a generic {@link RuntimeException} when the response carries no error info or an
     *         {@code UNKNOWN_TYPE} error, a {@link KylinTimeoutException} for {@code TIMEOUT}, or a
     *         {@link ResourceLimitExceededException} for {@code RESOURCE_LIMIT_EXCEEDED}
     * @throws AssertionError if the error type is none of the handled constants
     */
    private RuntimeException getCoprocessorException(CubeVisitProtos.CubeVisitResponse response) {
        if (response.hasErrorInfo()) {
            final CubeVisitProtos.CubeVisitResponse.ErrorInfo info = response.getErrorInfo();
            switch (info.getType()) {
                case UNKNOWN_TYPE:
                    return new RuntimeException("Coprocessor aborts: " + info.getMessage());
                case TIMEOUT:
                    return new KylinTimeoutException(info.getMessage());
                case RESOURCE_LIMIT_EXCEEDED:
                    return new ResourceLimitExceededException("Coprocessor resource limit exceeded: " + info.getMessage());
                default:
                    throw new AssertionError("Unknown error type: " + info.getType());
            }
        }
        // No structured error was attached to the response.
        return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
    }

    // Resolves the "row" field of RegionCoprocessorRpcChannel reflectively, once, at class-initialization time.
    // This block sits textually after the declaration of channelRowField, so the "= null" initializer there
    // runs first and does not overwrite the value assigned here.
    static {
        try {
            channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
            // Lift Java access checks so the field can be read through reflection.
            channelRowField.setAccessible(true);
        }
        catch (Throwable t) {
            // Best effort: a failure is only logged, so class initialization never fails because of it.
            // If getDeclaredField threw, channelRowField is still null; if setAccessible threw, it is
            // non-null but was not made accessible.
            logger.warn("error when get row field from RegionCoprocessorRpcChannel class", t);
        }
    }
}

