package org.apache.kylin.storage.hbase.cube.v2;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.DataFormatException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.class */
public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CubeHBaseEndpointRPC.class);
    private static ExecutorService executorService = new LoggableCachedThreadPool();

    public CubeHBaseEndpointRPC(ISegment iSegment, Cuboid cuboid, GTInfo gTInfo) {
        super(iSegment, cuboid, gTInfo);
    }

    private byte[] getByteArrayForShort(short s) {
        byte[] bArr = new byte[2];
        BytesUtil.writeUnsigned(s, bArr, 0, 2);
        return bArr;
    }

    private List<Pair<byte[], byte[]>> getEPKeyRanges(short s, short s2, int i) {
        return s2 == 0 ? Lists.newArrayList() : s2 == i ? Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (s2 - 1)))) : s + s2 <= i ? Lists.newArrayList(Pair.newPair(getByteArrayForShort(s), getByteArrayForShort((short) ((s + s2) - 1)))) : Lists.newArrayList(Pair.newPair(getByteArrayForShort(s), getByteArrayForShort((short) (i - 1))), Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (((s + s2) - i) - 1))));
    }

    protected Pair<Short, Short> getShardNumAndBaseShard() {
        return Pair.newPair(this.cubeSeg.getCuboidShardNum(Long.valueOf(this.cuboid.getId())), Short.valueOf(this.cubeSeg.getCuboidBaseShard(Long.valueOf(this.cuboid.getId()))));
    }

    @Override // org.apache.kylin.gridtable.IGTStorage
    public IGTScanner getGTScanner(final GTScanRequest gTScanRequest) throws IOException {
        Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
        short shortValue = shardNumAndBaseShard.getFirst().shortValue();
        short shortValue2 = shardNumAndBaseShard.getSecond().shortValue();
        int totalShards = this.cubeSeg.getTotalShards(this.cuboid.getId());
        ImmutableBitSet immutableBitSet = gTScanRequest.getSelectedColBlocks().set(0);
        final HConnection hConnection = HBaseConnection.get(this.cubeSeg.getCubeInstance().getConfig().getStorageUrl());
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<List<Integer>> it2 = getHBaseColumnsGTMapping(immutableBitSet).iterator();
        while (it2.hasNext()) {
            newArrayList.add(CubeVisitProtos.CubeVisitRequest.IntList.newBuilder().addAllInts(it2.next()).m1679build());
        }
        List<RawScan> preparedHBaseScans = preparedHBaseScans(gTScanRequest.getGTScanRanges(), immutableBitSet);
        ByteString serializeRawScans = serializeRawScans(preparedHBaseScans);
        int coprocessorTimeoutMillis = getCoprocessorTimeoutMillis();
        gTScanRequest.setTimeout(coprocessorTimeoutMillis);
        gTScanRequest.clearScanRanges();
        ByteString serializeGTScanReq = serializeGTScanReq(gTScanRequest);
        final ExpectedSizeIterator expectedSizeIterator = new ExpectedSizeIterator(shortValue, coprocessorTimeoutMillis);
        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", Integer.valueOf(serializeGTScanReq.size()), Integer.valueOf(serializeRawScans.size()));
        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(gTScanRequest)), this.cubeSeg, Integer.valueOf(preparedHBaseScans.size()));
        Iterator<RawScan> it3 = preparedHBaseScans.iterator();
        while (it3.hasNext()) {
            logScan(it3.next(), this.cubeSeg.getStorageLocationIdentifier());
        }
        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", Short.valueOf(shortValue), Short.valueOf(shortValue2), Integer.valueOf(preparedHBaseScans.size()));
        final AtomicLong atomicLong = new AtomicLong(0L);
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        final boolean compressionResult = instanceFromEnv.getCompressionResult();
        final CubeVisitProtos.CubeVisitRequest.Builder newBuilder = CubeVisitProtos.CubeVisitRequest.newBuilder();
        newBuilder.setGtScanRequest(serializeGTScanReq).setHbaseRawScan(serializeRawScans);
        Iterator it4 = newArrayList.iterator();
        while (it4.hasNext()) {
            newBuilder.addHbaseColumnsToGT((CubeVisitProtos.CubeVisitRequest.IntList) it4.next());
        }
        newBuilder.setRowkeyPreambleSize(this.cubeSeg.getRowKeyPreambleSize());
        newBuilder.setKylinProperties(instanceFromEnv.getConfigAsString());
        final String queryId = BackdoorToggles.getQueryId();
        if (queryId != null) {
            newBuilder.setQueryId(queryId);
        }
        for (final Pair<byte[], byte[]> pair : getEPKeyRanges(shortValue2, shortValue, totalShards)) {
            executorService.submit(new Runnable() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.1
                @Override // java.lang.Runnable
                public void run() {
                    final String format = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(gTScanRequest)));
                    final boolean[] zArr = new boolean[1];
                    try {
                        HTableInterface table = hConnection.getTable(CubeHBaseEndpointRPC.this.cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
                        final CubeVisitProtos.CubeVisitRequest m1648build = newBuilder.m1648build();
                        table.coprocessorService(CubeVisitProtos.CubeVisitService.class, (byte[]) pair.getFirst(), (byte[]) pair.getSecond(), new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.1.1
                            public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService cubeVisitService) throws IOException {
                                ServerRpcController serverRpcController = new ServerRpcController();
                                BlockingRpcCallback blockingRpcCallback = new BlockingRpcCallback();
                                cubeVisitService.visitCube(serverRpcController, m1648build, blockingRpcCallback);
                                CubeVisitProtos.CubeVisitResponse cubeVisitResponse = (CubeVisitProtos.CubeVisitResponse) blockingRpcCallback.get();
                                if (serverRpcController.failedOnException()) {
                                    throw serverRpcController.getFailedOn();
                                }
                                return cubeVisitResponse;
                            }
                        }, new Batch.Callback<CubeVisitProtos.CubeVisitResponse>() { // from class: org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC.1.2
                            public void update(byte[] bArr, byte[] bArr2, CubeVisitProtos.CubeVisitResponse cubeVisitResponse) {
                                if (bArr == null) {
                                    return;
                                }
                                atomicLong.addAndGet(cubeVisitResponse.getStats().getScannedRowCount());
                                CubeHBaseEndpointRPC.logger.info(format + CubeHBaseEndpointRPC.this.getStatsString(bArr, cubeVisitResponse));
                                if (cubeVisitResponse.getStats().getNormalComplete() != 1) {
                                    zArr[0] = true;
                                    return;
                                }
                                try {
                                    if (compressionResult) {
                                        expectedSizeIterator.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(cubeVisitResponse.getCompressedRows())));
                                    } else {
                                        expectedSizeIterator.append(HBaseZeroCopyByteString.zeroCopyGetBytes(cubeVisitResponse.getCompressedRows()));
                                    }
                                } catch (IOException | DataFormatException e) {
                                    throw new RuntimeException(format + "Error when decompressing", e);
                                }
                            }
                        });
                        if (zArr[0]) {
                            GTScanSelfTerminatedException gTScanSelfTerminatedException = new GTScanSelfTerminatedException(format + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
                            CubeHBaseEndpointRPC.logger.error(format + "Error when visiting cubes by endpoint", (Throwable) gTScanSelfTerminatedException);
                            expectedSizeIterator.notifyCoprocException(gTScanSelfTerminatedException);
                        }
                    } catch (Throwable th) {
                        CubeHBaseEndpointRPC.logger.error(format + "Error when visiting cubes by endpoint", th);
                        expectedSizeIterator.notifyCoprocException(th);
                    }
                }
            });
        }
        return new StorageResponseGTScatter(this.fullGTInfo, expectedSizeIterator, gTScanRequest.getColumns(), atomicLong.get(), gTScanRequest.getStoragePushDownLimit());
    }

    private ByteString serializeGTScanReq(GTScanRequest gTScanRequest) {
        int i;
        int i2 = BytesSerializer.SERIALIZE_BUFFER_SIZE;
        while (true) {
            try {
                i = i2;
                ByteBuffer allocate = ByteBuffer.allocate(i);
                GTScanRequest.serializer.serialize(gTScanRequest, allocate);
                allocate.flip();
                return HBaseZeroCopyByteString.wrap(allocate.array(), allocate.position(), allocate.limit());
            } catch (BufferOverflowException e) {
                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", Integer.valueOf(i));
                i2 = i * 4;
            }
        }
    }

    private ByteString serializeRawScans(List<RawScan> list) {
        int i;
        int i2 = BytesSerializer.SERIALIZE_BUFFER_SIZE;
        while (true) {
            try {
                i = i2;
                ByteBuffer allocate = ByteBuffer.allocate(i);
                BytesUtil.writeVInt(list.size(), allocate);
                Iterator<RawScan> it2 = list.iterator();
                while (it2.hasNext()) {
                    RawScan.serializer.serialize(it2.next(), allocate);
                }
                allocate.flip();
                return HBaseZeroCopyByteString.wrap(allocate.array(), allocate.position(), allocate.limit());
            } catch (BufferOverflowException e) {
                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", Integer.valueOf(i));
                i2 = i * 4;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getStatsString(byte[] bArr, CubeVisitProtos.CubeVisitResponse cubeVisitResponse) {
        StringBuilder sb = new StringBuilder();
        CubeVisitProtos.CubeVisitResponse.Stats stats = cubeVisitResponse.getStats();
        sb.append("Endpoint RPC returned from HTable ").append(this.cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(bArr)).append(" on host: ").append(stats.getHostname()).append(".");
        sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
        sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
        sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
        sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
        sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
        return sb.toString();
    }
}
