package defpackage;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.ResourceBundle;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;

/* loaded from: input_file:HoodieJavaStreamingApp.class */
/**
 * Sample application that drives Hudi's Spark Structured Streaming sink end to end.
 *
 * <p>{@link #run()} wipes the configured source, checkpoint and table paths, then runs two phases
 * on a {@code local[1]} Spark session:
 * <ol>
 *   <li>an upsert stream fed with 100 generated inserts followed by updates of all of them, and</li>
 *   <li>a delete stream, in a fresh session, fed with 20 generated records, after which 80 records
 *       are expected to remain in the table.</li>
 * </ol>
 * In each phase one thread runs the streaming query while a second thread writes JSON input into
 * the streaming source folder and validates the resulting table.
 */
public class HoodieJavaStreamingApp {

    @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table")
    private String tablePath = "/tmp/hoodie/streaming/sample-table";

    @Parameter(names = {"--streaming-source-path", "-ssp"}, description = "path for streaming source file folder")
    private String streamingSourcePath = "/tmp/hoodie/streaming/source";

    @Parameter(names = {"--streaming-checkpointing-path", "-scp"}, description = "path for streaming checking pointing folder")
    private String streamingCheckpointingPath = "/tmp/hoodie/streaming/checkpoint";

    @Parameter(names = {"--streaming-duration-in-ms", "-sdm"}, description = "time in millisecond for the streaming duration")
    private Long streamingDurationInMs = 15000L;

    @Parameter(names = {"--table-name", "-n"}, description = "table name for Hoodie sample table")
    private String tableName = "hoodie_test";

    @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
    private String tableType = HoodieTableType.MERGE_ON_READ.name();

    @Parameter(names = {"--hive-sync", "-hv"}, description = "Enable syncing to hive")
    private Boolean enableHiveSync = false;

    @Parameter(names = {"--hive-db", "-hd"}, description = "hive database")
    private String hiveDB = "default";

    @Parameter(names = {"--hive-table", "-ht"}, description = "hive table")
    private String hiveTable = "hoodie_sample_test";

    @Parameter(names = {"--hive-user", "-hu"}, description = "hive username")
    private String hiveUser = "hive";

    @Parameter(names = {"--hive-password", "-hp"}, description = "hive password")
    private String hivePass = "hive";

    @Parameter(names = {"--hive-url", "-hl"}, description = "hive JDBC URL")
    private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";

    @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys")
    private Boolean useMultiPartitionKeys = false;

    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;

    private static final Logger LOG = LogManager.getLogger(HoodieJavaStreamingApp.class);

    /** Spark application name shared by every session this app creates. */
    private static final String APP_NAME = "Hoodie Spark Streaming APP";

    /** Seconds to wait for an expected number of commits before giving up. */
    private static final int COMMIT_WAIT_TIMEOUT_SECS = 180;

    /** Seconds to sleep between two polls of the table timeline. */
    private static final int COMMIT_POLL_INTERVAL_SECS = 3;

    /**
     * Command-line entry point. Parses the options, runs the app and always terminates the JVM.
     * The exit status is 1 after printing usage for {@code --help}, 0 on success, and -1 if
     * {@link #run()} throws an {@link Exception}.
     */
    public static void main(String[] args) throws Exception {
        HoodieJavaStreamingApp cli = new HoodieJavaStreamingApp();
        JCommander cmd = new JCommander(cli, (ResourceBundle) null, args);
        if (cli.help) {
            cmd.usage();
            System.exit(1);
        }
        int errStatus = 0;
        try {
            cli.run();
        } catch (Exception e) {
            LOG.error("Got error running app ", e);
            errStatus = -1;
        } finally {
            // Exit explicitly so that any still-running Spark/executor threads cannot keep the JVM alive.
            System.exit(errStatus);
        }
    }

