package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.class */
public class HoodieMultiTableDeltaStreamer {
    /** Logger shared by all instances of this class. */
    private static final Logger logger = LogManager.getLogger(HoodieMultiTableDeltaStreamer.class);
    /** Spark context handed to every per-table HoodieDeltaStreamer; assigned in the constructor. */
    private transient JavaSparkContext jssc;
    /** One entry per table to ingest; filled by populateTableExecutionContextList. */
    private final List<TableExecutionContext> tableExecutionContexts = new ArrayList<>();
    /** "database.table" names whose sync finished without an exception. */
    private final Set<String> successTables = new HashSet<>();
    /** "database.table" names whose sync threw an exception. */
    private final Set<String> failedTables = new HashSet<>();

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer$Config.class */
    /**
     * Command line options of the multi-table delta streamer, bound by JCommander through the
     * {@code @Parameter} annotations.
     *
     * <p>Most of these values are copied onto a per-table {@code HoodieDeltaStreamer.Config} by
     * {@code Helpers.deepCopyConfigs}. {@link #targetTableName} is overwritten for every table by
     * {@code resetTarget}, and {@link #basePathPrefix} is read there to derive the table's base path.
     */
    public static class Config implements Serializable {

        /** Not marked as required, but {@code resetTarget} dereferences it when deriving a target path. */
        @Parameter(names = {"--base-path-prefix"}, description = "base path prefix for multi table support via HoodieMultiTableDeltaStreamer class")
        public String basePathPrefix;

        @Parameter(names = {"--target-table"}, description = "name of the target table", required = true)
        public String targetTableName;

        @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
        public String tableType;

        @Parameter(names = {"--config-folder"}, description = "Path to folder which contains all the properties file", required = true)
        public String configFolder;

        /** Defaults to a file below {@code src/test/resources} of the current working directory. */
        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer to individual classes, for supported properties.")
        public String propsFilePath = "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();

        @Parameter(names = {"--source-class"}, description = "Subclass of org.apache.hudi.utilities.sources to read data. Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
        public String sourceClassName = JsonDFSSource.class.getName();

        @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";

        @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();

        @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider. For Sources that return Dataset<Row>, the schema is obtained implicitly. However, this CLI option allows overriding the schemaprovider returned by Source.")
        public String schemaProviderClassName = null;

        @Parameter(names = {"--transformer-class"}, description = "A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer. Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which allows a SQL query templated to be passed as a transformation function). Pass a comma-separated list of subclass names to chain the transformations.")
        public List<String> transformerClassNames = null;

        @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. Default: No limit, e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
        public long sourceLimit = Long.MAX_VALUE;

        @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input is purely new data/inserts to gain speed)", converter = OperationConverter.class)
        public WriteOperationType operation = WriteOperationType.UPSERT;

        /** The constructor of the enclosing class rejects this flag when {@link #operation} is UPSERT. */
        @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;

        @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
        public Boolean enableHiveSync = false;

        @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
        public Boolean enableMetaSync = false;

