package org.apache.hudi.integ.testsuite;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.dag.scheduler.SaferSchemaDagScheduler;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.integ.testsuite.helpers.ZookeeperServiceProvider;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.class */
/**
 * Entry point of the Hudi integration test suite.
 *
 * <p>Given a {@link HoodieTestSuiteConfig} and a {@link JavaSparkContext}, this job builds a
 * {@link WorkflowDag} (from a YAML file or from a generator class), wires up a
 * {@link WriterContext} and hands the DAG to a scheduler for execution. The Spark session and
 * context are stopped once {@link #runTestSuite()} completes, whether it succeeds or fails.
 */
public class HoodieTestSuiteJob {
    private static final Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);

    /** Parsed command line configuration of this job. */
    private final HoodieTestSuiteConfig cfg;
    /** Properties read from {@code cfg.propsFilePath}, merged with {@code cfg.configs}. */
    TypedProperties props;
    /** File system resolved from {@code cfg.inputBasePath}. */
    private transient FileSystem fs;
    private transient JavaSparkContext jsc;
    private transient SparkSession sparkSession;
    private transient HiveConf hiveConf;
    private BuiltinKeyGenerator keyGenerator;
    private transient HoodieTableMetaClient metaClient;

    /**
     * Command line options of the test suite; extends the delta streamer options with the
     * workload generation specific ones.
     */
    public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {

        @Parameter(names = {"--input-base-path"}, description = "base path for input data(Will be created if did not exist first time around. If exists, more data will be added to that path)", required = true)
        public String inputBasePath;

        @Parameter(names = {"--workload-yaml-path"}, description = "Workflow Dag yaml path to generate the workload")
        public String workloadYamlPath;

        @Parameter(names = {"--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload")
        public String workloadDagGenerator = WorkflowDagGenerator.class.getName();

        @Parameter(names = {"--delta-output-type"}, description = "Subclass of org.apache.hudi.testsuite.workload.DeltaOutputMode to readAvro data.")
        public String outputTypeName = DeltaOutputMode.DFS.name();

        @Parameter(names = {"--delta-input-format"}, description = "Subclass of org.apache.hudi.testsuite.workload.DeltaOutputMode to read avro data.")
        public String inputFormatName = DeltaInputType.AVRO.name();

        @Parameter(names = {"--input-file-size"}, description = "The min/max size of the input files to generate", required = true)
        public Long limitFileSize = 125829120L;

        @Parameter(names = {"--input-parallelism"}, description = "Parallelism to use when generation input files", required = false)
        public Integer inputParallelism = 0;

        @Parameter(names = {"--delete-old-input"}, description = "Delete older input files once they have been ingested", required = false)
        public Boolean deleteOldInput = false;

        @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to perform ingestion. If set to false, HoodieWriteClient will be used")
        public Boolean useDeltaStreamer = false;

        @Parameter(names = {"--clean-input"}, description = "Clean the input folders and delete all files within it before starting the Job")
        public Boolean cleanInput = false;

        @Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it before starting the Job")
        public Boolean cleanOutput = false;

        @Parameter(names = {"--saferSchemaEvolution"}, description = "Optimize the DAG for safer schema evolution.(If not provided, assumed to be false.)", required = false)
        public Boolean saferSchemaEvolution = false;

        @Parameter(names = {"--start-zookeeper"}, description = "Start Zookeeper instance to use for optimistic lock ")
        public Boolean startZookeeper = false;

        @Parameter(names = {"--start-hive-metastore"}, description = "Start Hive Metastore to use for optimistic lock ")
        public Boolean startHiveMetastore = false;
    }

    /**
     * Prepares the job: reads the properties file, creates the key generator, initializes the
     * target table and, when requested, wipes the input and/or output base paths.
     *
     * @param hoodieTestSuiteConfig parsed job configuration; its {@code propsFilePath} is rewritten
     *                              in place by {@code FSUtils.addSchemeIfLocalPath}
     * @param javaSparkContext      Spark context used for the whole run
     * @throws IOException if reading the properties, initializing the table or cleaning a path fails
     */
    public HoodieTestSuiteJob(HoodieTestSuiteConfig hoodieTestSuiteConfig, JavaSparkContext javaSparkContext) throws IOException {
        log.warn("Running spark job w/ app id {}", javaSparkContext.sc().applicationId());
        this.cfg = hoodieTestSuiteConfig;
        this.jsc = javaSparkContext;
        Configuration hadoopConf = javaSparkContext.hadoopConfiguration();

        hoodieTestSuiteConfig.propsFilePath = FSUtils.addSchemeIfLocalPath(hoodieTestSuiteConfig.propsFilePath).toString();
        this.sparkSession = SparkSession.builder().config(javaSparkContext.getConf()).getOrCreate();
        this.fs = FSUtils.getFs(hoodieTestSuiteConfig.inputBasePath, hadoopConf);
        this.props = UtilHelpers.readConfig(this.fs, new Path(hoodieTestSuiteConfig.propsFilePath), hoodieTestSuiteConfig.configs).getConfig();
        log.info("Creating workload generator with configs : {}", this.props.toString());
        this.hiveConf = getDefaultHiveConf(hadoopConf);
        this.keyGenerator = DataSourceUtils.createKeyGenerator(this.props);
        this.metaClient = HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(hoodieTestSuiteConfig.tableType)
            .setTableName(hoodieTestSuiteConfig.targetTableName)
            .setArchiveLogFolder("archived")
            .initTable(hadoopConf, hoodieTestSuiteConfig.targetBasePath);

        if (hoodieTestSuiteConfig.cleanInput) {
            deleteIfExists(this.fs, new Path(hoodieTestSuiteConfig.inputBasePath));
        }
        if (hoodieTestSuiteConfig.cleanOutput) {
            // The target table may live on a different file system than the input data, so the
            // file system is resolved from the target path instead of reusing the input one.
            // NOTE(review): the table is initialized above, before this delete removes the target
            // path; confirm the downstream writers re-create the table when it is missing.
            FileSystem targetFs = FSUtils.getFs(hoodieTestSuiteConfig.targetBasePath, hadoopConf);
            deleteIfExists(targetFs, new Path(hoodieTestSuiteConfig.targetBasePath));
        }
    }

    /** Recursively deletes {@code path} on {@code fileSystem} when it exists. */
    private static void deleteIfExists(FileSystem fileSystem, Path path) throws IOException {
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
    }

    /**
     * Reads the {@code schemaVersion} property of the Avro schema stored under the
     * {@code "schema"} key in the extra metadata of a commit.
     *
     * @param i position of the commit, passed to {@code nthFromLastInstant} of the commits timeline
     * @return the parsed schema version, or {@code 0} when it cannot be determined for any reason
     * @throws Exception declared for callers; failures while reading the commit are logged and
     *                   result in {@code 0} rather than being propagated
     */
    int getSchemaVersionFromCommit(int i) throws Exception {
        int schemaVersion = 0;
        try {
            HoodieTimeline commitsTimeline = new HoodieActiveTimeline(this.metaClient).getCommitsTimeline();
            HoodieInstant instant = (HoodieInstant) commitsTimeline.nthFromLastInstant(i).get();
            byte[] commitBytes = (byte[]) commitsTimeline.getInstantDetails(instant).get();
            HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(commitBytes, HoodieCommitMetadata.class);
            String schemaJson = (String) commitMetadata.getExtraMetadata().get("schema");
            Schema schema = new Schema.Parser().parse(schemaJson);
            schemaVersion = Integer.parseInt(schema.getObjectProp("schemaVersion").toString());
            log.info("Last used schemaVersion from latest commit file was {}. Optimizing the DAG.", schemaVersion);
        } catch (Exception e) {
            // Best effort: a missing commit, schema or property simply disables the optimization.
            // The cause is attached so that unexpected failures remain diagnosable.
            log.info("Last used schemaVersion could not be validated from commit file.  Skipping SaferSchema Optimization.", e);
        }
        return schemaVersion;
    }

    /** Creates a {@link HiveConf} with the given Hadoop configuration added as a resource. */
    private static HiveConf getDefaultHiveConf(Configuration configuration) {
        HiveConf conf = new HiveConf();
        conf.addResource(configuration);
        return conf;
    }

    /**
     * Command line entry point: parses the arguments, builds a Spark context and runs the suite.
     * Prints the usage and exits with status 1 when no argument or the help flag is given.
     *
     * @param strArr command line arguments
     * @throws Exception if the job cannot be constructed or the run fails
     */
    public static void main(String[] strArr) throws Exception {
        HoodieTestSuiteConfig hoodieTestSuiteConfig = new HoodieTestSuiteConfig();
        JCommander jCommander = new JCommander(hoodieTestSuiteConfig);
        // Handle the no-argument case before parsing: parsing an empty argument list fails on the
        // required options, which previously made the usage output for this case unreachable.
        if (strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        jCommander.parse(strArr);
        if (hoodieTestSuiteConfig.help) {
            jCommander.usage();
            System.exit(1);
        }
        String appName = "workload-generator-" + hoodieTestSuiteConfig.outputTypeName + "-" + hoodieTestSuiteConfig.inputFormatName;
        JavaSparkContext sparkContext = UtilHelpers.buildSparkContext(appName, hoodieTestSuiteConfig.sparkMaster);
        new HoodieTestSuiteJob(hoodieTestSuiteConfig, sparkContext).runTestSuite();
    }

    /**
     * Builds the workflow DAG to execute.
     *
     * @return the DAG read from {@code cfg.workloadYamlPath} when that path is set, otherwise the
     *         DAG built by an instance of the class named by {@code cfg.workloadDagGenerator}
     * @throws IOException if the DAG cannot be produced
     */
    public WorkflowDag createWorkflowDag() throws IOException {
        if (this.cfg.workloadYamlPath == null) {
            WorkflowDagGenerator generator = (WorkflowDagGenerator) ReflectionUtils.loadClass(this.cfg.workloadDagGenerator);
            return generator.build();
        }
        FileSystem yamlFs = FSUtils.getFs(this.cfg.workloadYamlPath, this.jsc.hadoopConfiguration(), true);
        return DagUtils.convertYamlPathToDag(yamlFs, this.cfg.workloadYamlPath);
    }

    /**
     * Creates the workflow DAG, starts the optional local services and schedules the DAG.
     * The Spark session and context are always stopped before this method returns or throws.
     *
     * @throws HoodieException wrapping any exception raised while preparing or running the DAG
     */
    public void runTestSuite() {
        try {
            WorkflowDag workflowDag = createWorkflowDag();
            log.info("Workflow Dag => {}", DagUtils.convertDagToYaml(workflowDag));
            long startTimeMs = System.currentTimeMillis();
            WriterContext writerContext = new WriterContext(this.jsc, this.props, this.cfg, this.keyGenerator, this.sparkSession);
            writerContext.initContext(this.jsc);
            startOtherServicesIfNeeded(writerContext);
            if (this.cfg.saferSchemaEvolution) {
                // Defaults to 2 unless the first DAG node is a RollbackNode, in which case the
                // rollback count configured on that node is used instead.
                int numRollbacks = 2;
                List<?> nodes = workflowDag.getNodeList();
                if (!nodes.isEmpty() && nodes.get(0) instanceof RollbackNode) {
                    numRollbacks = ((DagNode) nodes.get(0)).getConfig().getNumRollbacks();
                }
                int schemaVersion = getSchemaVersionFromCommit(numRollbacks - 1);
                new SaferSchemaDagScheduler(workflowDag, writerContext, this.jsc, schemaVersion).schedule();
            } else {
                new DagScheduler(workflowDag, writerContext, this.jsc).schedule();
            }
            log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTimeMs);
        } catch (Exception e) {
            log.error("Failed to run Test Suite ", e);
            throw new HoodieException("Failed to run Test Suite ", e);
        } finally {
            stopQuietly();
        }
    }

    /**
     * Stops the Spark session and the Spark context, logging instead of propagating failures.
     * Each stop is attempted independently so a failure of the first does not skip the second.
     */
    private void stopQuietly() {
        try {
            this.sparkSession.stop();
        } catch (Exception e) {
            log.error("Unable to stop spark session", e);
        }
        try {
            this.jsc.stop();
        } catch (Exception e) {
            log.error("Unable to stop spark context", e);
        }
    }

    /**
     * Starts the local Hive service and/or local Zookeeper when the corresponding flags
     * ({@code cfg.startHiveMetastore}, {@code cfg.startZookeeper}) are set.
     *
     * @param writerContext context providing the test suite writer and its configuration
     * @throws Exception if one of the services fails to start
     */
    private void startOtherServicesIfNeeded(WriterContext writerContext) throws Exception {
        if (this.cfg.startHiveMetastore) {
            DeltaConfig.Config hiveLocalConfig = DeltaConfig.Config.newBuilder().withHiveLocal(true).build();
            HiveServiceProvider hiveServiceProvider = new HiveServiceProvider(hiveLocalConfig);
            hiveServiceProvider.startLocalHiveServiceIfNeeded(writerContext.getHoodieTestSuiteWriter().getConfiguration());
            hiveServiceProvider.syncToLocalHiveIfNeeded(writerContext.getHoodieTestSuiteWriter());
        }
        if (this.cfg.startZookeeper) {
            DeltaConfig.Config zookeeperConfig = DeltaConfig.Config.newBuilder().withHiveLocal(true).build();
            new ZookeeperServiceProvider(zookeeperConfig, writerContext.getHoodieTestSuiteWriter().getConfiguration()).startLocalZookeeperIfNeeded();
        }
    }
}
