package org.apache.hudi.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.utilities.multitable.MultiTableServiceUtils;
import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line utility that schedules, executes, or drops metadata-table index partitions
 * (for example {@code column_stats}, {@code bloom_filters}, {@code record_index}) of a Hudi table.
 *
 * <p>The action is selected with {@code --mode}: {@code schedule}, {@code execute},
 * {@code scheduleAndExecute} or {@code dropIndex} (compared case-insensitively). Every mode
 * returns {@code 0} on success and a non-zero code on failure.
 */
public class HoodieIndexer {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieIndexer.class);

    /** Lower-cased running mode that drops the index types listed in {@code --index-types}. */
    static final String DROP_INDEX = "dropindex";
    private static final String SCHEDULE = "schedule";
    private static final String EXECUTE = "execute";
    private static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";

    private final Config cfg;
    private TypedProperties props;
    private final JavaSparkContext jsc;
    private final HoodieTableMetaClient metaClient;

    /** Command-line options accepted by {@link HoodieIndexer}. */
    public static class Config implements Serializable {

        @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
        public String basePath = null;

        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName = null;

        @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
        public String indexInstantTime = null;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
        public int parallelism = 1;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
        public String sparkMemory = null;

        @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
        public int retry = 0;

        @Parameter(names = {"--index-types", "-ixt"}, description = "Comma-separated index types to be built, e.g. BLOOM_FILTERS,COLUMN_STATS", required = true)
        public String indexTypes = null;

        @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately;Set \"dropIndex\" to drop the index types specified in --index-types;")
        public String runningMode = null;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for indexing")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();
    }

    /**
     * Creates an indexer for the table at {@code config.basePath}.
     *
     * @param javaSparkContext Spark context used for the write client and file-system access
     * @param config           parsed command-line options
     */
    public HoodieIndexer(JavaSparkContext javaSparkContext, Config config) {
        this.cfg = config;
        this.jsc = javaSparkContext;
        // Without a --props file, only the --hoodie-conf entries are used.
        this.props = StringUtils.isNullOrEmpty(config.propsFilePath)
                ? UtilHelpers.buildProperties(config.configs)
                : readConfigFromFileSystem(javaSparkContext, config);
        this.metaClient = UtilHelpers.createMetaClient(javaSparkContext, config.basePath, true);
    }

    /** Reads the {@code --props} file and overlays the {@code --hoodie-conf} entries on top of it. */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext javaSparkContext, Config config) {
        return UtilHelpers.readConfig(javaSparkContext.hadoopConfiguration(), new Path(config.propsFilePath), config.configs).getProps(true);
    }

    /**
     * CLI entry point: parses arguments, runs the requested mode, and logs the outcome.
     *
     * @param args command-line arguments, see {@link Config}
     */
    public static void main(String[] args) {
        Config config = new Config();
        JCommander commander = new JCommander(config, (ResourceBundle) null, args);
        if (Boolean.TRUE.equals(config.help) || args.length == 0) {
            commander.usage();
            System.exit(1);
        }
        JavaSparkContext sparkContext = UtilHelpers.buildSparkContext("indexing-" + config.tableName, config.sparkMaster, config.sparkMemory);
        try {
            int result = new HoodieIndexer(sparkContext, config).start(config.retry);
            String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s", config.basePath, config.tableName, config.runningMode);
            // Any non-zero code is a failure: -1 for validation/scheduling problems,
            // 1 when the index build did not cover all requested types.
            if (result != 0) {
                LOG.error(resultMsg + " failed");
            } else {
                LOG.info(resultMsg + " success");
            }
        } finally {
            // Stop Spark even if start() throws.
            sparkContext.stop();
        }
    }

    /**
     * Runs the configured {@code --mode}, retrying while it returns a non-zero code.
     *
     * @param retry number of additional attempts passed to {@code UtilHelpers.retry}
     * @return {@code 0} on success, non-zero on failure
     */
    public int start(int retry) {
        // Indexing requires the metadata table. A missing key is treated as "not enabled" so the user
        // gets this message rather than a property-lookup exception.
        if (!this.props.getBoolean(HoodieMetadataConfig.ENABLE.key(), false)) {
            LOG.error(String.format("Metadata is not enabled. Please set %s to true.", HoodieMetadataConfig.ENABLE.key()));
            return -1;
        }
        Set<String> inflightAndCompletedPartitions =
                HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions(this.metaClient.getTableConfig());
        LOG.info("Setting props for: " + inflightAndCompletedPartitions);
        // Turn on the config flag for every index partition that is already inflight or completed,
        // so the write client's config agrees with the table state.
        inflightAndCompletedPartitions.forEach(partition -> {
            if ("column_stats".equals(partition)) {
                this.props.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
            }
            if ("bloom_filters".equals(partition)) {
                this.props.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true");
            }
            if ("record_index".equals(partition)) {
                this.props.setProperty(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true");
            }
        });
        return UtilHelpers.retry(retry, () -> runSelectedMode(), "Indexer failed");
    }

    /** Performs one attempt of the configured running mode; returns 0 on success. */
    private int runSelectedMode() throws Exception {
        // Locale.ROOT keeps the match stable under locales with special casing rules (e.g. Turkish).
        String mode = this.cfg.runningMode == null ? "" : this.cfg.runningMode.toLowerCase(Locale.ROOT);
        switch (mode) {
            case SCHEDULE: {
                LOG.info("Running Mode: [schedule]; Do schedule");
                Option<String> instantTime = scheduleIndexing(this.jsc);
                if (!instantTime.isPresent()) {
                    return -1;
                }
                LOG.info("The schedule instant time is " + instantTime.get());
                return 0;
            }
            case SCHEDULE_AND_EXECUTE:
                LOG.info("Running Mode: [scheduleandexecute]");
                return scheduleAndRunIndexing(this.jsc);
            case EXECUTE:
                LOG.info("Running Mode: [execute];");
                return runIndexing(this.jsc);
            case DROP_INDEX:
                LOG.info("Running Mode: [dropindex];");
                return dropIndex(this.jsc);
            default:
                LOG.error("Unsupported running mode [" + this.cfg.runningMode + "], quit the job directly");
                return -1;
        }
    }

    /**
     * Schedules indexing for the configured index type.
     *
     * @return the scheduled instant time, or empty if nothing was scheduled
     */
    public Option<String> doSchedule() throws Exception {
        return scheduleIndexing(this.jsc);
    }

    /** Creates a write client for the table using the latest table schema and the indexer props. */
    private SparkRDDWriteClient<HoodieRecordPayload> createWriteClient(JavaSparkContext javaSparkContext) throws Exception {
        return UtilHelpers.createHoodieClient(javaSparkContext, this.cfg.basePath, UtilHelpers.getSchemaFromLatestInstant(this.metaClient), this.cfg.parallelism, Option.empty(), this.props);
    }

    /** Schedules indexing with a short-lived write client that is always closed. */
    private Option<String> scheduleIndexing(JavaSparkContext javaSparkContext) throws Exception {
        try (SparkRDDWriteClient<HoodieRecordPayload> client = createWriteClient(javaSparkContext)) {
            return doSchedule(client);
        }
    }

    /**
     * Validates the requested index types and schedules them with {@code client}.
     *
     * @return the scheduled instant, or empty if the index already exists or scheduling produced nothing
     * @throws IllegalArgumentException if more than one index type was requested
     * @throws HoodieException if the metadata table is uninitialized and FILES is not the requested type
     */
    private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
        List<MetadataPartitionType> requestedTypes = getRequestedPartitionTypes(this.cfg.indexTypes, Option.of(getHoodieMetadataConfig()));
        ValidationUtils.checkArgument(requestedTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
        if (!isMetadataInitialized() && !requestedTypes.contains(MetadataPartitionType.FILES)) {
            throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(requestedTypes.toArray()));
        }
        if (indexExists(requestedTypes)) {
            return Option.empty();
        }
        Option<String> instantTime = client.scheduleIndexing(requestedTypes);
        if (!instantTime.isPresent()) {
            LOG.error("Scheduling of index action did not return any instant.");
        }
        return instantTime;
    }

    /** Builds the metadata config from the current props (sets the base path on {@code props} as a side effect). */
    private HoodieMetadataConfig getHoodieMetadataConfig() {
        this.props.setProperty(HoodieWriteConfig.BASE_PATH.key(), this.cfg.basePath);
        return HoodieWriteConfig.newBuilder().withProps(this.props).build().getMetadataConfig();
    }

    /** Returns true (and logs them) if any of the given partition types is already a metadata partition. */
    private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
        Set<String> existingPartitions = this.metaClient.getTableConfig().getMetadataPartitions();
        Set<String> alreadyBuilt = partitionTypes.stream()
                .map(MetadataPartitionType::getPartitionPath)
                .filter(existingPartitions::contains)
                .collect(Collectors.toSet());
        if (alreadyBuilt.isEmpty()) {
            return false;
        }
        LOG.error("Following indexes already built: " + alreadyBuilt);
        return true;
    }

    /** The metadata table counts as initialized once the table config lists at least one metadata partition. */
    private boolean isMetadataInitialized() {
        return !this.metaClient.getTableConfig().getMetadataPartitions().isEmpty();
    }

    /**
     * Executes indexing at {@code --instant-time}, or at the earliest pending indexing instant when none was given.
     *
     * @return 0 if every requested index type was built, 1 otherwise
     * @throws HoodieIndexException if no instant was given and none is pending
     */
    private int runIndexing(JavaSparkContext javaSparkContext) throws Exception {
        // Resolve the instant before creating a client, so a missing schedule does not create one needlessly.
        if (StringUtils.isNullOrEmpty(this.cfg.indexInstantTime)) {
            Option<HoodieInstant> earliestPending = this.metaClient.getActiveTimeline().filterPendingIndexTimeline().firstInstant();
            if (!earliestPending.isPresent()) {
                throw new HoodieIndexException("There is no scheduled indexing in the table.");
            }
            this.cfg.indexInstantTime = earliestPending.get().getTimestamp();
            LOG.info("Found the earliest scheduled indexing instant which will be executed: " + this.cfg.indexInstantTime);
        }
        try (SparkRDDWriteClient<HoodieRecordPayload> client = createWriteClient(javaSparkContext)) {
            return handleResponse(client.index(this.cfg.indexInstantTime)) ? 0 : 1;
        }
    }

    /**
     * Schedules indexing and immediately executes the scheduled instant with the same client.
     *
     * @return 0 on success, 1 if the build was incomplete, -1 if nothing was scheduled
     */
    private int scheduleAndRunIndexing(JavaSparkContext javaSparkContext) throws Exception {
        // try-with-resources closes the client exactly once on every path.
        try (SparkRDDWriteClient<HoodieRecordPayload> client = createWriteClient(javaSparkContext)) {
            Option<String> instantTime = doSchedule(client);
            if (!instantTime.isPresent()) {
                return -1;
            }
            return handleResponse(client.index(instantTime.get())) ? 0 : 1;
        }
    }

    /**
     * Drops the index partitions listed in {@code --index-types}.
     *
     * @return 0 on success, -1 if dropping (or closing the client) failed
     */
    private int dropIndex(JavaSparkContext javaSparkContext) throws Exception {
        List<MetadataPartitionType> requestedTypes = getRequestedPartitionTypes(this.cfg.indexTypes, Option.empty());
        try (SparkRDDWriteClient<HoodieRecordPayload> client = createWriteClient(javaSparkContext)) {
            client.dropIndex(requestedTypes);
            return 0;
        } catch (Exception e) {
            LOG.error("Failed to drop index. ", e);
            return -1;
        }
    }

    /** Logs the indexed partitions and returns whether all requested index types were built. */
    private boolean handleResponse(Option<HoodieIndexCommitMetadata> commitMetadata) {
        if (!commitMetadata.isPresent()) {
            LOG.error("Indexing failed as no commit metadata present.");
            return false;
        }
        List<HoodieIndexPartitionInfo> indexPartitionInfos = commitMetadata.get().getIndexPartitionInfos();
        LOG.info(String.format("Indexing complete for partitions: %s", indexPartitionInfos.stream()
                .map(HoodieIndexPartitionInfo::getMetadataPartitionPath)
                .collect(Collectors.toList())));
        return isIndexBuiltForAllRequestedTypes(indexPartitionInfos);
    }

    /** Returns true if every type in {@code --index-types} has a matching built metadata partition. */
    boolean isIndexBuiltForAllRequestedTypes(List<HoodieIndexPartitionInfo> indexPartitionInfos) {
        Set<String> builtPartitions = indexPartitionInfos.stream()
                .map(HoodieIndexPartitionInfo::getMetadataPartitionPath)
                .collect(Collectors.toSet());
        Set<String> requestedPartitions = getRequestedPartitionTypes(this.cfg.indexTypes, Option.empty()).stream()
                .map(MetadataPartitionType::getPartitionPath)
                .collect(Collectors.toSet());
        return builtPartitions.containsAll(requestedPartitions);
    }

    /**
     * Parses a comma-separated list of index type names into partition types.
     *
     * <p>Names are trimmed and matched case-insensitively against {@link MetadataPartitionType} constants.
     *
     * @param indexTypes     comma-separated names, e.g. {@code "bloom_filters, column_stats"}
     * @param metadataConfig currently unused; kept for signature compatibility
     * @return the requested types in input order
     * @throws IllegalArgumentException if a name does not match any {@link MetadataPartitionType}
     */
    List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes, Option<HoodieMetadataConfig> metadataConfig) {
        return Arrays.stream(indexTypes.split(MultiTableServiceUtils.Constants.COMMA_SEPARATOR))
                .map(type -> MetadataPartitionType.valueOf(type.trim().toUpperCase(Locale.ROOT)))
                .collect(Collectors.toList());
    }
}
