package org.apache.kylin.storage.hbase.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.tools.ant.taskdefs.optional.sos.SOSCmd;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ResourceUtils;

/* loaded from: input_file:WEB-INF/lib/kylin-storage-hbase-2.6.5.jar:org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.class */
public class DeployCoprocessorCLI {
    public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
    public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
    public static final String CubeObserverClassOld2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
    public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
    public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) DeployCoprocessorCLI.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-storage-hbase-2.6.5.jar:org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI$ResetCoprocessorWorker.class */
    public static class ResetCoprocessorWorker implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Admin hbaseAdmin;
        private final Path hdfsCoprocessorJar;
        private final String tableName;
        private final List<String> processedTables;
        private final List<String> failedTables;

        public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin admin, Path path, String str, List<String> list, List<String> list2) {
            this.countDownLatch = countDownLatch;
            this.hbaseAdmin = admin;
            this.hdfsCoprocessorJar = path;
            this.tableName = str;
            this.processedTables = list;
            this.failedTables = list2;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (DeployCoprocessorCLI.resetCoprocessor(this.tableName, this.hbaseAdmin, this.hdfsCoprocessorJar)) {
                    this.processedTables.add(this.tableName);
                } else {
                    this.failedTables.add(this.tableName);
                }
            } catch (Exception e) {
                this.failedTables.add(this.tableName);
                DeployCoprocessorCLI.logger.error("Error processing " + this.tableName, (Throwable) e);
            } finally {
                this.countDownLatch.countDown();
            }
        }
    }

    public static void main(String[] strArr) throws IOException {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
        Connection connection = HBaseConnection.get(instanceFromEnv.getStorageUrl());
        Admin admin = null;
        DeployCoprocessorCLIOps parseArgs = parseArgs(strArr);
        try {
            admin = connection.getAdmin();
            String coprocessorLocalJar = "default".equals(parseArgs.getLocalCoprocessorJar()) ? instanceFromEnv.getCoprocessorLocalJar() : new File(parseArgs.getLocalCoprocessorJar()).getAbsolutePath();
            logger.info("Identify coprocessor jar " + coprocessorLocalJar);
            logger.info("Use at most {} threads to do upgrade", Integer.valueOf(parseArgs.getMaxThreads()));
            List<String> hTableNames = getHTableNames(instanceFromEnv);
            logger.info("Identify tables " + hTableNames);
            String lowerCase = parseArgs.getFilterType().toLowerCase(Locale.ROOT);
            if (lowerCase.equals("-table")) {
                hTableNames = filterByTables(hTableNames, Arrays.asList(parseArgs.getEntities()));
            } else if (lowerCase.equals("-cube")) {
                hTableNames = filterByCubes(hTableNames, Arrays.asList(parseArgs.getEntities()));
            } else if (lowerCase.equals(SOSCmd.FLAG_PROJECT)) {
                hTableNames = filterByProjects(hTableNames, Arrays.asList(parseArgs.getEntities()));
            } else if (!lowerCase.equals("all")) {
                printUsageAndExit();
            }
            logger.info("Tables after filtering by type " + lowerCase + ": " + hTableNames);
            List<String> filterByGitCommit = filterByGitCommit(admin, hTableNames);
            logger.info("Will execute tables " + filterByGitCommit);
            long currentTimeMillis = System.currentTimeMillis();
            Set<String> coprocessorJarPaths = getCoprocessorJarPaths(admin, filterByGitCommit);
            logger.info("Old coprocessor jar: " + coprocessorJarPaths);
            Path uploadCoprocessorJar = uploadCoprocessorJar(coprocessorLocalJar, fileSystem, coprocessorJarPaths);
            logger.info("New coprocessor jar: " + uploadCoprocessorJar);
            Pair<List<String>, List<String>> resetCoprocessorOnHTables = resetCoprocessorOnHTables(admin, uploadCoprocessorJar, filterByGitCommit, parseArgs.getMaxThreads());
            logger.info("Processed time: " + (System.currentTimeMillis() - currentTimeMillis));
            logger.info("Processed tables count: " + resetCoprocessorOnHTables.getFirst().size());
            logger.info("Processed tables: " + resetCoprocessorOnHTables.getFirst());
            logger.error("Failed tables count: " + resetCoprocessorOnHTables.getSecond().size());
            logger.error("Failed tables : " + resetCoprocessorOnHTables.getSecond());
            logger.info("Active coprocessor jar: " + uploadCoprocessorJar);
            if (admin != null) {
                admin.close();
            }
        } catch (Throwable th) {
            if (admin != null) {
                admin.close();
            }
            throw th;
        }
    }

    @VisibleForTesting
    static DeployCoprocessorCLIOps parseArgs(String[] strArr) {
        String str;
        String[] strArr2;
        int i = 8;
        if (strArr == null || strArr.length <= 1) {
            printUsageAndExit();
            return null;
        }
        if (StringUtils.isNumeric(strArr[1])) {
            i = Integer.parseInt(strArr[1]);
            str = strArr[2];
            strArr2 = (String[]) Arrays.copyOfRange(strArr, 3, strArr.length);
        } else {
            str = strArr[1];
            strArr2 = (String[]) Arrays.copyOfRange(strArr, 2, strArr.length);
        }
        String str2 = strArr[0];
        if (StringUtils.isEmpty(str2) || StringUtils.isEmpty(str)) {
            printUsageAndExit();
        }
        return new DeployCoprocessorCLIOps(str2, i, str, strArr2);
    }

    private static void printUsageAndExit() {
        logger.info("Usage: ");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] all");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -table tableName1 tableName2 ...");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -cube cubeName1 cubeName2 ... ");
        logger.info("$KYLIN_HOME/bin/kylin.sh  org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -project projectName1 projectName2 ...");
        System.exit(0);
    }

    private static List<String> filterByGitCommit(Admin admin, List<String> list) throws IOException {
        LinkedList newLinkedList = Lists.newLinkedList();
        LinkedList newLinkedList2 = Lists.newLinkedList();
        String gitCommitInfo = KylinVersion.getGitCommitInfo();
        if (StringUtils.isEmpty(gitCommitInfo)) {
            return list;
        }
        logger.info("Commit Information: " + gitCommitInfo);
        int i = 0;
        for (String str : list) {
            if (!admin.isTableAvailable(TableName.valueOf(str))) {
                logger.warn("Table: " + str + " is not available currently, skip it");
                i++;
            } else if (gitCommitInfo.equals(admin.getTableDescriptor(TableName.valueOf(str)).getValue(IRealizationConstants.HTableGitTag))) {
                newLinkedList2.add(str);
            } else {
                newLinkedList.add(str);
            }
        }
        logger.info("Skip {} tables for not founding in HBase Cluster", Integer.valueOf(i));
        logger.info("Filtered tables don't need to deploy coprocessors: " + newLinkedList2);
        return newLinkedList;
    }

    private static List<String> filterByProjects(List<String> list, List<String> list2) {
        ProjectManager projectManager = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String> it = list2.iterator();
        while (it.hasNext()) {
            String trim = it.next().trim();
            if (trim.endsWith(",")) {
                trim = trim.substring(0, trim.length() - 1);
            }
            Iterator<RealizationEntry> it2 = projectManager.getProject(trim).getRealizationEntries(RealizationType.CUBE).iterator();
            while (it2.hasNext()) {
                Iterator<T> it3 = cubeManager.getCube(it2.next().getRealization()).getSegments().iterator();
                while (it3.hasNext()) {
                    String storageLocationIdentifier = ((CubeSegment) it3.next()).getStorageLocationIdentifier();
                    if (list.contains(storageLocationIdentifier)) {
                        newArrayList.add(storageLocationIdentifier);
                    }
                }
            }
        }
        return newArrayList;
    }

    private static List<String> filterByCubes(List<String> list, List<String> list2) {
        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String> it = list2.iterator();
        while (it.hasNext()) {
            String trim = it.next().trim();
            if (trim.endsWith(",")) {
                trim = trim.substring(0, trim.length() - 1);
            }
            Iterator<T> it2 = cubeManager.getCube(trim).getSegments().iterator();
            while (it2.hasNext()) {
                String storageLocationIdentifier = ((CubeSegment) it2.next()).getStorageLocationIdentifier();
                if (list.contains(storageLocationIdentifier)) {
                    newArrayList.add(storageLocationIdentifier);
                }
            }
        }
        return newArrayList;
    }

    private static List<String> filterByTables(List<String> list, List<String> list2) {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String> it = list2.iterator();
        while (it.hasNext()) {
            String trim = it.next().trim();
            if (trim.endsWith(",")) {
                trim = trim.substring(0, trim.length() - 1);
            }
            if (list.contains(trim)) {
                newArrayList.add(trim);
            }
        }
        return newArrayList;
    }

    public static void deployCoprocessor(HTableDescriptor hTableDescriptor) {
        try {
            initHTableCoprocessor(hTableDescriptor);
            logger.info("hbase table " + hTableDescriptor.getTableName() + " deployed with coprocessor.");
        } catch (Exception e) {
            logger.error("Error deploying coprocessor on " + hTableDescriptor.getTableName(), (Throwable) e);
            logger.error("Will try creating the table without coprocessor.");
        }
    }

    private static void initHTableCoprocessor(HTableDescriptor hTableDescriptor) throws IOException {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        addCoprocessorOnHTable(hTableDescriptor, uploadCoprocessorJar(instanceFromEnv.getCoprocessorLocalJar(), FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration()), null));
    }

    public static void addCoprocessorOnHTable(HTableDescriptor hTableDescriptor, Path path) throws IOException {
        logger.info("Add coprocessor on " + hTableDescriptor.getNameAsString());
        hTableDescriptor.addCoprocessor(CubeEndpointClass, path, 1001, (Map) null);
    }

    public static boolean resetCoprocessor(String str, Admin admin, Path path) throws IOException {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(str));
        if (!tableDescriptor.getValue(IRealizationConstants.HTableTag).equalsIgnoreCase(instanceFromEnv.getMetadataUrlPrefix())) {
            logger.warn("This server doesn't own this table: " + str);
            return false;
        }
        logger.info("reset coprocessor on " + str);
        logger.info("Disable " + str);
        if (admin.isTableEnabled(TableName.valueOf(str))) {
            admin.disableTable(TableName.valueOf(str));
        }
        while (tableDescriptor.hasCoprocessor(CubeObserverClassOld2)) {
            tableDescriptor.removeCoprocessor(CubeObserverClassOld2);
        }
        while (tableDescriptor.hasCoprocessor(CubeEndpointClass)) {
            tableDescriptor.removeCoprocessor(CubeEndpointClass);
        }
        while (tableDescriptor.hasCoprocessor(IIEndpointClass)) {
            tableDescriptor.removeCoprocessor(IIEndpointClass);
        }
        while (tableDescriptor.hasCoprocessor(CubeObserverClassOld)) {
            tableDescriptor.removeCoprocessor(CubeObserverClassOld);
        }
        while (tableDescriptor.hasCoprocessor(IIEndpointClassOld)) {
            tableDescriptor.removeCoprocessor(IIEndpointClassOld);
        }
        addCoprocessorOnHTable(tableDescriptor, path);
        String gitCommitInfo = KylinVersion.getGitCommitInfo();
        if (!StringUtils.isEmpty(gitCommitInfo)) {
            tableDescriptor.setValue(IRealizationConstants.HTableGitTag, gitCommitInfo);
        }
        admin.modifyTable(TableName.valueOf(str), tableDescriptor);
        logger.info("Enable " + str);
        admin.enableTable(TableName.valueOf(str));
        return true;
    }

    private static Pair<List<String>, List<String>> resetCoprocessorOnHTables(Admin admin, Path path, List<String> list, int i) throws IOException {
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        List synchronizedList2 = Collections.synchronizedList(new ArrayList());
        int availableProcessors = Runtime.getRuntime().availableProcessors() * 2;
        if (availableProcessors > i) {
            availableProcessors = i;
        }
        logger.info("Use {} threads to do upgrade", Integer.valueOf(availableProcessors));
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(availableProcessors);
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            newFixedThreadPool.execute(new ResetCoprocessorWorker(countDownLatch, admin, path, it.next(), synchronizedList, synchronizedList2));
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("reset coprocessor failed: ", (Throwable) e);
        }
        newFixedThreadPool.shutdown();
        return new Pair<>(synchronizedList, synchronizedList2);
    }

    public static Path getNewestCoprocessorJar(KylinConfig kylinConfig, FileSystem fileSystem) throws IOException {
        FileStatus fileStatus = null;
        for (FileStatus fileStatus2 : fileSystem.listStatus(getCoprocessorHDFSDir(fileSystem, kylinConfig))) {
            if (fileStatus2.getPath().toString().endsWith(ResourceUtils.JAR_FILE_EXTENSION)) {
                if (fileStatus == null) {
                    fileStatus = fileStatus2;
                } else if (fileStatus.getModificationTime() < fileStatus2.getModificationTime()) {
                    fileStatus = fileStatus2;
                }
            }
        }
        if (fileStatus == null) {
            return null;
        }
        Path makeQualified = fileStatus.getPath().makeQualified(fileSystem.getUri(), (Path) null);
        logger.info("The newest coprocessor is " + makeQualified.toString());
        return makeQualified;
    }

    public static synchronized Path uploadCoprocessorJar(String str, FileSystem fileSystem, Set<String> set) throws IOException {
        Path path = null;
        File file = new File(str);
        if (set == null) {
            set = new HashSet();
        }
        Path coprocessorHDFSDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
        FileStatus[] listStatus = fileSystem.listStatus(coprocessorHDFSDir);
        int length = listStatus.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            FileStatus fileStatus = listStatus[i];
            if (isSame(file, fileStatus)) {
                path = fileStatus.getPath();
                break;
            }
            String path2 = fileStatus.getPath().toString();
            if (path2.endsWith(ResourceUtils.JAR_FILE_EXTENSION)) {
                set.add(path2);
            }
            i++;
        }
        if (path == null) {
            HashSet hashSet = new HashSet();
            Iterator<String> it = set.iterator();
            while (it.hasNext()) {
                hashSet.add(new Path(it.next()).getName());
            }
            String baseFileName = getBaseFileName(str);
            String str2 = null;
            int i2 = 0;
            while (str2 == null) {
                int i3 = i2;
                i2++;
                str2 = baseFileName + "-" + i3 + ResourceUtils.JAR_FILE_EXTENSION;
                if (hashSet.contains(str2)) {
                    str2 = null;
                }
            }
            path = new Path(coprocessorHDFSDir, str2);
            FileInputStream fileInputStream = null;
            OutputStream outputStream = null;
            try {
                fileInputStream = new FileInputStream(file);
                outputStream = fileSystem.create(path);
                IOUtils.copy(fileInputStream, outputStream);
                IOUtils.closeQuietly(fileInputStream);
                IOUtils.closeQuietly(outputStream);
                fileSystem.setTimes(path, file.lastModified(), -1L);
            } catch (Throwable th) {
                IOUtils.closeQuietly(fileInputStream);
                IOUtils.closeQuietly(outputStream);
                throw th;
            }
        }
        return path.makeQualified(fileSystem.getUri(), (Path) null);
    }

    private static boolean isSame(File file, FileStatus fileStatus) {
        return fileStatus.getLen() == file.length() && fileStatus.getModificationTime() == file.lastModified();
    }

    private static String getBaseFileName(String str) {
        String name = new File(str).getName();
        if (name.endsWith(ResourceUtils.JAR_FILE_EXTENSION)) {
            name = name.substring(0, name.length() - ResourceUtils.JAR_FILE_EXTENSION.length());
        }
        return name;
    }

    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig kylinConfig) throws IOException {
        Path path = new Path(HBaseConnection.makeQualifiedPathInHBaseCluster(kylinConfig.getHdfsWorkingDirectory()), "coprocessor");
        fileSystem.mkdirs(path);
        return path;
    }

    private static Set<String> getCoprocessorJarPaths(Admin admin, List<String> list) throws IOException {
        HashSet hashSet = new HashSet();
        for (String str : list) {
            try {
                for (Map.Entry entry : admin.getTableDescriptor(TableName.valueOf(str)).getValues().entrySet()) {
                    if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(((ImmutableBytesWritable) entry.getKey()).get())).matches()) {
                        Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(((ImmutableBytesWritable) entry.getValue()).get()));
                        if (matcher.matches()) {
                            String trim = matcher.group(1).trim();
                            if (StringUtils.isNotEmpty(trim)) {
                                hashSet.add(trim);
                            }
                        }
                    }
                }
            } catch (TableNotFoundException e) {
                logger.warn("Table not found " + str, e);
            }
        }
        return hashSet;
    }

    private static List<String> getHTableNames(KylinConfig kylinConfig) {
        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
        ArrayList arrayList = new ArrayList();
        for (CubeInstance cubeInstance : cubeManager.listAllCubes()) {
            if (cubeInstance.getStorageType() == 0 || cubeInstance.getStorageType() == 2) {
                Iterator<T> it = cubeInstance.getSegments(SegmentStatusEnum.READY).iterator();
                while (it.hasNext()) {
                    String storageLocationIdentifier = ((CubeSegment) it.next()).getStorageLocationIdentifier();
                    if (!StringUtils.isBlank(storageLocationIdentifier)) {
                        arrayList.add(storageLocationIdentifier);
                        System.out.println("added new table: " + storageLocationIdentifier);
                    }
                }
            }
        }
        return arrayList;
    }
}
