/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.util;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeployCoprocessorCLI {
    public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
    public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
    public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
    public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
    public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);

    public static void main(String[] args) throws IOException {
        if (args == null || args.length <= 1) {
            DeployCoprocessorCLI.printUsageAndExit();
        }
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
        FileSystem fileSystem = FileSystem.get((Configuration)hconf);
        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
        String localCoprocessorJar = "default".equals(args[0]) ? kylinConfig.getCoprocessorLocalJar() : new File(args[0]).getAbsolutePath();
        logger.info("Identify coprocessor jar " + localCoprocessorJar);
        List<String> tableNames = DeployCoprocessorCLI.getHTableNames(kylinConfig);
        logger.info("Identify tables " + tableNames);
        String filterType = args[1].toLowerCase();
        if (filterType.equals("-table")) {
            tableNames = DeployCoprocessorCLI.filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
        } else if (filterType.equals("-cube")) {
            tableNames = DeployCoprocessorCLI.filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
        } else if (!filterType.equals("all")) {
            DeployCoprocessorCLI.printUsageAndExit();
        }
        logger.info("Will execute tables " + tableNames);
        Set<String> oldJarPaths = DeployCoprocessorCLI.getCoprocessorJarPaths(hbaseAdmin, tableNames);
        logger.info("Old coprocessor jar: " + oldJarPaths);
        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
        List<String> processedTables = DeployCoprocessorCLI.resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
        hbaseAdmin.close();
        logger.info("Processed " + processedTables);
        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
    }

    private static void printUsageAndExit() {
        logger.info("Probe run, exiting. Append argument 'all' or specific tables/cubes to execute.");
        System.exit(0);
    }

    private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
        CubeManager cubeManager = CubeManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv());
        ArrayList result = Lists.newArrayList();
        for (String c : cubeNames) {
            if ((c = c.trim()).endsWith(",")) {
                c = c.substring(0, c.length() - 1);
            }
            CubeInstance cubeInstance = cubeManager.getCube(c);
            for (CubeSegment segment : cubeInstance.getSegments()) {
                String tableName = segment.getStorageLocationIdentifier();
                if (!allTableNames.contains(tableName)) continue;
                result.add(tableName);
            }
        }
        return result;
    }

    private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
        ArrayList result = Lists.newArrayList();
        for (String t : tableNames) {
            if ((t = t.trim()).endsWith(",")) {
                t = t.substring(0, t.length() - 1);
            }
            if (!allTableNames.contains(t)) continue;
            result.add(t);
        }
        return result;
    }

    public static void deployCoprocessor(HTableDescriptor tableDesc) {
        try {
            DeployCoprocessorCLI.initHTableCoprocessor(tableDesc);
            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
        }
        catch (Exception ex) {
            logger.error("Error deploying coprocessor on " + tableDesc.getName(), (Throwable)ex);
            logger.error("Will try creating the table without coprocessor.");
        }
    }

    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
        FileSystem fileSystem = FileSystem.get((Configuration)hconf);
        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
    }

    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
        logger.info("Add coprocessor on " + desc.getNameAsString());
        desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
        desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
    }

    public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
        logger.info("Disable " + tableName);
        hbaseAdmin.disableTable(tableName);
        logger.info("Unset coprocessor on " + tableName);
        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf((String)tableName));
        while (desc.hasCoprocessor(CubeObserverClass)) {
            desc.removeCoprocessor(CubeObserverClass);
        }
        while (desc.hasCoprocessor(CubeEndpointClass)) {
            desc.removeCoprocessor(CubeEndpointClass);
        }
        while (desc.hasCoprocessor(IIEndpointClass)) {
            desc.removeCoprocessor(IIEndpointClass);
        }
        while (desc.hasCoprocessor(CubeObserverClassOld)) {
            desc.removeCoprocessor(CubeObserverClassOld);
        }
        while (desc.hasCoprocessor(IIEndpointClassOld)) {
            desc.removeCoprocessor(IIEndpointClassOld);
        }
        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
        String commitInfo = KylinVersion.getGitCommitInfo();
        if (!StringUtils.isEmpty((String)commitInfo)) {
            desc.setValue("GIT_COMMIT", commitInfo);
        }
        hbaseAdmin.modifyTable(tableName, desc);
        logger.info("Enable " + tableName);
        hbaseAdmin.enableTable(tableName);
    }

    private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
        ArrayList<String> processed = new ArrayList<String>();
        for (String tableName : tableNames) {
            try {
                DeployCoprocessorCLI.resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
                processed.add(tableName);
            }
            catch (IOException ex) {
                logger.error("Error processing " + tableName, (Throwable)ex);
            }
        }
        return processed;
    }

    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
        Path coprocessorDir = DeployCoprocessorCLI.getCoprocessorHDFSDir(fileSystem, config);
        FileStatus newestJar = null;
        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
            if (!fileStatus.getPath().toString().endsWith(".jar")) continue;
            if (newestJar == null) {
                newestJar = fileStatus;
                continue;
            }
            if (newestJar.getModificationTime() >= fileStatus.getModificationTime()) continue;
            newestJar = fileStatus;
        }
        if (newestJar == null) {
            return null;
        }
        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
        logger.info("The newest coprocessor is " + path.toString());
        return path;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static synchronized Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
        Path uploadPath;
        block7: {
            uploadPath = null;
            File localCoprocessorFile = new File(localCoprocessorJar);
            if (oldJarPaths == null) {
                oldJarPaths = new HashSet<String>();
            }
            Path coprocessorDir = DeployCoprocessorCLI.getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
            for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
                if (DeployCoprocessorCLI.isSame(localCoprocessorFile, fileStatus)) {
                    uploadPath = fileStatus.getPath();
                    break;
                }
                String filename = fileStatus.getPath().toString();
                if (!filename.endsWith(".jar")) continue;
                oldJarPaths.add(filename);
            }
            if (uploadPath != null) break block7;
            HashSet<String> oldJarNames = new HashSet<String>();
            for (String path : oldJarPaths) {
                oldJarNames.add(new Path(path).getName());
            }
            String baseName = DeployCoprocessorCLI.getBaseFileName(localCoprocessorJar);
            String newName = null;
            int i = 0;
            while (newName == null) {
                newName = baseName + "-" + i++ + ".jar";
                if (!oldJarNames.contains(newName)) continue;
                newName = null;
            }
            uploadPath = new Path(coprocessorDir, newName);
            FileInputStream in = null;
            FSDataOutputStream out = null;
            try {
                in = new FileInputStream(localCoprocessorFile);
                out = fileSystem.create(uploadPath);
                IOUtils.copy((InputStream)in, (OutputStream)out);
            }
            catch (Throwable throwable) {
                IOUtils.closeQuietly(in);
                IOUtils.closeQuietly(out);
                throw throwable;
            }
            IOUtils.closeQuietly((InputStream)in);
            IOUtils.closeQuietly((OutputStream)out);
            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1L);
        }
        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
        return uploadPath;
    }

    private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
        return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
    }

    private static String getBaseFileName(String localCoprocessorJar) {
        File localJar = new File(localCoprocessorJar);
        String baseName = localJar.getName();
        if (baseName.endsWith(".jar")) {
            baseName = baseName.substring(0, baseName.length() - ".jar".length());
        }
        return baseName;
    }

    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
        fileSystem.mkdirs(coprocessorDir);
        return coprocessorDir;
    }

    private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
        HashSet<String> result = new HashSet<String>();
        for (String tableName : tableNames) {
            HTableDescriptor tableDescriptor = null;
            try {
                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf((String)tableName));
            }
            catch (TableNotFoundException e) {
                logger.warn("Table not found " + tableName, (Throwable)e);
                continue;
            }
            for (Map.Entry e : tableDescriptor.getValues().entrySet()) {
                String jarPath;
                Matcher valueMatcher;
                Matcher keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString((byte[])((ImmutableBytesWritable)e.getKey()).get()));
                if (!keyMatcher.matches() || !(valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString((byte[])((ImmutableBytesWritable)e.getValue()).get()))).matches() || !StringUtils.isNotEmpty((String)(jarPath = valueMatcher.group(1).trim()))) continue;
                result.add(jarPath);
            }
        }
        return result;
    }

    private static List<String> getHTableNames(KylinConfig config) {
        CubeManager cubeMgr = CubeManager.getInstance((KylinConfig)config);
        ArrayList<String> result = new ArrayList<String>();
        for (CubeInstance cube : cubeMgr.listAllCubes()) {
            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
                String tableName = seg.getStorageLocationIdentifier();
                if (StringUtils.isBlank((String)tableName)) continue;
                result.add(tableName);
                System.out.println("added new table: " + tableName);
            }
        }
        return result;
    }
}

