/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for obtaining HBase configurations and connections and for
 * creating / deleting HTables.
 *
 * <p>Configurations and connections are cached per storage URL in static maps.
 * A JVM shutdown hook registered by this class shuts down the coprocessor
 * thread pool and closes every connection still held in the connection cache.
 */
public class HBaseConnection {
    /** Key of the table-descriptor value that stores the random UUID assigned at table creation. */
    public static final String HTABLE_UUID_TAG = "UUID";
    private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
    private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>();
    private static final Map<String, HConnection> connPool = new ConcurrentHashMap<String, HConnection>();
    private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<Configuration>();
    // volatile: read outside the class lock in getCoprocessorPool(), so the
    // fully constructed pool must be safely published to other threads.
    private static volatile ExecutorService coprocessorPool = null;
    public static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude";

    /**
     * Returns the shared coprocessor thread pool, creating it on first use.
     *
     * <p>Pool sizing and keep-alive are read from {@link KylinConfig}; the work
     * queue is bounded at {@code maxThreads * 100} entries and core threads are
     * allowed to time out.
     *
     * @return the shared executor, never {@code null}
     */
    public static ExecutorService getCoprocessorPool() {
        ExecutorService pool = coprocessorPool;
        if (pool != null) {
            return pool;
        }
        synchronized (HBaseConnection.class) {
            // Re-check under the lock: another thread may have created the pool meanwhile.
            if (coprocessorPool != null) {
                return coprocessorPool;
            }
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            int maxThreads = config.getHBaseMaxConnectionThreads();
            int coreThreads = config.getHBaseCoreConnectionThreads();
            long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS,
                    workQueue, Threads.newDaemonThreadFactory("kylin-coproc-"));
            tpe.allowCoreThreadTimeOut(true);
            logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
            coprocessorPool = tpe;
            return tpe;
        }
    }

    /**
     * Shuts down the coprocessor pool if it was ever created, waiting up to
     * 10 seconds for running tasks before forcing termination.
     */
    private static void closeCoprocessorPool() {
        ExecutorService pool = coprocessorPool;
        if (pool == null) {
            return;
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            // The interrupt status is deliberately not restored: this method is
            // only invoked from the shutdown hook, which still has to close the
            // cached connections afterwards.
            pool.shutdownNow();
        }
    }

    /**
     * Removes every entry from the connection cache. The connections
     * themselves are not closed by this method.
     */
    public static void clearConnCache() {
        connPool.clear();
    }

    /**
     * Returns the HBase configuration cached for the calling thread, building
     * it from the storage URL in {@link KylinConfig} on first access.
     *
     * @return the calling thread's HBase configuration
     */
    public static Configuration getCurrentHBaseConfiguration() {
        Configuration conf = configThreadLocal.get();
        if (conf == null) {
            String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
            conf = newHBaseConfiguration(storageUrl);
            configThreadLocal.set(conf);
        }
        return conf;
    }

    /**
     * Builds a new HBase configuration on top of the current Hadoop configuration.
     *
     * @param url storage URL; must be empty, {@code null} or exactly {@code "hbase"}
     * @return a freshly created configuration
     * @throws IllegalArgumentException if {@code url} is non-empty and not {@code "hbase"}
     */
    private static Configuration newHBaseConfiguration(String url) {
        if (!StringUtils.isEmpty(url) && !"hbase".equals(url)) {
            throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
        }
        Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
        addHBaseClusterNNHAConfiguration(conf);
        // Point the default file system at the HBase cluster when one is configured.
        String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
        if (StringUtils.isNotEmpty(hbaseClusterFs)) {
            conf.set("fs.defaultFS", hbaseClusterFs);
        }
        // Fall back to /tmp when no temp directories are configured.
        if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
            conf.set("hadoop.tmp.dir", "/tmp");
        }
        if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
            conf.set("hbase.fs.tmp.dir", "/tmp");
        }
        // Fixed client pause / retry / operation-timeout settings.
        conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
        conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
        conf.set("hbase.client.operation.timeout", "60000");
        return conf;
    }

    /**
     * Copies the name-service related HDFS settings of the HBase cluster into
     * {@code conf}.
     *
     * <p>The settings are read from the HDFS config file named by
     * {@link KylinConfig}; nothing happens when no such file is configured.
     * For every name service found there, the namenode list, the failover
     * proxy provider and each namenode RPC address are copied, the service id
     * is appended to {@code dfs.nameservices}, and the HBase cluster's name
     * services are written to {@link #JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE}.
     *
     * @param conf configuration to modify in place
     */
    public static void addHBaseClusterNNHAConfiguration(Configuration conf) {
        String hdfsConfigFile = KylinConfig.getInstanceFromEnv().getHBaseClusterHDFSConfigFile();
        if (hdfsConfigFile == null || hdfsConfigFile.isEmpty()) {
            return;
        }
        // 'false' = do not load the default resources; only the given file is read.
        Configuration hdfsConf = new Configuration(false);
        hdfsConf.addResource(hdfsConfigFile);
        Collection<String> nameServices = hdfsConf.getTrimmedStringCollection("dfs.nameservices");
        Collection<String> mainNameServices = conf.getTrimmedStringCollection("dfs.nameservices");
        for (String serviceId : nameServices) {
            mainNameServices.add(serviceId);
            String serviceConfKey = "dfs.ha.namenodes." + serviceId;
            String proxyConfKey = "dfs.client.failover.proxy.provider." + serviceId;
            conf.set(serviceConfKey, hdfsConf.get(serviceConfKey, ""));
            conf.set(proxyConfKey, hdfsConf.get(proxyConfKey, ""));
            Collection<String> nameNodes = hdfsConf.getTrimmedStringCollection(serviceConfKey);
            for (String nameNode : nameNodes) {
                String rpcConfKey = "dfs.namenode.rpc-address." + serviceId + "." + nameNode;
                conf.set(rpcConfKey, hdfsConf.get(rpcConfKey, ""));
            }
        }
        conf.setStrings("dfs.nameservices", mainNameServices.toArray(new String[0]));
        conf.setStrings(JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE, nameServices.toArray(new String[0]));
    }

    /**
     * Qualifies {@code path} against the file system of the current HBase configuration.
     *
     * @param path path to qualify
     * @return the fully qualified path as a string
     * @throws IllegalArgumentException if the file system cannot be obtained
     */
    public static String makeQualifiedPathInHBaseCluster(String path) {
        try {
            FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
            return fs.makeQualified(new Path(path)).toString();
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
        }
    }

    /**
     * Returns an open connection for {@code url}, creating and caching one
     * when the cache holds none or only a closed one.
     *
     * <p>No lock is taken, so concurrent callers may each create a connection;
     * the last one stored wins. If a freshly created connection is already
     * closed, the method sleeps 10 seconds and tries again.
     *
     * @param url storage URL used as cache key
     * @return an open connection
     * @throws StorageException if the connection cannot be opened or the
     *         calling thread is interrupted while waiting to retry
     */
    public static HConnection get(String url) {
        Configuration conf = configCache.get(url);
        if (conf == null) {
            conf = newHBaseConfiguration(url);
            configCache.put(url, conf);
        }
        HConnection connection = connPool.get(url);
        try {
            while (true) {
                if (connection == null || connection.isClosed()) {
                    logger.info("connection is null or closed, creating a new one");
                    connection = HConnectionManager.createConnection(conf);
                    connPool.put(url, connection);
                }
                if (connection != null && !connection.isClosed()) {
                    break;
                }
                Thread.sleep(10000L);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe it.
            Thread.currentThread().interrupt();
            logger.error("Error when open connection " + url, e);
            throw new StorageException("Error when open connection " + url, e);
        }
        catch (Throwable t) {
            logger.error("Error when open connection " + url, t);
            throw new StorageException("Error when open connection " + url, t);
        }
        return connection;
    }

    /**
     * Checks whether a table exists.
     *
     * @param conn      connection to use
     * @param tableName name of the table
     * @return {@code true} if the table exists
     * @throws IOException if the admin call fails
     */
    public static boolean tableExists(HConnection conn, String tableName) throws IOException {
        try (HBaseAdmin hbase = new HBaseAdmin(conn)) {
            return hbase.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Checks whether a table exists, using the cached connection for {@code hbaseUrl}.
     *
     * @param hbaseUrl  storage URL
     * @param tableName name of the table
     * @return {@code true} if the table exists
     * @throws IOException if the admin call fails
     */
    public static boolean tableExists(String hbaseUrl, String tableName) throws IOException {
        return tableExists(get(hbaseUrl), tableName);
    }

    /**
     * Same as {@link #createHTableIfNeeded(HConnection, String, String...)},
     * using the cached connection for {@code hbaseUrl}.
     *
     * @param hbaseUrl  storage URL
     * @param tableName name of the table
     * @param families  column families the table must have
     * @throws IOException if an admin call fails
     */
    public static void createHTableIfNeeded(String hbaseUrl, String tableName, String ... families) throws IOException {
        createHTableIfNeeded(get(hbaseUrl), tableName, families);
    }

    /**
     * Same as {@link #deleteTable(HConnection, String)}, using the cached
     * connection for {@code hbaseUrl}.
     *
     * @param hbaseUrl  storage URL
     * @param tableName name of the table
     * @throws IOException if an admin call fails
     */
    public static void deleteTable(String hbaseUrl, String tableName) throws IOException {
        deleteTable(get(hbaseUrl), tableName);
    }

    /**
     * Creates the table with the given column families, or, when the table
     * already exists, adds whichever of the families it is missing.
     *
     * <p>A newly created table is tagged with a random UUID under
     * {@link #HTABLE_UUID_TAG}. After adding families to an existing table the
     * method sleeps 10 seconds before returning.
     *
     * @param conn     connection to use
     * @param table    name of the table
     * @param families column families the table must have; {@code null} is treated as none
     * @throws IOException if an admin call fails
     */
    public static void createHTableIfNeeded(HConnection conn, String table, String ... families) throws IOException {
        // Treat a null varargs array as "no families" on both code paths.
        String[] wanted = families == null ? new String[0] : families;
        try (HBaseAdmin hbase = new HBaseAdmin(conn)) {
            TableName name = TableName.valueOf(table);
            if (hbase.tableExists(name)) {
                logger.debug("HTable '" + table + "' already exists");
                Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(name));
                boolean wait = false;
                for (String family : wanted) {
                    if (existingFamilies.contains(family)) {
                        continue;
                    }
                    logger.debug("Adding family '" + family + "' to HTable '" + table + "'");
                    hbase.addColumn(table, newFamilyDescriptor(family));
                    wait = true;
                }
                if (wait) {
                    try {
                        Thread.sleep(10000L);
                    }
                    catch (InterruptedException e) {
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                        logger.warn("Interrupted while waiting after adding column families to HTable '" + table + "'", e);
                    }
                }
                return;
            }
            logger.debug("Creating HTable '" + table + "'");
            HTableDescriptor desc = new HTableDescriptor(name);
            for (String family : wanted) {
                desc.addFamily(newFamilyDescriptor(family));
            }
            desc.setValue(HTABLE_UUID_TAG, UUID.randomUUID().toString());
            hbase.createTable(desc);
            logger.debug("HTable '" + table + "' created");
        }
    }

    /**
     * Returns the column family names of {@code desc}, decoded as UTF-8.
     */
    private static Set<String> getFamilyNames(HTableDescriptor desc) {
        Set<String> result = Sets.newHashSet();
        for (byte[] bytes : desc.getFamiliesKeys()) {
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    /**
     * Creates a column family descriptor for {@code family} with the in-memory flag set.
     */
    private static HColumnDescriptor newFamilyDescriptor(String family) {
        HColumnDescriptor fd = new HColumnDescriptor(family);
        fd.setInMemory(true);
        return fd;
    }

    /**
     * Deletes the table if it exists, disabling it first when it is enabled.
     *
     * @param conn      connection to use
     * @param tableName name of the table
     * @throws IOException if an admin call fails
     */
    public static void deleteTable(HConnection conn, String tableName) throws IOException {
        try (HBaseAdmin hbase = new HBaseAdmin(conn)) {
            if (!hbase.tableExists(TableName.valueOf(tableName))) {
                logger.debug("HTable '" + tableName + "' does not exists");
                return;
            }
            logger.debug("delete HTable '" + tableName + "'");
            if (hbase.isTableEnabled(tableName)) {
                hbase.disableTable(tableName);
            }
            hbase.deleteTable(tableName);
            logger.debug("HTable '" + tableName + "' deleted");
        }
    }

    static {
        // On JVM shutdown: stop the coprocessor pool, then close every cached connection.
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                closeCoprocessorPool();
                for (HConnection conn : connPool.values()) {
                    try {
                        conn.close();
                    }
                    catch (IOException e) {
                        // Best effort: keep closing the remaining connections.
                        logger.warn("Error closing HBase connection during shutdown", e);
                    }
                }
            }
        });
    }
}

