package org.apache.kylin.storage.hbase.cube.v2;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.zip.DataFormatException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.tools.ant.taskdefs.SQLExec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-storage-hbase-2.5.0.jar:org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.class */
public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CubeHBaseEndpointRPC.class);
    private static ExecutorService executorService = new LoggableCachedThreadPool();
    static Field channelRowField;

    public CubeHBaseEndpointRPC(ISegment iSegment, Cuboid cuboid, GTInfo gTInfo, StorageContext storageContext) {
        super(iSegment, cuboid, gTInfo, storageContext);
    }

    private byte[] getByteArrayForShort(short s) {
        byte[] bArr = new byte[2];
        BytesUtil.writeUnsigned(s, bArr, 0, 2);
        return bArr;
    }

    private List<Pair<byte[], byte[]>> getEPKeyRanges(short s, short s2, int i) {
        return s2 == 0 ? Lists.newArrayList() : s2 == i ? Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (s2 - 1)))) : s + s2 <= i ? Lists.newArrayList(Pair.newPair(getByteArrayForShort(s), getByteArrayForShort((short) ((s + s2) - 1)))) : Lists.newArrayList(Pair.newPair(getByteArrayForShort(s), getByteArrayForShort((short) (i - 1))), Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (((s + s2) - i) - 1))));
    }

    protected Pair<Short, Short> getShardNumAndBaseShard() {
        return Pair.newPair(this.cubeSeg.getCuboidShardNum(Long.valueOf(this.cuboid.getId())), Short.valueOf(this.cubeSeg.getCuboidBaseShard(Long.valueOf(this.cuboid.getId()))));
    }

    @Override // org.apache.kylin.gridtable.IGTStorage
    public IGTScanner getGTScanner(GTScanRequest gTScanRequest) throws IOException {
        Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
        short shortValue = shardNumAndBaseShard.getFirst().shortValue();
        short shortValue2 = shardNumAndBaseShard.getSecond().shortValue();
        int totalShards = this.cubeSeg.getTotalShards(this.cuboid.getId());
        ImmutableBitSet immutableBitSet = gTScanRequest.getSelectedColBlocks().set(0);
        final Connection connection = HBaseConnection.get(this.cubeSeg.getCubeInstance().getConfig().getStorageUrl());
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<List<Integer>> it = getHBaseColumnsGTMapping(immutableBitSet).iterator();
        while (it.hasNext()) {
            newArrayList.add(CubeVisitProtos.CubeVisitRequest.IntList.newBuilder().addAllInts(it.next()).m3870build());
        }
        List<RawScan> preparedHBaseScans = preparedHBaseScans(gTScanRequest.getGTScanRanges(), immutableBitSet);
        ByteString serializeRawScans = serializeRawScans(preparedHBaseScans);
        long coprocessorTimeoutMillis = getCoprocessorTimeoutMillis();
        gTScanRequest.setTimeout(coprocessorTimeoutMillis);
        gTScanRequest.clearScanRanges();
        ByteString serializeGTScanReq = serializeGTScanReq(gTScanRequest);
        final ExpectedSizeIterator expectedSizeIterator = new ExpectedSizeIterator(this.queryContext, shortValue, coprocessorTimeoutMillis);
        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", Integer.valueOf(serializeGTScanReq.size()), Integer.valueOf(serializeRawScans.size()));
        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(gTScanRequest)), this.cubeSeg, Integer.valueOf(preparedHBaseScans.size()));
        Iterator<RawScan> it2 = preparedHBaseScans.iterator();
        while (it2.hasNext()) {
            logScan(it2.next(), this.cubeSeg.getStorageLocationIdentifier());
        }
        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", Short.valueOf(shortValue), Short.valueOf(shortValue2), Integer.valueOf(preparedHBaseScans.size()));
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        final boolean compressionResult = instanceFromEnv.getCompressionResult();
        final CubeVisitProtos.CubeVisitRequest.Builder newBuilder = CubeVisitProtos.CubeVisitRequest.newBuilder();
        newBuilder.setGtScanRequest(serializeGTScanReq).setHbaseRawScan(serializeRawScans);
        Iterator it3 = newArrayList.iterator();
        while (it3.hasNext()) {
            newBuilder.addHbaseColumnsToGT((CubeVisitProtos.CubeVisitRequest.IntList) it3.next());
        }
        newBuilder.setRowkeyPreambleSize(this.cubeSeg.getRowKeyPreambleSize());
        newBuilder.setKylinProperties(instanceFromEnv.exportAllToString());
        String queryId = this.queryContext.getQueryId();
        if (queryId != null) {
            newBuilder.setQueryId(queryId);
        }
        newBuilder.setSpillEnabled(this.cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
        newBuilder.setMaxScanBytes(this.cubeSeg.getConfig().getPartitionMaxScanBytes());
        newBuilder.setIsExactAggregate(this.storageContext.isExactAggregation());
        final String format = String.format("<sub-thread for Query %s GTScanRequest %s>", this.queryContext.getQueryId(), Integer.toHexString(System.identityHashCode(gTScanRequest)));
        for (final Pair<byte[], byte[]> pair : getEPKeyRanges(shortValue2, shortValue, totalShards)) {
            executorService.submit(new Runnable() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.1
                @Override // java.lang.Runnable
                public void run() {
                    CubeHBaseEndpointRPC.this.runEPRange(CubeHBaseEndpointRPC.this.queryContext, format, compressionResult, newBuilder.m3839build(), connection, (byte[]) pair.getFirst(), (byte[]) pair.getSecond(), expectedSizeIterator);
                }
            });
        }
        return new StorageResponseGTScatter(gTScanRequest, new DummyPartitionStreamer(expectedSizeIterator), this.storageContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void runEPRange(final QueryContext queryContext, final String str, final boolean z, final CubeVisitProtos.CubeVisitRequest cubeVisitRequest, final Connection connection, byte[] bArr, byte[] bArr2, final ExpectedSizeIterator expectedSizeIterator) {
        final String queryId = queryContext.getQueryId();
        try {
            final Table table = connection.getTable(TableName.valueOf(this.cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
            table.coprocessorService(CubeVisitProtos.CubeVisitService.class, bArr, bArr2, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.2
                public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService cubeVisitService) throws IOException {
                    if (queryContext.isStopped()) {
                        CubeHBaseEndpointRPC.logger.warn("Query-{}: the query has been stopped, not send request to region server any more.", queryId);
                        return null;
                    }
                    HRegionLocation startRegionLocation = getStartRegionLocation(cubeVisitService);
                    CubeHBaseEndpointRPC.logger.info("Query-{}: send request to the init region server {} on table {} ", queryId, startRegionLocation == null ? "UNKNOWN" : startRegionLocation.getHostname(), table.getName());
                    queryContext.addQueryStopListener(new QueryContext.QueryStopListener() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.2.1
                        private Thread hConnThread = Thread.currentThread();

                        @Override // org.apache.kylin.common.QueryContext.QueryStopListener
                        public void stop(QueryContext queryContext2) {
                            try {
                                this.hConnThread.interrupt();
                            } catch (Exception e) {
                                CubeHBaseEndpointRPC.logger.warn("Exception happens during interrupt thread {} due to {}", this.hConnThread.getName(), e);
                            }
                        }
                    });
                    ServerRpcController serverRpcController = new ServerRpcController();
                    BlockingRpcCallback blockingRpcCallback = new BlockingRpcCallback();
                    try {
                        try {
                            cubeVisitService.visitCube(serverRpcController, cubeVisitRequest, blockingRpcCallback);
                            CubeVisitProtos.CubeVisitResponse cubeVisitResponse = (CubeVisitProtos.CubeVisitResponse) blockingRpcCallback.get();
                            if (serverRpcController.failedOnException()) {
                                throw serverRpcController.getFailedOn();
                            }
                            Thread.interrupted();
                            return cubeVisitResponse;
                        } catch (Exception e) {
                            throw e;
                        }
                    } catch (Throwable th) {
                        Thread.interrupted();
                        throw th;
                    }
                }

                private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService cubeVisitService) {
                    try {
                        return connection.getRegionLocator(table.getName()).getRegionLocation((byte[]) CubeHBaseEndpointRPC.channelRowField.get(((CubeVisitProtos.CubeVisitService.Stub) cubeVisitService).getChannel()), false);
                    } catch (Throwable th) {
                        CubeHBaseEndpointRPC.logger.warn("error when get region server name", th);
                        return null;
                    }
                }
            }, new Batch.Callback<CubeVisitProtos.CubeVisitResponse>() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.3
                public void update(byte[] bArr3, byte[] bArr4, CubeVisitProtos.CubeVisitResponse cubeVisitResponse) {
                    if (cubeVisitResponse == null || bArr3 == null || queryContext.isStopped()) {
                        return;
                    }
                    CubeHBaseEndpointRPC.logger.info(str + CubeHBaseEndpointRPC.this.getStatsString(bArr3, cubeVisitResponse));
                    CubeVisitProtos.CubeVisitResponse.Stats stats = cubeVisitResponse.getStats();
                    queryContext.addAndGetScannedRows(stats.getScannedRowCount());
                    queryContext.addAndGetScannedBytes(stats.getScannedBytes());
                    queryContext.addAndGetReturnedRows((stats.getScannedRowCount() - stats.getAggregatedRowCount()) - stats.getFilteredRowCount());
                    RuntimeException runtimeException = null;
                    if (cubeVisitResponse.getStats().getNormalComplete() != 1) {
                        runtimeException = CubeHBaseEndpointRPC.this.getCoprocessorException(cubeVisitResponse);
                    }
                    queryContext.addRPCStatistics(CubeHBaseEndpointRPC.this.storageContext.ctxId, stats.getHostname(), CubeHBaseEndpointRPC.this.cubeSeg.getCubeDesc().getName(), CubeHBaseEndpointRPC.this.cubeSeg.getName(), CubeHBaseEndpointRPC.this.cuboid.getInputID(), CubeHBaseEndpointRPC.this.cuboid.getId(), CubeHBaseEndpointRPC.this.storageContext.getFilterMask(), runtimeException, stats.getServiceEndTime() - stats.getServiceStartTime(), 0L, stats.getScannedRowCount(), (stats.getScannedRowCount() - stats.getAggregatedRowCount()) - stats.getFilteredRowCount(), stats.getAggregatedRowCount(), stats.getScannedBytes());
                    if (queryContext.getScannedBytes() > CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxScanBytes()) {
                        runtimeException = new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxScanBytes());
                    } else if (queryContext.getReturnedRows() > CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxReturnRows()) {
                        runtimeException = new ResourceLimitExceededException("Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold " + CubeHBaseEndpointRPC.this.cubeSeg.getConfig().getQueryMaxReturnRows());
                    }
                    if (runtimeException != null) {
                        queryContext.stop(runtimeException);
                        return;
                    }
                    try {
                        if (z) {
                            expectedSizeIterator.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(cubeVisitResponse.getCompressedRows())));
                        } else {
                            expectedSizeIterator.append(HBaseZeroCopyByteString.zeroCopyGetBytes(cubeVisitResponse.getCompressedRows()));
                        }
                    } catch (IOException | DataFormatException e) {
                        throw new RuntimeException(str + "Error when decompressing", e);
                    }
                }
            });
        } catch (Throwable th) {
            queryContext.stop(th);
        }
        if (queryContext.isStopped()) {
            logger.error(str + "Error when visiting cubes by endpoint", queryContext.getThrowable());
        }
    }

    private ByteString serializeGTScanReq(GTScanRequest gTScanRequest) {
        int i;
        int i2 = 65536;
        while (true) {
            try {
                i = i2;
                ByteBuffer allocate = ByteBuffer.allocate(i);
                GTScanRequest.serializer.serialize(gTScanRequest, allocate);
                allocate.flip();
                return HBaseZeroCopyByteString.wrap(allocate.array(), allocate.position(), allocate.limit());
            } catch (BufferOverflowException e) {
                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", Integer.valueOf(i));
                i2 = i * 4;
            }
        }
    }

    private ByteString serializeRawScans(List<RawScan> list) {
        int i;
        int i2 = 65536;
        while (true) {
            try {
                i = i2;
                ByteBuffer allocate = ByteBuffer.allocate(i);
                BytesUtil.writeVInt(list.size(), allocate);
                Iterator<RawScan> it = list.iterator();
                while (it.hasNext()) {
                    RawScan.serializer.serialize(it.next(), allocate);
                }
                allocate.flip();
                return HBaseZeroCopyByteString.wrap(allocate.array(), allocate.position(), allocate.limit());
            } catch (BufferOverflowException e) {
                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", Integer.valueOf(i));
                i2 = i * 4;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getStatsString(byte[] bArr, CubeVisitProtos.CubeVisitResponse cubeVisitResponse) {
        StringBuilder sb = new StringBuilder();
        CubeVisitProtos.CubeVisitResponse.Stats stats = cubeVisitResponse.getStats();
        byte[] zeroCopyGetBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(cubeVisitResponse.getCompressedRows());
        sb.append("Endpoint RPC returned from HTable ").append(this.cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(bArr)).append(" on host: ").append(stats.getHostname()).append(".");
        sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
        sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". ");
        sb.append("Total filtered row: ").append(stats.getFilteredRowCount()).append(". ");
        sb.append("Total aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
        sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
        sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
        sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
        sb.append("Compressed row size: ").append(zeroCopyGetBytes.length);
        return sb.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public RuntimeException getCoprocessorException(CubeVisitProtos.CubeVisitResponse cubeVisitResponse) {
        if (!cubeVisitResponse.hasErrorInfo()) {
            return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
        }
        CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = cubeVisitResponse.getErrorInfo();
        switch (errorInfo.getType()) {
            case UNKNOWN_TYPE:
                return new RuntimeException("Coprocessor aborts: " + errorInfo.getMessage());
            case TIMEOUT:
                return new KylinTimeoutException(errorInfo.getMessage());
            case RESOURCE_LIMIT_EXCEEDED:
                return new ResourceLimitExceededException("Coprocessor resource limit exceeded: " + errorInfo.getMessage());
            default:
                throw new AssertionError("Unknown error type: " + errorInfo.getType());
        }
    }

    static {
        channelRowField = null;
        try {
            channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField(SQLExec.DelimiterType.ROW);
            channelRowField.setAccessible(true);
        } catch (Throwable th) {
            logger.warn("error when get row field from RegionCoprocessorRpcChannel class", th);
        }
    }
}
