package org.apache.kylin.storage.hbase.util;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/storage/hbase/util/HbaseStreamingInput.class */
public class HbaseStreamingInput {
    private static final int CELL_SIZE = 131072;
    private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
    private static final byte[] CF = "F".getBytes();
    private static final byte[] QN = "C".getBytes();

    public static void createTable(String str) throws IOException {
        HConnection connection = getConnection();
        HBaseAdmin hBaseAdmin = new HBaseAdmin(connection);
        try {
            if (hBaseAdmin.tableExists(str)) {
                logger.info("HTable '" + str + "' already exists");
                connection.close();
                hBaseAdmin.close();
                return;
            }
            logger.info("Creating HTable '" + str + "'");
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(str));
            hTableDescriptor.setValue("SPLIT_POLICY", DisabledRegionSplitPolicy.class.getName());
            hTableDescriptor.setMemStoreFlushSize(536870912L);
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(CF);
            hColumnDescriptor.setBlocksize(131072);
            hTableDescriptor.addFamily(hColumnDescriptor);
            hBaseAdmin.createTable(hTableDescriptor);
            logger.info("HTable '" + str + "' created");
            connection.close();
            hBaseAdmin.close();
        } catch (Throwable th) {
            connection.close();
            hBaseAdmin.close();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void scheduleJob(Semaphore semaphore, int i) {
        while (true) {
            semaphore.release();
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void addData(String str) throws IOException {
        createTable(str);
        final Semaphore semaphore = new Semaphore(0);
        new Thread(new Runnable() { // from class: org.apache.kylin.storage.hbase.util.HbaseStreamingInput.1
            @Override // java.lang.Runnable
            public void run() {
                HbaseStreamingInput.scheduleJob(semaphore, 300000);
            }
        }).start();
        while (true) {
            try {
                semaphore.acquire();
                int availablePermits = semaphore.availablePermits();
                if (availablePermits > 0) {
                    logger.warn("There are another " + availablePermits + " batches waiting to be added");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            HConnection connection = getConnection();
            HTableInterface table = connection.getTable(str);
            byte[] bArr = new byte[12];
            logger.info("============================================");
            long currentTimeMillis = System.currentTimeMillis();
            logger.info("data load start time in millis: " + currentTimeMillis);
            logger.info("data load start at " + formatTime(currentTimeMillis));
            ArrayList newArrayList = Lists.newArrayList();
            for (int i = 0; i < 1024; i++) {
                Bytes.putLong(bArr, 0, System.currentTimeMillis());
                Bytes.putInt(bArr, 8, i);
                Put put = new Put(bArr);
                put.add(CF, QN, randomBytes(131072));
                newArrayList.add(put);
            }
            table.put(newArrayList);
            table.close();
            connection.close();
            long currentTimeMillis2 = System.currentTimeMillis();
            logger.info("data load end at " + formatTime(currentTimeMillis2));
            logger.info("data load time consumed: " + (currentTimeMillis2 - currentTimeMillis));
            logger.info("============================================");
        }
    }

    public static void randomScan(String str) throws IOException {
        final Semaphore semaphore = new Semaphore(0);
        new Thread(new Runnable() { // from class: org.apache.kylin.storage.hbase.util.HbaseStreamingInput.2
            @Override // java.lang.Runnable
            public void run() {
                HbaseStreamingInput.scheduleJob(semaphore, DateUtils.MILLIS_IN_MINUTE);
            }
        }).start();
        while (true) {
            try {
                semaphore.acquire();
                int drainPermits = semaphore.drainPermits();
                if (drainPermits > 0) {
                    logger.warn("Too many queries to handle! Blocking " + drainPermits + " sets of scan requests");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Random random = new Random();
            HConnection connection = getConnection();
            HTableInterface table = connection.getTable(str);
            long firstKeyTime = getFirstKeyTime(table);
            long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i < 5; i++) {
                long nextDouble = (long) (firstKeyTime + (random.nextDouble() * (currentTimeMillis - firstKeyTime)));
                long j = nextDouble + 600000;
                logger.info("A scan from " + formatTime(nextDouble) + " to " + formatTime(j));
                Scan scan = new Scan();
                scan.setStartRow(Bytes.toBytes(nextDouble));
                scan.setStopRow(Bytes.toBytes(j));
                scan.addFamily(CF);
                ResultScanner scanner = table.getScanner(scan);
                long j2 = 0;
                int i2 = 0;
                Iterator it2 = scanner.iterator();
                while (it2.hasNext()) {
                    Cell columnLatestCell = ((Result) it2.next()).getColumnLatestCell(CF, QN);
                    byte[] valueArray = columnLatestCell.getValueArray();
                    if (columnLatestCell.getValueLength() != 131072) {
                        logger.error("value size invalid!!!!!");
                    }
                    j2 += Arrays.hashCode(Arrays.copyOfRange(valueArray, columnLatestCell.getValueOffset(), columnLatestCell.getValueLength() + columnLatestCell.getValueOffset()));
                    i2++;
                }
                scanner.close();
                logger.info("Scanned " + i2 + " rows, the (meaningless) hash for the scan is " + j2);
            }
            table.close();
            connection.close();
        }
    }

    private static long getFirstKeyTime(HTableInterface hTableInterface) throws IOException {
        long j = 0;
        Scan scan = new Scan();
        scan.addFamily(CF);
        ResultScanner scanner = hTableInterface.getScanner(scan);
        Iterator it2 = scanner.iterator();
        if (it2.hasNext()) {
            Cell columnLatestCell = ((Result) it2.next()).getColumnLatestCell(CF, QN);
            j = Bytes.toLong(columnLatestCell.getRowArray(), columnLatestCell.getRowOffset(), 8);
            logger.info("Retrieved first record time: " + formatTime(j));
        }
        scanner.close();
        return j;
    }

    private static HConnection getConnection() throws IOException {
        return HConnectionManager.createConnection(HBaseConnection.getCurrentHBaseConfiguration());
    }

    private static String formatTime(long j) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(j);
        return simpleDateFormat.format(calendar.getTime());
    }

    private static byte[] randomBytes(int i) {
        byte[] bArr = new byte[i];
        new Random().nextBytes(bArr);
        return bArr;
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr[0].equalsIgnoreCase("createtable")) {
            createTable(strArr[1]);
        } else if (strArr[0].equalsIgnoreCase("adddata")) {
            addData(strArr[1]);
        } else if (strArr[0].equalsIgnoreCase("randomscan")) {
            randomScan(strArr[1]);
        }
    }
}
