package org.apache.hudi.integ.testsuite.streaming;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;
import org.apache.hudi.exception.HoodieException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.class */
/**
 * Command-line driver that starts a Spark structured streaming write through
 * {@link StructuredStreamingSinkTestWriter} and then waits on it before shutting Spark down.
 *
 * <p>Options are parsed with JCommander into {@link Config}; see the {@code @Parameter}
 * annotations there for the supported flags.
 */
public class StructuredStreamingSinkUtil implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(StructuredStreamingSinkUtil.class);
    private transient JavaSparkContext jsc;
    private SparkSession sparkSession;
    private Config cfg;

    /** Command-line options for {@link StructuredStreamingSinkUtil}, populated by JCommander. */
    public static class Config implements Serializable {

        @Parameter(names = {"--source-path", "-sp"}, description = "Source path to consume data from", required = true)
        public String sourcePath = null;

        @Parameter(names = {"--target-path", "-tp"}, description = "Target path of the table of interest.", required = true)
        public String targetPath = null;

        @Parameter(names = {"--table-type", "-ty"}, description = "Type of the target table, e.g. COPY_ON_WRITE.", required = true)
        public String tableType = "COPY_ON_WRITE";

        @Parameter(names = {"--checkpoint-path", "-cp"}, description = "Checkpoint path of the table of interest", required = true)
        public String checkpointPath = null;

        @Parameter(names = {"--partition-field", "-pp"}, description = "Partitioning field", required = true)
        public String partitionField = null;

        @Parameter(names = {"--record-key-field", "-rk"}, description = "record key field", required = true)
        public String recordKeyField = null;

        @Parameter(names = {"--pre-combine-field", "-pc"}, description = "Precombine field", required = true)
        public String preCombineField = null;

        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName = null;

        @Parameter(names = {"--disable-metadata", "-dmdt"}, description = "Disable metadata while querying", required = false)
        public Boolean disableMetadata = false;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
        public String sparkMemory = "1g";

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        /**
         * Renders every option value so that log lines mentioning the config are readable
         * instead of showing only the default object identity string.
         */
        @Override
        public String toString() {
            return "Config{"
                + "sourcePath='" + sourcePath + '\''
                + ", targetPath='" + targetPath + '\''
                + ", tableType='" + tableType + '\''
                + ", checkpointPath='" + checkpointPath + '\''
                + ", partitionField='" + partitionField + '\''
                + ", recordKeyField='" + recordKeyField + '\''
                + ", preCombineField='" + preCombineField + '\''
                + ", tableName='" + tableName + '\''
                + ", disableMetadata=" + disableMetadata
                + ", sparkMaster='" + sparkMaster + '\''
                + ", sparkMemory='" + sparkMemory + '\''
                + ", help=" + help
                + '}';
        }
    }

    /**
     * Creates the utility and obtains (or creates) a {@link SparkSession} from the context's configuration.
     *
     * @param javaSparkContext Spark context whose configuration seeds the session
     * @param config parsed command-line options
     */
    public StructuredStreamingSinkUtil(JavaSparkContext javaSparkContext, Config config) {
        this.jsc = javaSparkContext;
        this.sparkSession = SparkSession.builder().config(javaSparkContext.getConf()).getOrCreate();
        this.cfg = config;
    }

    /**
     * Entry point: parses options, builds the Spark context, runs the test and always stops the context.
     *
     * <p>A failure inside {@link #run()} is logged and not rethrown.
     *
     * @param strArr raw command-line arguments
     */
    public static void main(String[] strArr) {
        Config config = new Config();
        JCommander jCommander = new JCommander(config);
        // Check for "no arguments" before parsing: parsing validates required options and would
        // throw before the usage text could ever be printed.
        if (strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        jCommander.parse(strArr);
        if (config.help) {
            jCommander.usage();
            System.exit(1);
        }
        SparkConf sparkConf = buildSparkConf("Spark-structured-streaming-test", config.sparkMaster);
        sparkConf.set("spark.executor.memory", config.sparkMemory);
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        try {
            new StructuredStreamingSinkUtil(javaSparkContext, config).run();
        } catch (Throwable th) {
            LOG.error("Failed to execute Spark structured streaming sink test for {}", config, th);
        } finally {
            javaSparkContext.stop();
        }
    }

    /**
     * Triggers the streaming write and then blocks in
     * {@link StructuredStreamingSinkTestWriter#waitUntilCondition}.
     *
     * @throws HoodieException wrapping any {@link Exception} raised while triggering or waiting
     */
    public void run() {
        try {
            LOG.info(this.cfg.toString());
            StructuredStreamingSinkTestWriter.triggerStreaming(this.sparkSession, this.cfg.tableType, this.cfg.sourcePath, this.cfg.targetPath, this.cfg.checkpointPath, this.cfg.tableName, this.cfg.partitionField, this.cfg.recordKeyField, this.cfg.preCombineField);
            // NOTE(review): arguments are presumably (max wait ms, poll interval ms) — confirm
            // against StructuredStreamingSinkTestWriter.
            StructuredStreamingSinkTestWriter.waitUntilCondition(600000L, 30000L);
        } catch (Exception e) {
            throw new HoodieException("Unable to test spark structured writes to hudi " + this.cfg.targetPath, e);
        } finally {
            // Logged on both the success and the failure path.
            LOG.warn("Completing Spark Structured Streaming test");
        }
    }

    /**
     * Builds a {@link SparkConf} with this tool's defaults and no additional overrides.
     *
     * @param str application name
     * @param str2 master URL used when {@code spark.master} is not already configured; may be null
     * @return the populated configuration
     */
    public static SparkConf buildSparkConf(String str, String str2) {
        return buildSparkConf(str, str2, new HashMap<>());
    }

    /**
     * Builds a {@link SparkConf} with this tool's defaults, then applies the given overrides last.
     *
     * @param appName application name
     * @param defaultMaster master URL used when {@code spark.master} is not already configured; may be null
     * @param additionalConfigs extra key/value settings applied after the defaults
     * @return the populated configuration
     */
    private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        String master = sparkConf.get("spark.master", defaultMaster);
        // --spark-master is optional: when no master is known from either source, leave it unset
        // rather than failing here with a NullPointerException.
        if (master != null) {
            sparkConf.setMaster(master);
            if (master.startsWith("yarn")) {
                sparkConf.set("spark.eventLog.overwrite", "true");
                sparkConf.set("spark.eventLog.enabled", "true");
            }
        }
        sparkConf.set("spark.ui.port", "8090");
        sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
        sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        sparkConf.set("spark.hadoop.mapred.output.compress", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
        additionalConfigs.forEach(sparkConf::set);
        return sparkConf;
    }
}