        @Parameter(names = {"--max-pending-compactions"}, description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless outstanding compactions is less than this number")
        public Integer maxPendingCompactions = 5;

        @Parameter(names = {"--max-pending-clustering"}, description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless outstanding clustering is less than this number")
        public Integer maxPendingClustering = 5;

        @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running source-fetch -> Transform -> Hudi Write in loop")
        public Boolean continuousMode = false;

        @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in continuous mode")
        public Integer minSyncIntervalSeconds = 0;

        @Parameter(names = {"--spark-master"}, description = "spark master to use.")
        public String sparkMaster = "local[2]";

        @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;

        @Parameter(names = {"--delta-sync-scheduling-weight"}, description = "Scheduling weight for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingWeight = 1;

        @Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingWeight = 1;

        @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingMinShare = 0;

        @Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingMinShare = 0;

        @Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default. This flag disables it ")
        public Boolean forceDisableCompaction = false;

        @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
        public String checkpoint = null;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer$Constants.class */
    /**
     * String constants of the multi-table delta streamer: property keys first, then the literal
     * fragments used to assemble paths and names.
     */
    public static class Constants {
        // Publicly visible property keys.
        public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
        public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";

        // Schema registry property keys.
        private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
        private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
        private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
        private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
        private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
        private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";

        // Ingestion property keys and the pieces of the per-table "<prefix><db>.<table><suffix>" key.
        private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
        private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
        private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
        private static final String INGESTION_CONFIG_SUFFIX = ".configFile";

        // File name suffix of the default per-table config file.
        private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";

        // Spark master literal.
        private static final String LOCAL_SPARK_MASTER = "local[2]";

        // Separators.
        private static final String FILE_DELIMITER = "/";
        private static final String DELIMITER = ".";
        private static final String UNDERSCORE = "_";
        private static final String COMMA_SEPARATOR = ",";
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer$Helpers.class */
    /** Static helpers used while building and reporting on the per-table execution contexts. */
    public static class Helpers {
        /**
         * Builds the default location of a table's config file:
         * {@code <configFolder>/<database>_<tableName>_config.properties}.
         *
         * @param configFolder folder holding the per-table property files
         * @param database     database part of the table identifier
         * @param tableName    table part of the table identifier
         * @return the concatenated path
         */
        static String getDefaultConfigFilePath(String configFolder, String database, String tableName) {
            StringBuilder path = new StringBuilder();
            path.append(configFolder).append("/");
            path.append(database).append("_").append(tableName);
            path.append("_config.properties");
            return path.toString();
        }

        /**
         * Formats an execution context as {@code <database>.<tableName>}.
         *
         * @param context execution context to describe
         * @return database and table name joined by a dot
         */
        static String getTableWithDatabase(TableExecutionContext context) {
            return context.getDatabase() + "." + context.getTableName();
        }

        /**
         * Copies the options shared by all tables from the multi-table config onto a single-table
         * config. Values are assigned by reference; {@code transformerClassNames} is the same list
         * instance in both objects afterwards.
         *
         * @param source multi-table config to read from
         * @param target single-table config to populate
         */
        static void deepCopyConfigs(Config source, HoodieDeltaStreamer.Config target) {
            // Table identity and write semantics.
            target.targetTableName = source.targetTableName;
            target.tableType = source.tableType;
            target.operation = source.operation;
            target.payloadClassName = source.payloadClassName;
            target.filterDupes = source.filterDupes;
            target.commitOnErrors = source.commitOnErrors;

            // Source, schema and transformation.
            target.sourceClassName = source.sourceClassName;
            target.sourceOrderingField = source.sourceOrderingField;
            target.sourceLimit = source.sourceLimit;
            target.schemaProviderClassName = source.schemaProviderClassName;
            target.transformerClassNames = source.transformerClassNames;
            target.checkpoint = source.checkpoint;

            // Sync flags.
            target.enableHiveSync = source.enableHiveSync;
            target.enableMetaSync = source.enableMetaSync;

            // Execution mode, compaction and clustering.
            target.continuousMode = source.continuousMode;
            target.minSyncIntervalSeconds = source.minSyncIntervalSeconds;
            target.forceDisableCompaction = source.forceDisableCompaction;
            target.maxPendingCompactions = source.maxPendingCompactions;
            target.maxPendingClustering = source.maxPendingClustering;

            // Spark scheduling.
            target.sparkMaster = source.sparkMaster;
            target.deltaSyncSchedulingWeight = source.deltaSyncSchedulingWeight;
            target.deltaSyncSchedulingMinShare = source.deltaSyncSchedulingMinShare;
            target.compactSchedulingWeight = source.compactSchedulingWeight;
            target.compactSchedulingMinShare = source.compactSchedulingMinShare;
        }
    }

    /**
     * Validates the command line config, reads the common properties file and prepares one
     * execution context per table to ingest.
     *
     * @param config           parsed multi-table options
     * @param javaSparkContext Spark context used for file system access and later for every table sync
     * @throws IOException              if a file system call fails
     * @throws IllegalArgumentException if '--filter-dupes' is combined with UPSERT, if the config
     *                                  folder is null or empty, or if a required file is missing
     */
    public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext javaSparkContext) throws IOException {
        this.jssc = javaSparkContext;
        String commonPropsFile = config.propsFilePath;
        String configFolder = config.configFolder;
        ValidationUtils.checkArgument(!(config.filterDupes && config.operation == WriteOperationType.UPSERT), "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
        // Fail with a clear message instead of an NPE / StringIndexOutOfBoundsException further down.
        if (configFolder == null || configFolder.isEmpty()) {
            throw new IllegalArgumentException("Please provide a non-empty config folder path via '--config-folder'!");
        }
        FileSystem fs = FSUtils.getFs(commonPropsFile, javaSparkContext.hadoopConfiguration());
        // Drop a single trailing '/' so later path concatenation does not produce "//".
        String normalizedConfigFolder = configFolder.endsWith("/") ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
        checkIfPropsFileAndConfigFolderExist(commonPropsFile, normalizedConfigFolder, fs);
        // NOTE(review): an empty override list is passed here, so config.configs ('--hoodie-conf')
        // is not forwarded to readConfig - confirm this is intended.
        TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), new Path(commonPropsFile), new ArrayList<String>()).getProps();
        populateTableExecutionContextList(commonProperties, normalizedConfigFolder, fs, config);
    }

    /**
     * Ensures the common properties file exists and creates the config folder when it is absent.
     *
     * @param commonPropsFile path of the common properties file
     * @param configFolder    path of the folder holding per-table property files
     * @param fileSystem      file system both paths are checked against
     * @throws IOException              if a file system call fails
     * @throws IllegalArgumentException if the common properties file does not exist
     */
    private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, String configFolder, FileSystem fileSystem) throws IOException {
        Path propsPath = new Path(commonPropsFile);
        boolean propsFilePresent = fileSystem.exists(propsPath);
        if (!propsFilePresent) {
            throw new IllegalArgumentException("Please provide valid common config file path!");
        }

        Path folderPath = new Path(configFolder);
        if (!fileSystem.exists(folderPath)) {
            // The folder is created on demand; the boolean result of mkdirs is not inspected.
            fileSystem.mkdirs(new Path(configFolder));
        }
    }

    /**
     * Verifies that a table's config file exists and is a regular file, then copies it into the
     * config folder unless a file of the same name is already there.
     *
     * @param configFolder    folder that should contain the table config file
     * @param fileSystem      file system used for the checks and the copy
     * @param configFilePath  path of the table config file
     * @throws IOException              if a file system call fails
     * @throws IllegalArgumentException if the config file is missing or is not a file
     */
    private void checkIfTableConfigFileExists(String configFolder, FileSystem fileSystem, String configFilePath) throws IOException {
        Path sourceFile = new Path(configFilePath);
        boolean usable = fileSystem.exists(sourceFile) && fileSystem.isFile(sourceFile);
        if (!usable) {
            throw new IllegalArgumentException("Please provide valid table config file path!");
        }

        Path copyInFolder = new Path(configFolder, sourceFile.getName());
        if (!fileSystem.exists(copyInFolder)) {
            // deleteSource = false: the original file is left in place.
            FileUtil.copy(fileSystem, sourceFile, fileSystem, copyInFolder, false, fileSystem.getConf());
        }
    }

    /**
     * Creates a {@link TableExecutionContext} for every table named in the common properties and
     * appends it to {@code tableExecutionContexts}.
     *
     * <p>For each {@code [database.]table} entry the table's property file is loaded, common
     * properties are added for keys the table file does not define, and a per-table
     * {@code HoodieDeltaStreamer.Config} is derived from the multi-table config. Entries without a
     * database part use the database name "default".
     *
     * <p>The target base path is taken from {@code hoodie.deltastreamer.ingestion.targetBasePath}
     * when the table sets it; only otherwise is it derived from {@code --base-path-prefix}. This
     * keeps tables with an explicit target path working when no prefix was supplied.
     *
     * @param typedProperties common properties shared by all tables
     * @param str             config folder (without trailing slash)
     * @param fileSystem      file system used to locate and read table config files
     * @param config          multi-table config; its {@code targetTableName} is overwritten per table
     * @throws IOException     if a file system call fails
     * @throws HoodieException if meta sync is enabled but no hive table property is configured
     */
    private void populateTableExecutionContextList(TypedProperties typedProperties, String str, FileSystem fileSystem, Config config) throws IOException {
        List<String> tablesToBeIngested = getTablesToBeIngested(typedProperties);
        logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
        for (String qualifiedName : tablesToBeIngested) {
            String[] parts = qualifiedName.split("\\.");
            String database = parts.length > 1 ? parts[0] : "default";
            String tableName = parts.length > 1 ? parts[1] : qualifiedName;

            // An explicit "<prefix><db>.<table>.configFile" entry wins over the default file location.
            String configFilePath = typedProperties.getString("hoodie.deltastreamer.ingestion." + database + "." + tableName + ".configFile", Helpers.getDefaultConfigFilePath(str, database, tableName));
            checkIfTableConfigFileExists(str, fileSystem, configFilePath);
            TypedProperties tableProperties = UtilHelpers.readConfig(fileSystem.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();

            // Table-level values take precedence; common values only fill the gaps.
            typedProperties.forEach((key, value) -> {
                if (tableProperties.get(key) == null) {
                    tableProperties.setProperty(key.toString(), value.toString());
                }
            });

            HoodieDeltaStreamer.Config tableConfig = new HoodieDeltaStreamer.Config();
            String overriddenTargetBasePath = tableProperties.getString("hoodie.deltastreamer.ingestion.targetBasePath", "");
            String targetBasePath;
            if (StringUtils.isNullOrEmpty(overriddenTargetBasePath)) {
                // Derives the path from --base-path-prefix and sets config.targetTableName.
                targetBasePath = resetTarget(config, database, tableName);
            } else {
                // The prefix is not needed here, so do not touch it (it may be unset).
                config.targetTableName = database + "." + tableName;
                targetBasePath = overriddenTargetBasePath;
            }
            // Must run after targetTableName has been updated for this table.
            Helpers.deepCopyConfigs(config, tableConfig);
            tableConfig.targetBasePath = targetBasePath;

            if (tableConfig.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
                throw new HoodieException("Meta sync table field not provided!");
            }
            populateSchemaProviderProps(tableConfig, tableProperties);

            TableExecutionContext tableExecutionContext = new TableExecutionContext();
            tableExecutionContext.setProperties(tableProperties);
            tableExecutionContext.setConfig(tableConfig);
            tableExecutionContext.setDatabase(database);
            tableExecutionContext.setTableName(tableName);
            this.tableExecutionContexts.add(tableExecutionContext);
        }
    }

    /**
     * Reads the comma-separated table list from
     * {@code hoodie.deltastreamer.ingestion.tablesToBeIngested}.
     *
     * <p>Whitespace around each entry is removed and blank entries are skipped, so a value such as
     * {@code "db.a, db.b"} yields {@code ["db.a", "db.b"]} rather than a database named {@code " db"}.
     *
     * @param typedProperties common properties holding the table list
     * @return the table identifiers in declaration order; empty when the value is null or blank
     */
    private List<String> getTablesToBeIngested(TypedProperties typedProperties) {
        // NOTE(review): whether getString returns null or throws for a missing key depends on
        // TypedProperties - the null check is kept to cover the former.
        String combinedTables = typedProperties.getString("hoodie.deltastreamer.ingestion.tablesToBeIngested");
        List<String> tables = new ArrayList<>();
        if (combinedTables == null) {
            return tables;
        }
        for (String entry : combinedTables.split(",")) {
            String table = entry.trim();
            if (!table.isEmpty()) {
                tables.add(table);
            }
        }
        return tables;
    }

    /**
     * Sets the source and target schema registry URL properties for a table when the configured
     * schema provider is {@link SchemaRegistryProvider}; does nothing for any other provider.
     *
     * <p>Each URL is {@code baseUrl + kafkaTopic + suffix}. A non-empty common {@code urlSuffix} is
     * used for both URLs; otherwise the dedicated source and target suffix properties are read.
     *
     * @param config          per-table delta streamer config, consulted for the schema provider class
     * @param typedProperties table properties that are read and updated in place
     */
    private void populateSchemaProviderProps(HoodieDeltaStreamer.Config config, TypedProperties typedProperties) {
        boolean usesSchemaRegistry = Objects.equals(config.schemaProviderClassName, SchemaRegistryProvider.class.getName());
        if (!usesSchemaRegistry) {
            return;
        }

        String baseUrl = typedProperties.getString("hoodie.deltastreamer.schemaprovider.registry.baseUrl");
        String commonSuffix = typedProperties.getString("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", null);

        String sourceSuffix;
        String targetSuffix;
        if (!StringUtils.isNullOrEmpty(commonSuffix)) {
            sourceSuffix = commonSuffix;
            targetSuffix = commonSuffix;
        } else {
            sourceSuffix = typedProperties.getString("hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix");
            targetSuffix = typedProperties.getString("hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix");
        }

        String sourceUrl = baseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSuffix;
        typedProperties.setProperty(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP, sourceUrl);

        String targetUrl = baseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSuffix;
        typedProperties.setProperty("hoodie.deltastreamer.schemaprovider.registry.targetUrl", targetUrl);
    }

    /**
     * Command line entry point: parses the arguments, builds a Spark context and syncs every
     * configured table. Prints the usage and exits with status 1 when help is requested or no
     * arguments are given.
     *
     * @param strArr command line arguments
     * @throws IOException if setting up the streamer fails on a file system call
     */
    public static void main(String[] strArr) throws IOException {
        Config config = new Config();
        JCommander jCommander = new JCommander(config, null, strArr);
        // Must be checked after parsing; before that the flag still holds its default (false).
        if (config.enableHiveSync) {
            logger.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
        }
        if (config.help || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        JavaSparkContext sparkContext = UtilHelpers.buildSparkContext("multi-table-delta-streamer", "local[2]");
        try {
            new HoodieMultiTableDeltaStreamer(config, sparkContext).sync();
        } finally {
            // Stop the context on both the success and the failure path.
            sparkContext.stop();
        }
    }

    /**
     * Derives a table's target base path as {@code <basePathPrefix>/<database>/<table>} and sets
     * {@code config.targetTableName} to {@code <database>.<table>}. A single trailing '/' on the
     * prefix is dropped before joining.
     *
     * @param config multi-table config; {@code basePathPrefix} is read, {@code targetTableName} is written
     * @param str    database name
     * @param str2   table name
     * @return the derived target base path
     * @throws IllegalArgumentException if {@code config.basePathPrefix} is null or empty
     */
    private static String resetTarget(Config config, String str, String str2) {
        String basePathPrefix = config.basePathPrefix;
        // '--base-path-prefix' is optional on the command line, so it can be missing here.
        if (basePathPrefix == null || basePathPrefix.isEmpty()) {
            throw new IllegalArgumentException("'--base-path-prefix' must be a non-empty path to derive the target base path for table " + str + "." + str2);
        }
        String normalizedPrefix = basePathPrefix.endsWith("/") ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
        config.targetTableName = str + "." + str2;
        return normalizedPrefix + "/" + str + "/" + str2;
    }

    /**
     * Runs a {@code HoodieDeltaStreamer} sync for each prepared table, one after another.
     *
     * <p>An exception from one table is logged and recorded in {@code failedTables}; the remaining
     * tables are still processed. Tables that finish without an exception are recorded in
     * {@code successTables}. Both sets are logged at the end.
     */
    public void sync() {
        for (TableExecutionContext context : this.tableExecutionContexts) {
            try {
                HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(context.getConfig(), this.jssc, Option.ofNullable(context.getProperties()));
                streamer.sync();
                this.successTables.add(Helpers.getTableWithDatabase(context));
            } catch (Exception e) {
                logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
                this.failedTables.add(Helpers.getTableWithDatabase(context));
            }
        }

        logger.info("Ingestion was successful for topics: " + this.successTables);
        if (!this.failedTables.isEmpty()) {
            logger.info("Ingestion failed for topics: " + this.failedTables);
        }
    }

    /**
     * @return the live set of "database.table" names whose sync completed without an exception
     */
    public Set<String> getSuccessTables() {
        return successTables;
    }

    /**
     * @return the live set of "database.table" names whose sync threw an exception
     */
    public Set<String> getFailedTables() {
        return failedTables;
    }

    /**
     * @return the live list of execution contexts prepared by the constructor, one per table
     */
    public List<TableExecutionContext> getTableExecutionContexts() {
        return tableExecutionContexts;
    }
}
