/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.util;

import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseStreamingInput {
    private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
    private static final int CELL_SIZE = 131072;
    private static final byte[] CF = "F".getBytes();
    private static final byte[] QN = "C".getBytes();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createTable(String tableName) throws IOException {
        Connection conn = HbaseStreamingInput.getConnection();
        Admin hadmin = conn.getAdmin();
        try {
            boolean tableExist = hadmin.tableExists(TableName.valueOf((String)tableName));
            if (tableExist) {
                logger.info("HTable '" + tableName + "' already exists");
                return;
            }
            logger.info("Creating HTable '" + tableName + "'");
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)tableName));
            desc.setValue("SPLIT_POLICY", DisabledRegionSplitPolicy.class.getName());
            desc.setMemStoreFlushSize(0x20000000L);
            HColumnDescriptor fd = new HColumnDescriptor(CF);
            fd.setBlocksize(131072);
            desc.addFamily(fd);
            hadmin.createTable(desc);
            logger.info("HTable '" + tableName + "' created");
        }
        finally {
            IOUtils.closeQuietly((Closeable)conn);
            IOUtils.closeQuietly((Closeable)hadmin);
        }
    }

    private static void scheduleJob(Semaphore semaphore, int interval) {
        while (true) {
            semaphore.release();
            try {
                Thread.sleep(interval);
                continue;
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                continue;
            }
            break;
        }
    }

    public static void addData(String tableName) throws IOException {
        HbaseStreamingInput.createTable(tableName);
        final Semaphore semaphore = new Semaphore(0);
        new Thread(new Runnable(){

            @Override
            public void run() {
                HbaseStreamingInput.scheduleJob(semaphore, 300000);
            }
        }).start();
        while (true) {
            try {
                semaphore.acquire();
                int waiting = semaphore.availablePermits();
                if (waiting > 0) {
                    logger.warn("There are another " + waiting + " batches waiting to be added");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            Connection conn = HbaseStreamingInput.getConnection();
            Table table = conn.getTable(TableName.valueOf((String)tableName));
            byte[] key = new byte[12];
            logger.info("============================================");
            long startTime = System.currentTimeMillis();
            logger.info("data load start time in millis: " + startTime);
            logger.info("data load start at " + HbaseStreamingInput.formatTime(startTime));
            ArrayList buffer = Lists.newArrayList();
            for (int i = 0; i < 1024; ++i) {
                long time = System.currentTimeMillis();
                Bytes.putLong(key, 0, time);
                Bytes.putInt(key, 8, i);
                Put put = new Put(key);
                byte[] cell = HbaseStreamingInput.randomBytes(131072);
                put.addColumn(CF, QN, cell);
                buffer.add(put);
            }
            table.put((List)buffer);
            table.close();
            conn.close();
            long endTime = System.currentTimeMillis();
            logger.info("data load end at " + HbaseStreamingInput.formatTime(endTime));
            logger.info("data load time consumed: " + (endTime - startTime));
            logger.info("============================================");
        }
    }

    public static void randomScan(String tableName) throws IOException {
        final Semaphore semaphore = new Semaphore(0);
        new Thread(new Runnable(){

            @Override
            public void run() {
                HbaseStreamingInput.scheduleJob(semaphore, 60000);
            }
        }).start();
        while (true) {
            try {
                semaphore.acquire();
                int waiting = semaphore.drainPermits();
                if (waiting > 0) {
                    logger.warn("Too many queries to handle! Blocking " + waiting + " sets of scan requests");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            Random r = new Random();
            Connection conn = HbaseStreamingInput.getConnection();
            Table table = conn.getTable(TableName.valueOf((String)tableName));
            long leftBound = HbaseStreamingInput.getFirstKeyTime(table);
            long rightBound = System.currentTimeMillis();
            for (int t = 0; t < 5; ++t) {
                long start = (long)((double)leftBound + r.nextDouble() * (double)(rightBound - leftBound));
                long end = start + 600000L;
                logger.info("A scan from " + HbaseStreamingInput.formatTime(start) + " to " + HbaseStreamingInput.formatTime(end));
                Scan scan = new Scan();
                scan.setStartRow(Bytes.toBytes(start));
                scan.setStopRow(Bytes.toBytes(end));
                scan.addFamily(CF);
                ResultScanner scanner = table.getScanner(scan);
                long hash = 0L;
                int rowCount = 0;
                for (Result result : scanner) {
                    Cell cell = result.getColumnLatestCell(CF, QN);
                    byte[] value = cell.getValueArray();
                    if (cell.getValueLength() != 131072) {
                        logger.error("value size invalid!!!!!");
                    }
                    hash += (long)Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
                    ++rowCount;
                }
                scanner.close();
                logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
            }
            table.close();
            conn.close();
        }
    }

    private static long getFirstKeyTime(Table table) throws IOException {
        long startTime = 0L;
        Scan scan = new Scan();
        scan.addFamily(CF);
        ResultScanner scanner = table.getScanner(scan);
        Iterator iterator = scanner.iterator();
        if (iterator.hasNext()) {
            Result result = (Result)iterator.next();
            Cell cell = result.getColumnLatestCell(CF, QN);
            byte[] key = cell.getRowArray();
            startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
            logger.info("Retrieved first record time: " + HbaseStreamingInput.formatTime(startTime));
        }
        scanner.close();
        return startTime;
    }

    private static Connection getConnection() throws IOException {
        return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
    }

    private static String formatTime(long time) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(time);
        return dateFormat.format(cal.getTime());
    }

    private static byte[] randomBytes(int lenth) {
        byte[] bytes = new byte[lenth];
        Random rand = new Random();
        rand.nextBytes(bytes);
        return bytes;
    }

    public static void main(String[] args) throws Exception {
        if (args[0].equalsIgnoreCase("createtable")) {
            HbaseStreamingInput.createTable(args[1]);
        } else if (args[0].equalsIgnoreCase("adddata")) {
            HbaseStreamingInput.addData(args[1]);
        } else if (args[0].equalsIgnoreCase("randomscan")) {
            HbaseStreamingInput.randomScan(args[1]);
        }
    }
}

