package org.apache.hudi.utilities;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.repair.RepairUtils;
import org.apache.hudi.utilities.HoodieMetadataTableValidator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/HoodieDataTableValidator.class */
/**
 * Validates the Hudi data table at {@code --base-path} by comparing the base and log files found on the
 * file system against the active timeline.
 *
 * <p>Two checks are performed when the active timeline has at least one instant:
 * <ol>
 *   <li>files whose commit time is earlier than the first instant on the active timeline;</li>
 *   <li>files tagged with a completed instant that the instant's timeline metadata (as reported by
 *       {@link RepairUtils#getBaseAndLogFilePathsFromTimeline}) does not list.</li>
 * </ol>
 * Either finding fails validation with a {@link HoodieValidationException} unless {@code --ignore-failed}
 * is set, in which case the findings are only logged.
 */
public class HoodieDataTableValidator implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieDataTableValidator.class);
    private transient JavaSparkContext jsc;
    private Config cfg;
    private TypedProperties props;
    private HoodieTableMetaClient metaClient;
    protected transient Option<AsyncDataTableValidateService> asyncDataTableValidateService;

    /**
     * Background service that repeatedly runs {@link #doDataTableValidation()}, waiting at least
     * {@code --min-validate-interval-seconds} between the start of consecutive runs.
     *
     * <p>The loop ends when its thread is interrupted (e.g. by a forced shutdown), or when a
     * {@link HoodieValidationException} escapes and {@code --ignore-failed} is not set.
     */
    public class AsyncDataTableValidateService extends HoodieAsyncService {
        private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

        @Override // org.apache.hudi.async.HoodieAsyncService
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        long startMs = System.currentTimeMillis();
                        HoodieDataTableValidator.this.doDataTableValidation();
                        // 1000L keeps the arithmetic in long so large intervals cannot overflow.
                        long toSleepMs = HoodieDataTableValidator.this.cfg.minValidateIntervalSeconds * 1000L
                            - (System.currentTimeMillis() - startMs);
                        if (toSleepMs > 0) {
                            HoodieDataTableValidator.LOG.info("Last validate ran less than min validate interval: " + HoodieDataTableValidator.this.cfg.minValidateIntervalSeconds + " s, sleep: " + toSleepMs + " ms.");
                            Thread.sleep(toSleepMs);
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition observes it and the service stops,
                        // instead of ignoring a shutdown request.
                        Thread.currentThread().interrupt();
                    } catch (HoodieValidationException e) {
                        HoodieDataTableValidator.LOG.error("Shutting down AsyncDataTableValidateService due to HoodieValidationException", e);
                        if (!HoodieDataTableValidator.this.cfg.ignoreFailed) {
                            throw e;
                        }
                    }
                }
                HoodieDataTableValidator.LOG.info("AsyncDataTableValidateService interrupted, stopping.");
                return true;
            }, executor), executor);
        }
    }

    /**
     * Command-line configuration of the data table validator (parsed by JCommander).
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
        public String basePath = null;

        @Parameter(names = {"--continuous"}, description = "Running DataTableValidator in continuous. Can use --min-validate-interval-seconds to control validation frequency", required = false)
        public boolean continuous = false;

        @Parameter(names = {"--min-validate-interval-seconds"}, description = "the min validate interval of each validate when set --continuous, default is 10 minutes.")
        public Integer minValidateIntervalSeconds = 600;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for validation", required = false)
        public int parallelism = 200;

        @Parameter(names = {"--ignore-failed", "-ig"}, description = "Ignore data table validate failure and continue.", required = false)
        public boolean ignoreFailed = false;

        @Parameter(names = {"--assume-date-partitioning"}, description = "Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path.This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually", required = false)
        public Boolean assumeDatePartitioning = false;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
        public String sparkMemory = "1g";

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Override
        public String toString() {
            return "DataTableValidatorConfig {\n   --base-path " + basePath
                + ", \n   --continuous " + continuous
                + ", \n   --ignore-failed " + ignoreFailed
                + ", \n   --min-validate-interval-seconds " + minValidateIntervalSeconds
                + ", \n   --parallelism " + parallelism
                + ", \n   --spark-master " + sparkMaster
                + ", \n   --spark-memory " + sparkMemory
                + ", \n   --assume-date-partitioning " + assumeDatePartitioning
                + ", \n   --props " + propsFilePath
                + ", \n   --hoodie-conf " + configs
                + "\n}";
        }

        /**
         * Compares every field, including {@code help}, so that the result is consistent with
         * {@link #hashCode()}. Null-safe for all reference fields.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Config other = (Config) obj;
            return Objects.equals(basePath, other.basePath)
                && continuous == other.continuous
                && Objects.equals(minValidateIntervalSeconds, other.minValidateIntervalSeconds)
                && parallelism == other.parallelism
                && ignoreFailed == other.ignoreFailed
                && Objects.equals(sparkMaster, other.sparkMaster)
                && Objects.equals(sparkMemory, other.sparkMemory)
                && Objects.equals(assumeDatePartitioning, other.assumeDatePartitioning)
                && Objects.equals(propsFilePath, other.propsFilePath)
                && Objects.equals(configs, other.configs)
                && Objects.equals(help, other.help);
        }

        @Override
        public int hashCode() {
            return Objects.hash(basePath, continuous, minValidateIntervalSeconds, parallelism, ignoreFailed,
                sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
        }
    }

    /**
     * Creates a validator around an existing meta client only; no Spark context or config is set.
     *
     * @param hoodieTableMetaClient meta client of the table to validate
     */
    public HoodieDataTableValidator(HoodieTableMetaClient hoodieTableMetaClient) {
        this.metaClient = hoodieTableMetaClient;
        // Keep run()'s shutdown logic null-safe for instances built through this constructor.
        this.asyncDataTableValidateService = Option.empty();
    }

    /**
     * Creates a validator from command-line configuration.
     *
     * @param javaSparkContext Spark context used to build the engine context and Hadoop configuration
     * @param config           parsed command-line configuration
     */
    public HoodieDataTableValidator(JavaSparkContext javaSparkContext, Config config) {
        this.jsc = javaSparkContext;
        this.cfg = config;
        this.props = config.propsFilePath == null ? UtilHelpers.buildProperties(config.configs) : readConfigFromFileSystem(javaSparkContext, config);
        this.metaClient = HoodieTableMetaClient.builder().setConf(javaSparkContext.hadoopConfiguration()).setBasePath(config.basePath).setLoadActiveTimelineOnLoad(true).build();
        this.asyncDataTableValidateService = config.continuous ? Option.of(new AsyncDataTableValidateService()) : Option.empty();
    }

    private TypedProperties readConfigFromFileSystem(JavaSparkContext javaSparkContext, Config config) {
        return UtilHelpers.readConfig(javaSparkContext.hadoopConfiguration(), new Path(config.propsFilePath), config.configs).getProps(true);
    }

    /**
     * CLI entry point. Prints usage and exits with status 1 for {@code --help} or no arguments.
     * Validation failures are logged; the Spark context is always stopped.
     */
    public static void main(String[] strArr) {
        Config config = new Config();
        JCommander jCommander = new JCommander(config, null, strArr);
        if (config.help || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Data-Table-Validator", config.sparkMaster);
        sparkConf.set("spark.executor.memory", config.sparkMemory);
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        try {
            HoodieDataTableValidator validator = new HoodieDataTableValidator(javaSparkContext, config);
            try {
                validator.run();
            } catch (Throwable throwable) {
                LOG.error("Fail to do hoodie Data table validation for " + config, throwable);
            }
        } finally {
            javaSparkContext.stop();
        }
    }

    /**
     * Runs validation once, or continuously when {@code --continuous} is set, then force-shuts down
     * the async service if one was created.
     *
     * @throws HoodieException wrapping any exception raised while validating
     */
    public void run() {
        try {
            LOG.info(cfg.toString());
            if (cfg.continuous) {
                LOG.info(" ****** do hoodie data table validation in CONTINUOUS mode ******");
                doHoodieDataTableValidationContinuous();
            } else {
                LOG.info(" ****** do hoodie data table validation once ******");
                doHoodieDataTableValidationOnce();
            }
        } catch (Exception e) {
            throw new HoodieException("Unable to do hoodie data table validation in " + cfg.basePath, e);
        } finally {
            if (asyncDataTableValidateService != null && asyncDataTableValidateService.isPresent()) {
                asyncDataTableValidateService.get().shutdown(true);
            }
        }
    }

    /** Runs a single validation; a failure is rethrown unless {@code --ignore-failed} is set. */
    private void doHoodieDataTableValidationOnce() {
        try {
            doDataTableValidation();
        } catch (HoodieValidationException e) {
            LOG.error("Data table validation failed due to HoodieValidationException", e);
            if (!cfg.ignoreFailed) {
                throw e;
            }
        }
    }

    /** Starts the async service (if configured) and blocks until it shuts down. */
    private void doHoodieDataTableValidationContinuous() {
        asyncDataTableValidateService.ifPresent(service -> {
            service.start(null);
            try {
                service.waitForShutdown();
            } catch (Exception e) {
                throw new HoodieException(e.getMessage(), e);
            }
        });
    }

    /**
     * Performs one validation pass over the data table.
     *
     * @throws HoodieValidationException when a check fails, or any step throws, and
     *                                   {@code --ignore-failed} is not set
     */
    public void doDataTableValidation() {
        boolean finalResult = true;
        metaClient.reloadActiveTimeline();
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        try {
            FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(
                engineContext, metaClient.getTableConfig(), engineContext.getHadoopConf(), cfg.basePath,
                cfg.assumeDatePartitioning.booleanValue());
            List<Path> allDataFilePaths =
                HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(fsBackedTableMetadata, cfg.basePath);

            Option<HoodieInstant> firstInstant = metaClient.getActiveTimeline().firstInstant();
            if (firstInstant.isPresent()) {
                // Check 1: files written before the earliest instant still on the active timeline.
                String earliestActiveInstant = firstInstant.get().getTimestamp();
                List<Path> danglingFilePaths = allDataFilePaths.stream()
                    .filter(path -> HoodieTimeline.compareTimestamps(
                        FSUtils.getCommitTime(path.getName()), HoodieTimeline.LESSER_THAN, earliestActiveInstant))
                    .collect(Collectors.toList());
                if (!danglingFilePaths.isEmpty()) {
                    LOG.error("Data table validation failed due to dangling files count " + danglingFilePaths.size() + ", found before active timeline");
                    danglingFilePaths.forEach(path -> LOG.error("Dangling file: " + path.toString()));
                    finalResult = false;
                    if (!cfg.ignoreFailed) {
                        throw new HoodieValidationException("Data table validation failed due to dangling files " + danglingFilePaths.size());
                    }
                }

                // Check 2: files tagged with a completed instant but not listed in that instant's metadata.
                Map<String, List<String>> instantToFilesMap =
                    RepairUtils.tagInstantsOfBaseAndLogFiles(metaClient.getBasePath(), allDataFilePaths);
                HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
                List<HoodieInstant> completedInstants = activeTimeline.filterCompletedInstants().getInstants();
                if (!completedInstants.isEmpty()) {
                    // Spark requires a positive partition count; --parallelism caps it.
                    int parallelism = Math.max(1, Math.min(cfg.parallelism, completedInstants.size()));
                    // The lambda runs on executors; it captures only the timeline and the map, not `this`.
                    List<String> extraFiles = engineContext.flatMap(completedInstants, instant -> {
                        List<String> filesOnStorage = instantToFilesMap.containsKey(instant.getTimestamp())
                            ? instantToFilesMap.get(instant.getTimestamp())
                            : Collections.emptyList();
                        if (filesOnStorage.isEmpty()) {
                            return Stream.<String>empty();
                        }
                        Set<String> unreferenced = new HashSet<>(filesOnStorage);
                        Option<Set<String>> filesOnTimeline =
                            RepairUtils.getBaseAndLogFilePathsFromTimeline(activeTimeline, instant);
                        if (filesOnTimeline.isPresent()) {
                            unreferenced.removeAll(filesOnTimeline.get());
                        }
                        return new ArrayList<>(unreferenced).stream();
                    }, parallelism);
                    if (!extraFiles.isEmpty()) {
                        LOG.error("Data table validation failed due to extra files found for completed commits " + extraFiles.size());
                        extraFiles.forEach(file -> LOG.error("Dangling file: " + file));
                        finalResult = false;
                        if (!cfg.ignoreFailed) {
                            throw new HoodieValidationException("Data table validation failed due to dangling files " + extraFiles.size());
                        }
                    }
                }
            }
        } catch (HoodieValidationException e) {
            // With --ignore-failed unset this is the failure raised above (already logged):
            // propagate it unchanged instead of logging and wrapping it a second time.
            if (!cfg.ignoreFailed) {
                throw e;
            }
            finalResult = false;
            LOG.error("Data table validation failed due to " + e.getMessage(), e);
        } catch (Exception e) {
            // An unexpected error means validation did not succeed, even when it is ignored.
            finalResult = false;
            LOG.error("Data table validation failed due to " + e.getMessage(), e);
            if (!cfg.ignoreFailed) {
                throw new HoodieValidationException("Data table validation failed due to " + e.getMessage(), e);
            }
        }
        if (finalResult) {
            LOG.info("Data table validation succeeded.");
        } else {
            LOG.warn("Data table validation failed.");
        }
    }
}
