/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.sun.management.OperatingSystemMXBean;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanExceedThresholdException;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.cube.v2.CellListIterator;
import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
import org.apache.kylin.storage.hbase.cube.v2.RawScan;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CubeVisitService
extends CubeVisitProtos.CubeVisitService
implements Coprocessor,
CoprocessorService {
    private static final Logger logger = LoggerFactory.getLogger(CubeVisitService.class);
    private static final int MEMORY_LIMIT = 524288000;
    private RegionCoprocessorEnvironment env;
    private long serviceStartTime;

    private void updateRawScanByCurrentRegion(RawScan rawScan, HRegion region, int shardLength) {
        if (shardLength == 0) {
            return;
        }
        byte[] regionStartKey = ArrayUtils.isEmpty((byte[])region.getStartKey()) ? new byte[shardLength] : region.getStartKey();
        Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
        Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
    }

    private List<RawScan> deserializeRawScans(ByteBuffer in) {
        int rawScanCount = BytesUtil.readVInt(in);
        ArrayList ret = Lists.newArrayList();
        for (int i = 0; i < rawScanCount; ++i) {
            RawScan temp = RawScan.serializer.deserialize(in);
            ret.add(temp);
        }
        return ret;
    }

    private void appendProfileInfo(StringBuilder sb, String info) {
        if (info != null) {
            sb.append(info);
        }
        sb.append("@" + (System.currentTimeMillis() - this.serviceStartTime));
        sb.append(",");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
        ArrayList regionScanners = Lists.newArrayList();
        HRegion region = null;
        StringBuilder sb = new StringBuilder();
        String debugGitTag = "";
        try {
            this.serviceStartTime = System.currentTimeMillis();
            region = this.env.getRegion();
            region.startRegionOperation();
            String serverPropString = request.getKylinProperties();
            Properties serverProp = new Properties();
            serverProp.load(new StringReader(serverPropString));
            KylinConfig.setKylinConfigInEnvIfMissing(serverProp);
            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
            debugGitTag = region.getTableDesc().getValue("GIT_COMMIT");
            final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes((ByteString)request.getGtScanRequest())));
            ArrayList hbaseColumnsToGT = Lists.newArrayList();
            for (CubeVisitProtos.CubeVisitRequest.IntList intList : request.getHbaseColumnsToGTList()) {
                hbaseColumnsToGT.add(intList.getIntsList());
            }
            StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior());
            List<RawScan> hbaseRawScans = this.deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes((ByteString)request.getHbaseRawScan())));
            this.appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - scanReq.getStartTime()));
            MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware(){

                @Override
                public DimensionEncoding getDimEnc(TblColRef col) {
                    return scanReq.getInfo().getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
                }
            });
            final ArrayList cellListsForeachRawScan = Lists.newArrayList();
            for (RawScan hbaseRawScan : hbaseRawScans) {
                if (request.getRowkeyPreambleSize() - 8 > 0) {
                    this.updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - 8);
                }
                Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
                RegionScanner innerScanner = region.getScanner(scan);
                regionScanners.add(innerScanner);
                InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
                cellListsForeachRawScan.add(cellListIterator);
            }
            final Iterator allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
            if (behavior.ordinal() < StorageSideBehavior.SCAN.ordinal()) {
                ArrayList temp = Lists.newArrayList();
                int counter = 0;
                for (RegionScanner innerScanner : regionScanners) {
                    while (innerScanner.nextRaw((List)temp)) {
                        ++counter;
                    }
                }
                this.appendProfileInfo(sb, "scanned " + counter);
            }
            if (behavior.ordinal() < StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                scanReq.disableAggCacheMemCheck();
            }
            MutableBoolean scanNormalComplete = new MutableBoolean(true);
            long deadline = scanReq.getTimeout() + this.serviceStartTime;
            logger.info("deadline is " + deadline);
            long storagePushDownLimit = scanReq.getStoragePushDownLimit();
            CellListIterator cellListIterator = new CellListIterator(){
                int counter = 0;

                @Override
                public void close() throws IOException {
                    for (CellListIterator closeable : cellListsForeachRawScan) {
                        closeable.close();
                    }
                }

                @Override
                public boolean hasNext() {
                    if (this.counter > scanReq.getStorageScanRowNumThreshold()) {
                        throw new GTScanExceedThresholdException("Exceed scan threshold at " + this.counter);
                    }
                    if (this.counter % 1000 == 1) {
                        logger.info("Scanned " + this.counter + " rows from HBase.");
                    }
                    ++this.counter;
                    return allCellLists.hasNext();
                }

                @Override
                public List<Cell> next() {
                    return (List)allCellLists.next();
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
            HBaseReadonlyStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get((int)0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
            IGTScanner rawScanner = store.scan(scanReq);
            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), deadline);
            ByteBuffer buffer = ByteBuffer.allocate(0x100000);
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(0x100000);
            int finalRowCount = 0;
            try {
                for (GTRecord oneRecord : finalScanner) {
                    if (finalRowCount % 100 == 1 && System.currentTimeMillis() > deadline) {
                        throw new GTScanTimeoutException("finalScanner timeouts after contributed " + finalRowCount);
                    }
                    buffer.clear();
                    try {
                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
                    }
                    catch (BufferOverflowException boe) {
                        buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
                    }
                    outputStream.write(buffer.array(), 0, buffer.position());
                    if (scanReq.isDoingStorageAggregation() || (long)(++finalRowCount) < storagePushDownLimit) continue;
                    logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
                    break;
                }
            }
            catch (GTScanTimeoutException e) {
                scanNormalComplete.setValue(false);
                logger.info("The cube visit did not finish normally because scan timeout", (Throwable)e);
            }
            catch (GTScanExceedThresholdException e) {
                scanNormalComplete.setValue(false);
                logger.info("The cube visit did not finish normally because scan num exceeds threshold", (Throwable)e);
            }
            finally {
                finalScanner.close();
            }
            this.appendProfileInfo(sb, "agg done");
            byte[] allRows = scanNormalComplete.booleanValue() ? outputStream.toByteArray() : new byte[]{};
            byte[] compressedAllRows = !kylinConfig.getCompressionResult() ? allRows : CompressionUtils.compress(allRows);
            this.appendProfileInfo(sb, "compress done");
            OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean();
            double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
            double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
            double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
            this.appendProfileInfo(sb, "server stats done");
            sb.append(" debugGitTag:" + debugGitTag);
            CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
            done.run((Object)responseBuilder.setCompressedRows(HBaseZeroCopyByteString.wrap((byte[])compressedAllRows)).setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().setAggregatedRowCount(finalScanner.getScannedRowCount() - (long)finalRowCount).setScannedRowCount(finalScanner.getScannedRowCount()).setServiceStartTime(this.serviceStartTime).setServiceEndTime(System.currentTimeMillis()).setSystemCpuLoad(systemCpuLoad).setFreePhysicalMemorySize(freePhysicalMemorySize).setFreeSwapSpaceSize(freeSwapSpaceSize).setHostname(InetAddress.getLocalHost().getHostName()).setEtcMsg(sb.toString()).setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build()).build());
        }
        catch (IOException ioe) {
            logger.error(ioe.toString(), (Throwable)ioe);
            IOException wrapped = new IOException("Error in coprocessor " + debugGitTag, ioe);
            ResponseConverter.setControllerException((RpcController)controller, (IOException)wrapped);
        }
        catch (OutOfMemoryError oom) {
            logger.error(oom.toString(), (Throwable)oom);
            IOException wrapped = new IOException("OOM in coprocessor " + debugGitTag, oom);
            ResponseConverter.setControllerException((RpcController)controller, (IOException)wrapped);
        }
        finally {
            for (RegionScanner innerScanner : regionScanners) {
                IOUtils.closeQuietly((Closeable)innerScanner);
            }
            if (region != null) {
                try {
                    region.closeRegionOperation();
                }
                catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public void start(CoprocessorEnvironment env) throws IOException {
        if (!(env instanceof RegionCoprocessorEnvironment)) {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
        this.env = (RegionCoprocessorEnvironment)env;
    }

    public void stop(CoprocessorEnvironment env) throws IOException {
        KylinConfig.destroyInstance();
    }

    public Service getService() {
        return this;
    }

    static class InnerScannerAsIterator
    implements CellListIterator {
        private RegionScanner regionScanner;
        private List<Cell> nextOne = Lists.newArrayList();
        private List<Cell> ret = Lists.newArrayList();
        private boolean hasMore;

        public InnerScannerAsIterator(RegionScanner regionScanner) {
            this.regionScanner = regionScanner;
            try {
                this.hasMore = regionScanner.nextRaw(this.nextOne);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean hasNext() {
            return !this.nextOne.isEmpty();
        }

        @Override
        public List<Cell> next() {
            if (this.nextOne.size() < 1) {
                throw new IllegalStateException();
            }
            this.ret.clear();
            this.ret.addAll(this.nextOne);
            this.nextOne.clear();
            try {
                if (this.hasMore) {
                    this.hasMore = this.regionScanner.nextRaw(this.nextOne);
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return this.ret;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {
        }
    }
}

