/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.util;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeployCoprocessorCLI {
    public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
    public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
    public static final String CubeObserverClassOld2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
    public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
    public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
    private static int MAX_THREADS = 8;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws IOException {
        if (args == null || args.length <= 1) {
            DeployCoprocessorCLI.printUsageAndExit();
        }
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
        FileSystem fileSystem = FileSystem.get((Configuration)hconf);
        Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
        try (Admin hbaseAdmin = null;){
            hbaseAdmin = conn.getAdmin();
            int curIdx = 0;
            String localCoprocessorJar = "default".equals(args[curIdx++]) ? kylinConfig.getCoprocessorLocalJar() : new File(args[curIdx]).getAbsolutePath();
            logger.info("Identify coprocessor jar " + localCoprocessorJar);
            try {
                MAX_THREADS = Integer.parseInt(args[curIdx++]);
            }
            catch (Exception e) {}
            logger.info("Use at most {} threads to do upgrade", (Object)MAX_THREADS);
            List<String> tableNames = DeployCoprocessorCLI.getHTableNames(kylinConfig);
            logger.info("Identify tables " + tableNames);
            int n = --curIdx;
            ++curIdx;
            String filterType = args[n].toLowerCase(Locale.ROOT);
            if (filterType.equals("-table")) {
                tableNames = DeployCoprocessorCLI.filterByTables(tableNames, Arrays.asList(args).subList(curIdx, args.length));
            } else if (filterType.equals("-cube")) {
                tableNames = DeployCoprocessorCLI.filterByCubes(tableNames, Arrays.asList(args).subList(curIdx, args.length));
            } else if (filterType.equals("-project")) {
                tableNames = DeployCoprocessorCLI.filterByProjects(tableNames, Arrays.asList(args).subList(curIdx, args.length));
            } else if (!filterType.equals("all")) {
                DeployCoprocessorCLI.printUsageAndExit();
            }
            logger.info("Tables after filtering by type " + filterType + ": " + tableNames);
            tableNames = DeployCoprocessorCLI.filterByGitCommit(hbaseAdmin, tableNames);
            logger.info("Will execute tables " + tableNames);
            long start = System.currentTimeMillis();
            Set<String> oldJarPaths = DeployCoprocessorCLI.getCoprocessorJarPaths(hbaseAdmin, tableNames);
            logger.info("Old coprocessor jar: " + oldJarPaths);
            Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
            logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
            Pair<List<String>, List<String>> results = DeployCoprocessorCLI.resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
            logger.info("Processed time: " + (System.currentTimeMillis() - start));
            logger.info("Processed tables count: " + results.getFirst().size());
            logger.info("Processed tables: " + results.getFirst());
            logger.error("Failed tables count: " + results.getSecond().size());
            logger.error("Failed tables : " + results.getSecond());
            logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
        }
    }

    private static void printUsageAndExit() {
        logger.info("Usage: ");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] all");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -table tableName1 tableName2 ...");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -cube cubeName1 cubeName2 ... ");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -project projectName1 projectName2 ...");
        System.exit(0);
    }

    private static List<String> filterByGitCommit(Admin hbaseAdmin, List<String> tableNames) throws IOException {
        LinkedList result = Lists.newLinkedList();
        LinkedList filteredList = Lists.newLinkedList();
        String commitInfo = KylinVersion.getGitCommitInfo();
        if (StringUtils.isEmpty((String)commitInfo)) {
            return tableNames;
        }
        logger.info("Commit Information: " + commitInfo);
        for (String tableName : tableNames) {
            HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf((String)tableName));
            String gitTag = tableDesc.getValue("GIT_COMMIT");
            if (commitInfo.equals(gitTag)) {
                filteredList.add(tableName);
                continue;
            }
            result.add(tableName);
        }
        logger.info("Filtered tables don't need to deploy coprocessors: " + filteredList);
        return result;
    }

    private static List<String> filterByProjects(List<String> allTableNames, List<String> projectNames) {
        ProjectManager projectManager = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        ArrayList result = Lists.newArrayList();
        for (String p : projectNames) {
            if ((p = p.trim()).endsWith(",")) {
                p = p.substring(0, p.length() - 1);
            }
            ProjectInstance projectInstance = projectManager.getProject(p);
            List<RealizationEntry> cubeList = projectInstance.getRealizationEntries(RealizationType.CUBE);
            for (RealizationEntry cube : cubeList) {
                CubeInstance cubeInstance = cubeManager.getCube(cube.getRealization());
                for (CubeSegment segment : cubeInstance.getSegments()) {
                    String tableName = segment.getStorageLocationIdentifier();
                    if (!allTableNames.contains(tableName)) continue;
                    result.add(tableName);
                }
            }
        }
        return result;
    }

    private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        ArrayList result = Lists.newArrayList();
        for (String c : cubeNames) {
            if ((c = c.trim()).endsWith(",")) {
                c = c.substring(0, c.length() - 1);
            }
            CubeInstance cubeInstance = cubeManager.getCube(c);
            for (CubeSegment segment : cubeInstance.getSegments()) {
                String tableName = segment.getStorageLocationIdentifier();
                if (!allTableNames.contains(tableName)) continue;
                result.add(tableName);
            }
        }
        return result;
    }

    private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
        ArrayList result = Lists.newArrayList();
        for (String t : tableNames) {
            if ((t = t.trim()).endsWith(",")) {
                t = t.substring(0, t.length() - 1);
            }
            if (!allTableNames.contains(t)) continue;
            result.add(t);
        }
        return result;
    }

    public static void deployCoprocessor(HTableDescriptor tableDesc) {
        try {
            DeployCoprocessorCLI.initHTableCoprocessor(tableDesc);
            logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
        }
        catch (Exception ex) {
            logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), (Throwable)ex);
            logger.error("Will try creating the table without coprocessor.");
        }
    }

    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
        FileSystem fileSystem = FileSystem.get((Configuration)hconf);
        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
    }

    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
        logger.info("Add coprocessor on " + desc.getNameAsString());
        desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
    }

    public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf((String)tableName));
        String host = desc.getValue("KYLIN_HOST");
        if (!host.equalsIgnoreCase(kylinConfig.getMetadataUrlPrefix())) {
            logger.warn("This server doesn't own this table: " + tableName);
            return false;
        }
        logger.info("reset coprocessor on " + tableName);
        logger.info("Disable " + tableName);
        hbaseAdmin.disableTable(TableName.valueOf((String)tableName));
        while (desc.hasCoprocessor(CubeObserverClassOld2)) {
            desc.removeCoprocessor(CubeObserverClassOld2);
        }
        while (desc.hasCoprocessor(CubeEndpointClass)) {
            desc.removeCoprocessor(CubeEndpointClass);
        }
        while (desc.hasCoprocessor(IIEndpointClass)) {
            desc.removeCoprocessor(IIEndpointClass);
        }
        while (desc.hasCoprocessor(CubeObserverClassOld)) {
            desc.removeCoprocessor(CubeObserverClassOld);
        }
        while (desc.hasCoprocessor(IIEndpointClassOld)) {
            desc.removeCoprocessor(IIEndpointClassOld);
        }
        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
        String commitInfo = KylinVersion.getGitCommitInfo();
        if (!StringUtils.isEmpty((String)commitInfo)) {
            desc.setValue("GIT_COMMIT", commitInfo);
        }
        hbaseAdmin.modifyTable(TableName.valueOf((String)tableName), desc);
        logger.info("Enable " + tableName);
        hbaseAdmin.enableTable(TableName.valueOf((String)tableName));
        return true;
    }

    private static Pair<List<String>, List<String>> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
        List<String> processedTables = Collections.synchronizedList(new ArrayList());
        List<String> failedTables = Collections.synchronizedList(new ArrayList());
        int nThread = Runtime.getRuntime().availableProcessors() * 2;
        if (nThread > MAX_THREADS) {
            nThread = MAX_THREADS;
        }
        logger.info("Use {} threads to do upgrade", (Object)nThread);
        ExecutorService coprocessorPool = Executors.newFixedThreadPool(nThread);
        CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
        for (String tableName : tableNames) {
            coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables, failedTables));
        }
        try {
            countDownLatch.await();
        }
        catch (InterruptedException e) {
            logger.error("reset coprocessor failed: ", (Throwable)e);
        }
        coprocessorPool.shutdown();
        return new Pair<List<String>, List<String>>(processedTables, failedTables);
    }

    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
        Path coprocessorDir = DeployCoprocessorCLI.getCoprocessorHDFSDir(fileSystem, config);
        FileStatus newestJar = null;
        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
            if (!fileStatus.getPath().toString().endsWith(".jar")) continue;
            if (newestJar == null) {
                newestJar = fileStatus;
                continue;
            }
            if (newestJar.getModificationTime() >= fileStatus.getModificationTime()) continue;
            newestJar = fileStatus;
        }
        if (newestJar == null) {
            return null;
        }
        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
        logger.info("The newest coprocessor is " + path.toString());
        return path;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static synchronized Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
        Path uploadPath;
        block7: {
            uploadPath = null;
            File localCoprocessorFile = new File(localCoprocessorJar);
            if (oldJarPaths == null) {
                oldJarPaths = new HashSet<String>();
            }
            Path coprocessorDir = DeployCoprocessorCLI.getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
            for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
                if (DeployCoprocessorCLI.isSame(localCoprocessorFile, fileStatus)) {
                    uploadPath = fileStatus.getPath();
                    break;
                }
                String filename = fileStatus.getPath().toString();
                if (!filename.endsWith(".jar")) continue;
                oldJarPaths.add(filename);
            }
            if (uploadPath != null) break block7;
            HashSet<String> oldJarNames = new HashSet<String>();
            for (String path : oldJarPaths) {
                oldJarNames.add(new Path(path).getName());
            }
            String baseName = DeployCoprocessorCLI.getBaseFileName(localCoprocessorJar);
            String newName = null;
            int i = 0;
            while (newName == null) {
                newName = baseName + "-" + i++ + ".jar";
                if (!oldJarNames.contains(newName)) continue;
                newName = null;
            }
            uploadPath = new Path(coprocessorDir, newName);
            FileInputStream in = null;
            FSDataOutputStream out = null;
            try {
                in = new FileInputStream(localCoprocessorFile);
                out = fileSystem.create(uploadPath);
                IOUtils.copy((InputStream)in, (OutputStream)out);
            }
            catch (Throwable throwable) {
                IOUtils.closeQuietly(in);
                IOUtils.closeQuietly(out);
                throw throwable;
            }
            IOUtils.closeQuietly((InputStream)in);
            IOUtils.closeQuietly((OutputStream)out);
            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1L);
        }
        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
        return uploadPath;
    }

    private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
        return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
    }

    private static String getBaseFileName(String localCoprocessorJar) {
        File localJar = new File(localCoprocessorJar);
        String baseName = localJar.getName();
        if (baseName.endsWith(".jar")) {
            baseName = baseName.substring(0, baseName.length() - ".jar".length());
        }
        return baseName;
    }

    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
        hdfsWorkingDirectory = HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
        fileSystem.mkdirs(coprocessorDir);
        return coprocessorDir;
    }

    private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
        HashSet<String> result = new HashSet<String>();
        for (String tableName : tableNames) {
            HTableDescriptor tableDescriptor = null;
            try {
                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf((String)tableName));
            }
            catch (TableNotFoundException e) {
                logger.warn("Table not found " + tableName, (Throwable)e);
                continue;
            }
            for (Map.Entry e : tableDescriptor.getValues().entrySet()) {
                String jarPath;
                Matcher valueMatcher;
                Matcher keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(((ImmutableBytesWritable)e.getKey()).get()));
                if (!keyMatcher.matches() || !(valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(((ImmutableBytesWritable)e.getValue()).get()))).matches() || !StringUtils.isNotEmpty((String)(jarPath = valueMatcher.group(1).trim()))) continue;
                result.add(jarPath);
            }
        }
        return result;
    }

    private static List<String> getHTableNames(KylinConfig config) {
        CubeManager cubeMgr = CubeManager.getInstance(config);
        ArrayList<String> result = new ArrayList<String>();
        for (CubeInstance cube : cubeMgr.listAllCubes()) {
            if (cube.getStorageType() != 0 && cube.getStorageType() != 2) continue;
            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
                String tableName = seg.getStorageLocationIdentifier();
                if (StringUtils.isBlank((String)tableName)) continue;
                result.add(tableName);
                System.out.println("added new table: " + tableName);
            }
        }
        return result;
    }

    private static class ResetCoprocessorWorker
    implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Admin hbaseAdmin;
        private final Path hdfsCoprocessorJar;
        private final String tableName;
        private final List<String> processedTables;
        private final List<String> failedTables;

        public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables, List<String> failedTables) {
            this.countDownLatch = countDownLatch;
            this.hbaseAdmin = hbaseAdmin;
            this.hdfsCoprocessorJar = hdfsCoprocessorJar;
            this.tableName = tableName;
            this.processedTables = processedTables;
            this.failedTables = failedTables;
        }

        @Override
        public void run() {
            try {
                boolean isProcessed = DeployCoprocessorCLI.resetCoprocessor(this.tableName, this.hbaseAdmin, this.hdfsCoprocessorJar);
                if (isProcessed) {
                    this.processedTables.add(this.tableName);
                } else {
                    this.failedTables.add(this.tableName);
                }
            }
            catch (Exception ex) {
                this.failedTables.add(this.tableName);
                logger.error("Error processing " + this.tableName, (Throwable)ex);
            }
            finally {
                this.countDownLatch.countDown();
            }
        }
    }
}