    /**
     * Runs the upsert phase followed by the delete phase, validating the table after each.
     *
     * @throws Exception if a Spark, Hudi or file-system call fails, if a streaming or validation task
     *     fails (wrapped in {@link java.util.concurrent.ExecutionException}), or if a validation
     *     check does not hold
     */
    public void run() throws Exception {
        SparkSession spark = newSparkSession();
        JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());

        // Start from a clean slate so leftovers of a previous run cannot influence validation.
        FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
        fs.delete(new Path(streamingSourcePath), true);
        fs.delete(new Path(streamingCheckpointingPath), true);
        fs.delete(new Path(tablePath), true);
        fs.mkdirs(new Path(streamingSourcePath));

        // Phase 1 input: 100 inserts at instant "001", then updates of every record at "002".
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(
                RawTripTestPayload.recordsToStrings(dataGen.generateInserts("001", 100)), 2));
        Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(
                RawTripTestPayload.recordsToStrings(dataGen.generateUpdatesForAllRecords("002")), 2));

        String ckptPath1 = streamingCheckpointingPath + "/stream1";
        String srcPath1 = streamingSourcePath + "/stream1";
        fs.mkdirs(new Path(ckptPath1));
        fs.mkdirs(new Path(srcPath1));
        Dataset<Row> upsertStream = spark.readStream().schema(inputDF1.schema()).json(srcPath1 + "/*");

        int numInitialCommits = runWhileStreaming(upsertStream, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL(),
                ckptPath1, () -> addInputAndValidateIngestion(spark, fs, srcPath1, 0, 100, inputDF1, inputDF2, true));

        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath);
        long numCommitInstants = metaClient.getActiveTimeline().getCommitTimeline().getInstants().count();
        if (isMergeOnRead()) {
            ValidationUtils.checkArgument(numCommitInstants == 1,
                    "Expecting exactly 1 commit instant on a MERGE_ON_READ table, Got " + numCommitInstants);
        } else {
            ValidationUtils.checkArgument(numCommitInstants >= 1,
                    "Expecting at least 1 commit instant, Got " + numCommitInstants);
        }

        // Phase 2 runs in a fresh Spark session. Closing the first session stops its SparkContext and,
        // presumably, the upsert query that stream() leaves running after its await timeout.
        spark.close();
        SparkSession deleteSpark = newSparkSession();
        JavaSparkContext deleteJssc = new JavaSparkContext(deleteSpark.sparkContext());
        String ckptPath2 = streamingCheckpointingPath + "/stream2";
        // NOTE(review): this folder is nested under the phase-1 source folder; kept to preserve the on-disk layout.
        String srcPath2 = srcPath1 + "/stream2";
        fs.mkdirs(new Path(ckptPath2));
        fs.mkdirs(new Path(srcPath2));
        Dataset<Row> deleteStream = deleteSpark.readStream().schema(inputDF1.schema()).json(srcPath2 + "/*");
        // 20 generated records are fed to the delete stream; 80 of the original 100 are expected to remain.
        Dataset<Row> inputDF3 = deleteSpark.read().json(deleteJssc.parallelize(
                RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("002", 20)), 2));

        runWhileStreaming(deleteStream, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), ckptPath2,
                () -> addInputAndValidateIngestion(deleteSpark, fs, srcPath2, numInitialCommits, 80, inputDF3, null, false));
        deleteSpark.close();
    }

    /** Builds, or reuses, a {@code local[1]} Spark session configured with the Kryo serializer. */
    private static SparkSession newSparkSession() {
        return SparkSession.builder()
                .appName(APP_NAME)
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .master("local[1]")
                .getOrCreate();
    }

    /** Returns true when the configured table type is MERGE_ON_READ (null-safe). */
    private boolean isMergeOnRead() {
        return HoodieTableType.MERGE_ON_READ.name().equals(tableType);
    }

    /**
     * Runs {@link #stream} on one thread and {@code validation} on another, waits for both to finish,
     * and returns the validation result. The executor is always shut down with
     * {@code shutdownNow()}, which interrupts the surviving task if either one fails.
     *
     * @param streamingInput streaming dataset to write into the Hudi table
     * @param operation Hudi write operation value (e.g. upsert or delete)
     * @param checkpointPath checkpoint location for the streaming query
     * @param validation task that feeds the stream's source folder and validates the table
     * @return the value produced by {@code validation}
     * @throws Exception if either task fails (as {@link java.util.concurrent.ExecutionException}) or the
     *     wait is interrupted
     */
    private <T> T runWhileStreaming(Dataset<Row> streamingInput, String operation, String checkpointPath,
            Callable<T> validation) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<Void> streamFuture = executor.submit(() -> {
                LOG.info("===== Streaming Starting =====");
                stream(streamingInput, operation, checkpointPath);
                LOG.info("===== Streaming Ends =====");
                return null;
            });
            Future<T> validationFuture = executor.submit(() -> {
                LOG.info("===== Showing Starting =====");
                T result = validation.call();
                LOG.info("===== Showing Ends =====");
                return result;
            });
            streamFuture.get();
            return validationFuture.get();
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Polls the table's completed commits/compactions until at least {@code numCommits} exist.
     *
     * <p>A missing table ({@link TableNotFoundException}) is treated as "not yet created" and retried.
     * After every poll, including the successful one, the method sleeps {@code sleepSecsAfterEachRun}
     * seconds.
     *
     * @param fs file system holding the table
     * @param numCommits minimum number of completed instants to wait for
     * @param timeoutSecs overall time budget, in seconds
     * @param sleepSecsAfterEachRun pause after each poll, in seconds
     * @throws InterruptedException if interrupted while sleeping
     * @throws IllegalStateException if the commits do not appear within {@code timeoutSecs}
     */
    private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun)
            throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        long currTime = beginTime;
        long timeoutMsecs = timeoutSecs * 1000L;
        long sleepMsecs = sleepSecsAfterEachRun * 1000L;

        while (currTime - beginTime < timeoutMsecs) {
            try {
                HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
                LOG.info("Timeline :" + timeline.getInstants().collect(Collectors.toList()));
                if (timeline.countInstants() >= numCommits) {
                    return;
                }
                HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath, true);
                System.out.println("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
            } catch (TableNotFoundException e) {
                LOG.info("Got table not found exception. Retrying");
            } finally {
                // Single sleep per iteration on every path (found, not found, still waiting).
                Thread.sleep(sleepMsecs);
                currTime = System.currentTimeMillis();
            }
        }
        throw new IllegalStateException("Timedout waiting for " + numCommits + " commits to appear in " + tablePath);
    }

    /**
     * Writes input into the streaming source folder, waits for the resulting commits and validates
     * the table contents.
     *
     * @param spark session used to query the table
     * @param fs file system holding the source folder and the table
     * @param srcPath streaming source folder to append JSON input to
     * @param initialCommits number of commits already present before this call
     * @param expRecords number of records the table is expected to contain afterwards
     * @param inputDF1 first batch of input; always written
     * @param inputDF2 optional second batch; skipped when {@code null}
     * @param instantTimeValidation when true, also checks that exactly {@code expRecords} rows carry the
     *     latest observed commit time
     * @return the number of commits expected on the timeline after this call
     * @throws Exception if waiting, querying or validation fails
     */
    public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath, int initialCommits,
            int expRecords, Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation)
            throws Exception {
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(srcPath);

        int numExpCommits = initialCommits + 1;
        waitTillNCommits(fs, numExpCommits, COMMIT_WAIT_TIMEOUT_SECS, COMMIT_POLL_INTERVAL_SECS);
        String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
        LOG.info("First commit at instant time :" + commitInstantTime1);

        String commitInstantTime2 = commitInstantTime1;
        if (null != inputDF2) {
            numExpCommits += 1;
            inputDF2.write().mode(SaveMode.Append).json(srcPath);
            // Short pause before polling for the second commit.
            Thread.sleep(3000L);
            waitTillNCommits(fs, numExpCommits, COMMIT_WAIT_TIMEOUT_SECS, COMMIT_POLL_INTERVAL_SECS);
            commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
            LOG.info("Second commit at instant time :" + commitInstantTime2);
        }

        if (isMergeOnRead()) {
            // One more completed instant is expected on MERGE_ON_READ (logged as the compaction commit;
            // stream() enables async compaction).
            numExpCommits += 1;
            waitTillNCommits(fs, numExpCommits, COMMIT_WAIT_TIMEOUT_SECS, COMMIT_POLL_INTERVAL_SECS);
            commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
            LOG.info("Compaction commit at instant time :" + commitInstantTime2);
        }

        Dataset<Row> hoodieROViewDF = spark.read().format("hudi").load(tablePath + "/*/*/*/*");
        hoodieROViewDF.createOrReplaceTempView("hoodie_ro");
        spark.sql("describe hoodie_ro").show();
        spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();

        if (instantTimeValidation) {
            System.out.println("Showing all records. Latest Instant Time =" + commitInstantTime2);
            spark.sql("select * from hoodie_ro").show(200, false);
            // Quote the instant time: _hoodie_commit_time is compared as a string, not via numeric coercion.
            long numRecordsAtInstant = spark.sql(
                    "select * from hoodie_ro where _hoodie_commit_time = '" + commitInstantTime2 + "'").count();
            ValidationUtils.checkArgument(numRecordsAtInstant == expRecords,
                    "Expecting " + expRecords + " records, Got " + numRecordsAtInstant);
        }

        long numRecords = spark.sql("select * from hoodie_ro").count();
        ValidationUtils.checkArgument(numRecords == expRecords,
                "Expecting " + expRecords + " records, Got " + numRecords);

        if (HoodieTableType.COPY_ON_WRITE.name().equals(tableType)) {
            // Incremental query starting from the first commit observed in this call.
            Dataset<Row> hoodieIncViewDF = spark.read().format("hudi")
                    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1)
                    .load(tablePath);
            LOG.info("You will only see records from : " + commitInstantTime2);
            hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
        }
        return numExpCommits;
    }

    /**
     * Starts a Hudi streaming write of {@code streamingInput} into {@code tablePath} and waits for it.
     *
     * <p>Returns once {@code awaitTermination(streamingDurationInMs)} returns. The query is not
     * explicitly stopped here.
     *
     * @param streamingInput streaming dataset to write
     * @param operationType Hudi write operation value (e.g. upsert or delete)
     * @param checkpointLocation checkpoint directory for the streaming query
     * @throws Exception if starting or awaiting the query fails
     */
    public void stream(Dataset<Row> streamingInput, String operationType, String checkpointLocation)
            throws Exception {
        DataStreamWriter<Row> writer = streamingInput.writeStream()
                .format("org.apache.hudi")
                .option("hoodie.insert.shuffle.parallelism", "2")
                .option("hoodie.upsert.shuffle.parallelism", "2")
                .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType)
                .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
                .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
                .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
                .option("hoodie.compact.inline.max.delta.commits", "1")
                .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), "true")
                .option("hoodie.table.name", tableName)
                .option("checkpointLocation", checkpointLocation)
                .outputMode(OutputMode.Append());
        writer = updateHiveSyncConfig(writer);
        writer.trigger(new ProcessingTime(500L))
                .start(tablePath)
                .awaitTermination(streamingDurationInMs);
    }

    /**
     * Adds Hive-sync options to {@code writer} when {@code --hive-sync} is enabled.
     *
     * @param writer streaming writer to configure
     * @return the configured writer, or {@code writer} unchanged when Hive sync is disabled
     */
    private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
        if (!enableHiveSync) {
            return writer;
        }
        LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
        DataStreamWriter<Row> configured = writer
                .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
                .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
                .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
                .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
                .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass)
                .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true");
        if (useMultiPartitionKeys) {
            return configured
                    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day")
                    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                            MultiPartKeysValueExtractor.class.getCanonicalName());
        }
        return configured.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr");
    }
}
