package org.apache.hudi.utilities.multitable;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hudi.org.apache.hadoop.hbase.util.Strings;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.class */
/**
 * Command-line entry point that runs Hudi table services over multiple tables.
 *
 * <p>Table paths are either discovered under every existing {@code --base-path} (when
 * {@code --auto-discovery} is set) or obtained from the configured properties. For each table a
 * {@link TableServicePipeline} is built. In batch mode every pipeline is executed once on a
 * fixed-size thread pool; in streaming mode every pipeline is re-executed periodically until
 * {@link #cancel()} is called.
 */
public class HoodieMultiTableServicesMain {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMultiTableServicesMain.class);

    final Config cfg;
    final TypedProperties props;
    private final JavaSparkContext jsc;

    /**
     * Scheduler used in streaming mode; {@code null} until streaming mode starts. Declared
     * {@code volatile} because streaming mode blocks its calling thread in {@code awaitTermination},
     * so {@link #cancel()} can only take effect when invoked from a different thread.
     */
    private volatile ScheduledExecutorService executorService;

    /** Command-line options for {@link HoodieMultiTableServicesMain}. */
    public static class Config implements Serializable {

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
        public String sparkMaster;

        /** Root paths; only scanned for tables when {@link #autoDiscovery} is enabled. */
        @Parameter(names = {"--base-path"}, description = "Base path for all the tables, this can be repeated", required = true, splitter = IdentitySplitter.class)
        public List<String> basePath = Collections.emptyList();

        @Parameter(names = {"--auto-discovery", "-a"}, description = "Whether to discover hudi tables in the base path")
        public boolean autoDiscovery = false;

        @Parameter(names = {"--parallelism"}, description = "Parallelism for hoodie table service")
        public int parallelism = 200;

        /** {@code true}: run every pipeline once; {@code false}: run pipelines periodically. */
        @Parameter(names = {"--batch", "-b"}, description = "Run services in batch or streaming mode")
        public boolean batch = false;

        /** Period, in milliseconds, passed to {@code scheduleAtFixedRate} for each table in streaming mode. */
        @Parameter(names = {"--schedule-delay", "-d"}, description = "Table services schedule delay")
        public int scheduleDelay = 2000;

        @Parameter(names = {"--retry", "-r"}, description = "Table service retry count")
        public int retry = 1;

        /** Number of threads used to run table pipelines concurrently (batch and streaming mode). */
        @Parameter(names = {"--poolSize", "-p"}, description = "thread pool size")
        public int poolSize = Runtime.getRuntime().availableProcessors();

        @Parameter(names = {"--name", "-n"}, description = "Spark APP name")
        public String appName = "Hudi Table Service";

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        // NOTE(review): the --enable-* flags below are declared with help = true. JCommander skips
        // required-parameter validation (e.g. for --base-path) whenever a help parameter is passed,
        // so passing any of these flags disables that check. This looks unintended, but it is kept
        // so existing invocations that omit --base-path continue to parse. These flags do NOT print
        // the usage message; only --help does (see main).
        @Parameter(names = {"--enable-compaction"}, description = "Enable compaction table service", help = true)
        public Boolean enableCompaction = false;

        @Parameter(names = {"--enable-clustering"}, description = "Enable clustering table service", help = true)
        public Boolean enableClustering = false;

        @Parameter(names = {"--enable-clean"}, description = "Enable clean table service", help = true)
        public Boolean enableClean = false;

        @Parameter(names = {"--enable-archive"}, description = "Enable archive table service", help = true)
        public Boolean enableArchive = false;

        @Parameter(names = {"--compaction-mode"}, description = "Set job mode: Set \"schedule\" means make a compact plan; Set \"execute\" means execute a compact plan at given instant which means --instant-time is needed here; Set \"scheduleAndExecute\" means make a compact plan first and execute that plan immediately")
        public String compactionRunningMode = "execute";

        @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class")
        public String compactionStrategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();

        @Parameter(names = {"--clustering-mode"}, description = "Set job mode: Set \"schedule\" means make a clustering plan; Set \"execute\" means execute a clustering plan at given instant which means --instant-time is needed here; Set \"scheduleAndExecute\" means make a clustering plan first and execute that plan immediately")
        public String clusteringRunningMode = "scheduleandexecute";

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use")
        public String sparkMemory = null;

        /** When {@code null}, properties are built solely from {@link #configs}. */
        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for table service")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();

        @Override
        public String toString() {
            return new StringJoiner(", ", Config.class.getSimpleName() + "[", "]")
                .add("basePath=" + this.basePath)
                .add("autoDiscovery=" + this.autoDiscovery)
                .add("parallelism=" + this.parallelism)
                .add("batch=" + this.batch)
                .add("scheduleDelay=" + this.scheduleDelay)
                .add("retry=" + this.retry)
                .add("poolSize=" + this.poolSize)
                .add("appName='" + this.appName + "'")
                .add("help=" + this.help)
                .add("enableCompaction=" + this.enableCompaction)
                .add("enableClustering=" + this.enableClustering)
                .add("enableClean=" + this.enableClean)
                .add("enableArchive=" + this.enableArchive)
                .add("compactionRunningMode='" + this.compactionRunningMode + "'")
                .add("compactionStrategyClassName='" + this.compactionStrategyClassName + "'")
                .add("clusteringRunningMode='" + this.clusteringRunningMode + "'")
                .add("sparkMaster='" + this.sparkMaster + "'")
                .add("sparkMemory='" + this.sparkMemory + "'")
                .add("propsFilePath='" + this.propsFilePath + "'")
                .add("configs=" + this.configs)
                .toString();
        }
    }

    /**
     * Executes the pipeline of every table once, in parallel on a pool of {@code cfg.poolSize}
     * threads, and waits until all pipelines have finished or the first one has failed.
     *
     * @param tablePaths base paths of the tables to serve
     * @throws ExecutionException if any pipeline throws (wrapping the failure)
     * @throws InterruptedException if interrupted while waiting
     */
    private void batchRunTableServices(List<String> tablePaths) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(this.cfg.poolSize);
        try {
            List<CompletableFuture<Void>> futures = tablePaths.stream()
                .map(tablePath -> CompletableFuture.runAsync(
                    () -> MultiTableServiceUtils.buildTableServicePipeline(this.jsc, tablePath, this.cfg, this.props).execute(),
                    pool))
                .collect(Collectors.toList());
            CompletableFuture<Void> allComplete = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            // Completed exceptionally by the first failing pipeline, so the wait below fails fast
            // instead of waiting for every remaining table.
            CompletableFuture<Void> anyFailure = new CompletableFuture<>();
            for (CompletableFuture<Void> future : futures) {
                future.exceptionally(error -> {
                    anyFailure.completeExceptionally(error);
                    return null;
                });
            }
            try {
                CompletableFuture.anyOf(allComplete, anyFailure).get();
            } catch (ExecutionException e) {
                throw new ExecutionException("some table service failed", e);
            }
        } finally {
            // On failure this also attempts to stop pipelines that are still running.
            pool.shutdownNow();
        }
    }

    /**
     * Schedules every table's pipeline at a fixed rate of {@code cfg.scheduleDelay} milliseconds and
     * blocks until the scheduler terminates (i.e. after {@link #cancel()}).
     *
     * @param tablePaths base paths of the tables to serve
     * @throws InterruptedException if interrupted while waiting for termination
     */
    private void streamRunTableServices(List<String> tablePaths) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(this.cfg.poolSize);
        this.executorService = scheduler;
        for (String tablePath : tablePaths) {
            TableServicePipeline pipeline = MultiTableServiceUtils.buildTableServicePipeline(this.jsc, tablePath, this.cfg, this.props);
            scheduler.scheduleAtFixedRate(() -> executeLoggingFailure(tablePath, pipeline), 0L, this.cfg.scheduleDelay, TimeUnit.MILLISECONDS);
        }
        scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
    }

    /**
     * Runs one periodic execution of a table's pipeline, logging any failure before rethrowing it.
     * A periodic task that throws has all its subsequent executions suppressed by the scheduler,
     * so without this log the table would silently stop being served.
     */
    private static void executeLoggingFailure(String tablePath, TableServicePipeline pipeline) {
        try {
            pipeline.execute();
        } catch (RuntimeException | Error e) {
            LOG.error("Table services failed for table {}; it will not be scheduled again", tablePath, e);
            throw e;
        }
    }

    /**
     * Shuts down the streaming-mode scheduler, letting {@link #startServices()} return once running
     * pipelines finish. Has no effect if the streaming scheduler has not been created yet.
     */
    public void cancel() {
        ScheduledExecutorService scheduler = this.executorService;
        if (scheduler != null) {
            scheduler.shutdown();
        }
    }

    /**
     * @param javaSparkContext Spark context used to build pipelines and access the file system
     * @param config parsed command-line options; properties are read from {@code config.propsFilePath}
     *     when set, otherwise built from {@code config.configs}
     */
    public HoodieMultiTableServicesMain(JavaSparkContext javaSparkContext, Config config) {
        this.cfg = config;
        this.jsc = javaSparkContext;
        this.props = config.propsFilePath == null ? UtilHelpers.buildProperties(config.configs) : readConfigFromFileSystem(javaSparkContext, config);
    }

    /**
     * Resolves the table paths to serve and runs their table services in batch or streaming mode.
     *
     * @throws ExecutionException in batch mode, if any table service fails
     * @throws InterruptedException if interrupted while waiting for table services
     */
    public void startServices() throws ExecutionException, InterruptedException {
        LOG.info("StartServices Config: {}", this.cfg);
        List<String> tablePaths;
        if (this.cfg.autoDiscovery) {
            // Multiple base paths are supported; missing ones are skipped.
            tablePaths = this.cfg.basePath.stream()
                .filter(this::pathExists)
                .flatMap(basePath -> MultiTableServiceUtils.findHoodieTablesUnderPath(this.jsc, basePath).stream())
                .collect(Collectors.toList());
        } else {
            tablePaths = MultiTableServiceUtils.getTablesToBeServedFromProps(this.jsc, this.props);
        }
        LOG.info("All table paths: {}", String.join(",", tablePaths));
        if (this.cfg.batch) {
            batchRunTableServices(tablePaths);
        } else {
            streamRunTableServices(tablePaths);
        }
    }

    /** Reads the properties file at {@code config.propsFilePath}, overlaid with {@code config.configs}. */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext javaSparkContext, Config config) {
        return UtilHelpers.readConfig(javaSparkContext.hadoopConfiguration(), new Path(config.propsFilePath), config.configs).getProps(true);
    }

    /**
     * @param basePath path to check
     * @return whether {@code basePath} exists on its file system
     * @throws HoodieIOException if the file system cannot be queried
     */
    private boolean pathExists(String basePath) {
        try {
            Path path = new Path(basePath);
            return path.getFileSystem(this.jsc.hadoopConfiguration()).exists(path);
        } catch (IOException e) {
            throw new HoodieIOException("Error checking path existing: " + basePath, e);
        }
    }

    /**
     * Parses options, prints usage and exits with status 1 for {@code --help} or no arguments, and
     * otherwise runs the table services, always stopping the Spark context afterwards.
     */
    public static void main(String[] args) {
        Config config = new Config();
        JCommander commander = new JCommander(config, null, args);
        if (Boolean.TRUE.equals(config.help) || args.length == 0) {
            commander.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext(config.appName, config.sparkMaster, config.sparkMemory);
        try {
            new HoodieMultiTableServicesMain(jsc, config).startServices();
        } catch (Throwable t) {
            // NOTE(review): failures are only logged, so the process still exits normally.
            LOG.error("Fail to run table services, ", t);
        } finally {
            jsc.stop();
        }
    }
}
