package org.apache.hudi.integ.testsuite;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ResourceBundle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.class */
public class SparkDataSourceContinuousIngestTool {
    private static final Logger LOG = LoggerFactory.getLogger(SparkDataSourceContinuousIngestTool.class);
    private final Config cfg;
    private TypedProperties props;
    private HoodieSparkEngineContext context;
    private SparkSession sparkSession;

    /* loaded from: input_file:org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool$Config.class */
    public static class Config implements Serializable {

        @Parameter(names = {"--source-path", "-sp"}, description = "Source path for the parquet data to consume", required = true)
        public String sourcePath = null;

        @Parameter(names = {"--source-format", "-sf"}, description = "source data format", required = false)
        public String sparkFormat = "parquet";

        @Parameter(names = {"--checkpoint-file-path", "-cpf"}, description = "Checkpoint file path to store/fetch checkpointing info", required = true)
        public String checkpointFilePath = null;

        @Parameter(names = {"--base-path", "-bp"}, description = "Base path for the hudi table", required = true)
        public String basePath = null;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
        public String sparkMemory = "1g";

        @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in continuous mode")
        public Integer minSyncIntervalSeconds = 0;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for table repair")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList();
    }

    public SparkDataSourceContinuousIngestTool(JavaSparkContext javaSparkContext, Config config) {
        if (config.propsFilePath != null) {
            config.propsFilePath = HadoopFSUtils.addSchemeIfLocalPath(config.propsFilePath).toString();
        }
        this.context = new HoodieSparkEngineContext(javaSparkContext);
        this.sparkSession = SparkSession.builder().config(javaSparkContext.getConf()).getOrCreate();
        this.cfg = config;
        this.props = config.propsFilePath == null ? UtilHelpers.buildProperties(config.configs) : readConfigFromFileSystem(javaSparkContext, config);
    }

    public static void main(String[] strArr) {
        Config config = new Config();
        JCommander jCommander = new JCommander(config, (ResourceBundle) null, strArr);
        if (config.help.booleanValue() || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        JavaSparkContext buildSparkContext = UtilHelpers.buildSparkContext("spark-datasource-continuous-ingestion-tool", config.sparkMaster, config.sparkMemory);
        try {
            try {
                new SparkDataSourceContinuousIngestTool(buildSparkContext, config).run();
                buildSparkContext.stop();
            } catch (Throwable th) {
                LOG.error("Fail to run Continuous Ingestion for spark datasource " + config.basePath, th);
                buildSparkContext.stop();
            }
        } catch (Throwable th2) {
            buildSparkContext.stop();
            throw th2;
        }
    }

    public void run() {
        try {
            new SparkDataSourceContinuousIngest(this.sparkSession, (Configuration) this.context.getStorageConf().unwrapAs(Configuration.class), new Path(this.cfg.sourcePath), this.cfg.sparkFormat, new Path(this.cfg.checkpointFilePath), new Path(this.cfg.basePath), getPropsAsMap(this.props), this.cfg.minSyncIntervalSeconds.intValue()).startIngestion();
        } finally {
            this.sparkSession.stop();
            this.context.getJavaSparkContext().stop();
        }
    }

    private Map<String, String> getPropsAsMap(TypedProperties typedProperties) {
        HashMap hashMap = new HashMap();
        typedProperties.entrySet().forEach(entry -> {
        });
        return hashMap;
    }

    private TypedProperties readConfigFromFileSystem(JavaSparkContext javaSparkContext, Config config) {
        return UtilHelpers.readConfig(javaSparkContext.hadoopConfiguration(), new Path(config.propsFilePath), config.configs).getProps(true);
    }
}
