/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.cube.v2;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterator that hands out exactly {@code expectedSize} byte arrays, blocking in
 * {@link #next()} until each one has been supplied through {@link #append(byte[])}.
 *
 * <p>Waiting is bounded by a deadline computed once in the constructor, and is cut
 * short as soon as a failure is reported through {@link #notifyCoprocException(Throwable)}.
 *
 * <p>NOTE(review): the blocking queue and the volatile exception field suggest that
 * {@code append}/{@code notifyCoprocException} are called from threads other than the
 * one consuming the iterator; {@code current} itself is a plain int, so the consuming
 * side presumably stays on a single thread - confirm against the callers.
 */
class ExpectedSizeIterator implements Iterator<byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class);

    /** Upper bound, in milliseconds, of a single wait on the queue before the exit conditions are re-checked. */
    private static final long POLL_INTERVAL_MS = 10000L;

    BlockingQueue<byte[]> queue;
    int expectedSize;
    /** Number of elements already requested through {@link #next()}. */
    int current = 0;
    /** HBase RPC timeout as read from the HBase configuration. */
    long rpcTimeout;
    /** Effective timeout: RPC timeout x client retries x Kylin multiplier, or the backdoor override. */
    long timeout;
    /** Absolute wall-clock deadline (epoch millis) after which {@link #next()} stops waiting. */
    long timeoutTS;
    /** Failure reported via {@link #notifyCoprocException(Throwable)}; {@code null} while none was reported. */
    volatile Throwable coprocException;

    /**
     * Creates the iterator and computes its timeout and deadline.
     *
     * <p>The timeout is the HBase RPC timeout multiplied by the HBase client retry
     * count and by {@code KylinConfig.getCubeVisitTimeoutTimes()}. If
     * {@code BackdoorToggles.getQueryTimeout()} is not {@code -1}, that value
     * replaces the computed timeout. The deadline is set to now plus twice the
     * resulting timeout.
     *
     * @param expectedSize number of elements this iterator will hand out; values
     *                     below 1 yield an iterator whose {@link #hasNext()} is
     *                     immediately {@code false}
     */
    public ExpectedSizeIterator(int expectedSize) {
        this.expectedSize = expectedSize;
        // ArrayBlockingQueue rejects a capacity below 1; clamp it so that an
        // iterator expecting no results can still be constructed.
        this.queue = new ArrayBlockingQueue<byte[]>(Math.max(1, expectedSize));

        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
        this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
        long timeoutWithRetries = this.rpcTimeout * (long)hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
        this.timeout = (long)((float)timeoutWithRetries * KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes());
        logger.info("rpc timeout is " + this.rpcTimeout
                + " and after multiply retry times becomes " + timeoutWithRetries
                + " after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout);

        if (BackdoorToggles.getQueryTimeout() != -1) {
            this.timeout = BackdoorToggles.getQueryTimeout();
            logger.info("rpc timeout is overwritten to " + this.timeout);
        }

        // NOTE(review): the deadline is deliberately twice the timeout; presumably so that
        // the remote side gets the chance to time out and report first - confirm.
        this.timeoutTS = System.currentTimeMillis() + 2L * this.timeout;
    }

    /**
     * @return {@code true} while fewer than {@code expectedSize} elements have been
     *         requested; does not tell whether the next element has already arrived
     */
    @Override
    public boolean hasNext() {
        return this.current < this.expectedSize;
    }

    /**
     * Blocks until the next element is available, a failure has been reported, or
     * the deadline has passed.
     *
     * @return the next appended byte array
     * @throws IllegalStateException         if all expected elements were already requested
     * @throws GTScanSelfTerminatedException if the reported failure is of that type
     * @throws RuntimeException              if another failure was reported, the deadline
     *                                       passed without data, or the waiting thread was
     *                                       interrupted (the interrupt flag is restored)
     */
    @Override
    public byte[] next() {
        if (this.current >= this.expectedSize) {
            throw new IllegalStateException("Won't have more data");
        }
        try {
            ++this.current;
            byte[] ret = null;
            while (ret == null && this.coprocException == null) {
                long remaining = this.timeoutTS - System.currentTimeMillis();
                if (remaining <= 0L) {
                    break;
                }
                // Wait in bounded slices so a reported failure is noticed, but never
                // wait past the deadline itself.
                ret = this.queue.poll(Math.min(remaining, POLL_INTERVAL_MS), TimeUnit.MILLISECONDS);
            }
            // A reported failure takes precedence over any data that was retrieved.
            if (this.coprocException != null) {
                if (this.coprocException instanceof GTScanSelfTerminatedException) {
                    throw (GTScanSelfTerminatedException)this.coprocException;
                }
                throw new RuntimeException("Error in coprocessor", this.coprocException);
            }
            if (ret == null) {
                throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every 100 scanned rows, the configured timeout(" + this.timeout + ") cannot support this many scans?");
            }
            return ret;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error when waiting queue", e);
        }
    }

    /**
     * Removal is not supported.
     *
     * @throws NotImplementedException always
     */
    @Override
    public void remove() {
        throw new NotImplementedException();
    }

    /**
     * Supplies one element, blocking while the queue is full.
     *
     * @param data the element to hand out from a later {@link #next()} call
     * @throws RuntimeException if the thread is interrupted while waiting
     *                          (the interrupt flag is restored)
     */
    public void append(byte[] data) {
        try {
            this.queue.put(data);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("error when waiting queue", e);
        }
    }

    /**
     * Returns the effective timeout computed in the constructor.
     *
     * <p>NOTE(review): despite the name this returns {@code timeout} (RPC timeout
     * after the retry/multiplier/override adjustments), not the raw {@code rpcTimeout}
     * field. Left unchanged because callers may depend on it - confirm intent.
     *
     * @return the effective timeout in the same unit as the HBase RPC timeout setting
     */
    public long getRpcTimeout() {
        return this.timeout;
    }

    /**
     * Records a failure so that a pending or future {@link #next()} call throws
     * instead of waiting for more data.
     *
     * @param ex the failure to surface from {@link #next()}
     */
    public void notifyCoprocException(Throwable ex) {
        this.coprocException = ex;
    }
}

