package org.apache.hudi.functional;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: TestMORDataSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ub\u0001B\u0001\u0003\u0001-\u0011\u0011\u0003V3ti6{%\u000bR1uCN{WO]2f\u0015\t\u0019A!\u0001\u0006gk:\u001cG/[8oC2T!!\u0002\u0004\u0002\t!,H-\u001b\u0006\u0003\u000f!\ta!\u00199bG\",'\"A\u0005\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001a\u0001CA\u0007\u0011\u001b\u0005q!BA\b\u0005\u0003%!Xm\u001d;vi&d7/\u0003\u0002\u0012\u001d\t!\u0002j\\8eS\u0016\u001cE.[3oiR+7\u000f\u001e\"bg\u0016DQa\u0005\u0001\u0005\u0002Q\ta\u0001P5oSRtD#A\u000b\u0011\u0005Y\u0001Q\"\u0001\u0002\t\u000fa\u0001\u0001\u0019!C\u00013\u0005)1\u000f]1sWV\t!\u0004\u0005\u0002\u001c?5\tAD\u0003\u0002\u001e=\u0005\u00191/\u001d7\u000b\u0005a1\u0011B\u0001\u0011\u001d\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0011\u001d\u0011\u0003\u00011A\u0005\u0002\r\n\u0011b\u001d9be.|F%Z9\u0015\u0005\u0011R\u0003CA\u0013)\u001b\u00051#\"A\u0014\u0002\u000bM\u001c\u0017\r\\1\n\u0005%2#\u0001B+oSRDqaK\u0011\u0002\u0002\u0003\u0007!$A\u0002yIEBa!\f\u0001!B\u0013Q\u0012AB:qCJ\\\u0007\u0005C\u00040\u0001\t\u0007I\u0011\u0002\u0019\u0002\u00071|w-F\u00012!\t\u0011T'D\u00014\u0015\t!d!A\u0003m_\u001e$$.\u0003\u00027g\t1Aj\\4hKJDa\u0001\u000f\u0001!\u0002\u0013\t\u0014\u0001\u00027pO\u0002BqA\u000f\u0001C\u0002\u0013\u00051(\u0001\u0006d_6lwN\\(qiN,\u0012\u0001\u0010\t\u0005{\t#E)D\u0001?\u0015\ty\u0004)A\u0005j[6,H/\u00192mK*\u0011\u0011IJ\u0001\u000bG>dG.Z2uS>t\u0017BA\"?\u0005\ri\u0015\r\u001d\t\u0003\u000b*k\u0011A\u0012\u0006\u0003\u000f\"\u000bA\u0001\\1oO*\t\u0011*\u0001\u0003kCZ\f\u0017BA&G\u0005\u0019\u0019FO]5oO\"1Q\n\u0001Q\u0001\nq\n1bY8n[>tw\n\u001d;tA!9q\n\u0001b\u0001\n\u0003\u0001\u0016a\u0004<fe&4\u0017nY1uS>t7i\u001c7\u0016\u0003E\u0003\"AU+\u000f\u0005\u0015\u001a\u0016B\u0001+'\u0003\u0019\u0001&/\u001a3fM&\u00111J\u0016\u0006\u0003)\u001aBa\u0001\u0017\u0001!\u0002\u0013\t\u0016\u0001\u0005<fe&4\u0017nY1uS>t7i\u001c7!\u0011\u001dQ\u0006A1A\u0005\u0002A\u000ba#\u001e9eCR,GMV3sS\u001aL7-\u0019;j_:4\u0016\r\u001c\u0005\u00079\u0002\u0001\u000b\u0011B
)\u0002/U\u0004H-\u0019;fIZ+'/\u001b4jG\u0006$\u0018n\u001c8WC2\u0004\u0003\"\u00020\u0001\t\u0003z\u0016!B:fiV\u0003H#\u0001\u0013)\u0005u\u000b\u0007C\u00012j\u001b\u0005\u0019'B\u00013f\u0003\r\t\u0007/\u001b\u0006\u0003M\u001e\fqA[;qSR,'O\u0003\u0002i\u0011\u0005)!.\u001e8ji&\u0011!n\u0019\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0007\"\u00027\u0001\t\u0003z\u0016\u0001\u0003;fCJ$un\u001e8)\u0005-t\u0007C\u00012p\u0013\t\u00018MA\u0005BMR,'/R1dQ\")!\u000f\u0001C\u0001?\u00061B/Z:u\u001b\u0016\u0014x-Z(o%\u0016\fGm\u0015;pe\u0006<W\r\u000b\u0002riB\u0011!-^\u0005\u0003m\u000e\u0014A\u0001V3ti\")\u0001\u0010\u0001C\u0001?\u0006IA/Z:u\u0007>,h\u000e\u001e\u0015\u0003oRDQa\u001f\u0001\u0005\u0002}\u000b\u0011\u0003^3tiB\u000b\u0017\u0010\\8bI\u0012+G.\u001a;fQ\tQH\u000fC\u0003\u007f\u0001\u0011\u0005q,\u0001\nuKN$\bK];oK\u00124\u0015\u000e\u001c;fe\u0016$\u0007FA?u\u0011\u0019\t\u0019\u0001\u0001C\u0001?\u0006!B/Z:u-\u0016\u001cGo\u001c:ju\u0016$'+Z1eKJD3!!\u0001u\u0011\u001d\tI\u0001\u0001C\u0001\u0003\u0017\tAC^3sS\u001aL8k\u00195f[\u0006\fe\u000e\u001a+za\u0016\u001cHc\u0001\u0013\u0002\u000e!A\u0011qBA\u0004\u0001\u0004\t\t\"\u0001\u0002eMB!\u00111CA\u0018\u001d\u0011\t)\"a\u000b\u000f\t\u0005]\u0011\u0011\u0006\b\u0005\u00033\t9C\u0004\u0003\u0002\u001c\u0005\u0015b\u0002BA\u000f\u0003Gi!!a\b\u000b\u0007\u0005\u0005\"\"\u0001\u0004=e>|GOP\u0005\u0002\u0013%\u0011q\u0001C\u0005\u00031\u0019I!!\b\u0010\n\u0007\u00055B$A\u0004qC\u000e\\\u0017mZ3\n\t\u0005E\u00121\u0007\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T1!!\f\u001d\u0011\u001d\t9\u0004\u0001C\u0001\u0003s\t!B^3sS\u001aL8\u000b[8x)\r!\u00131\b\u0005\t\u0003\u001f\t)\u00041\u0001\u0002\u0012\u0001")
/* loaded from: input_file:org/apache/hudi/functional/TestMORDataSource.class */
public class TestMORDataSource extends HoodieClientTestBase {
    /** Spark session used by the tests; starts out unset and is assigned through {@link #spark_$eq(SparkSession)}. */
    private SparkSession spark;

    /** Logger bound to this test class. */
    private final Logger log = LogManager.getLogger(TestMORDataSource.class);

    /**
     * Write options passed to every Hudi write in this class: shuffle parallelism, record key field,
     * partition path field, precombine field and table name.
     */
    private final Map<String, String> commonOpts = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.insert.shuffle.parallelism"), "4"),
        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.upsert.shuffle.parallelism"), "4"),
        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD_OPT_KEY()), "_row_key"),
        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD_OPT_KEY()), "partition"),
        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD_OPT_KEY()), "timestamp"),
        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.name"), "hoodie_test")
    }));

    /** Name of the column that {@code testMergeOnReadStorage} overwrites to verify an update. */
    private final String verificationCol = "driver";

    /** Value written into {@link #verificationCol} by the single-row update. */
    private final String updatedVerificationVal = "driver_update";

    /**
     * Scala-style accessor for the Spark session.
     *
     * @return the session currently stored in this test instance, or {@code null} if none was assigned yet
     */
    public SparkSession spark() {
        return spark;
    }

    /**
     * Scala-style setter for the Spark session.
     *
     * @param sparkSession the session subsequent calls to {@link #spark()} will return
     */
    public void spark_$eq(SparkSession sparkSession) {
        spark = sparkSession;
    }

    /**
     * Scala-style accessor for the class logger.
     *
     * @return the logger created for this test class
     */
    private Logger log() {
        return log;
    }

    /**
     * Scala-style accessor for the shared write options.
     *
     * @return the immutable Scala map of Hudi write options applied to every write in this class
     */
    public Map<String, String> commonOpts() {
        return commonOpts;
    }

    /**
     * Scala-style accessor for the verification column name.
     *
     * @return the name of the column whose value is rewritten to verify an update
     */
    public String verificationCol() {
        return verificationCol;
    }

    /**
     * Scala-style accessor for the verification value.
     *
     * @return the value written into the verification column by the update step
     */
    public String updatedVerificationVal() {
        return updatedVerificationVal;
    }

    /**
     * Builds the fixtures each test relies on: base path, Spark contexts, the session handle used by
     * {@link #spark()}, the test data generator and the file system.
     */
    @BeforeEach
    public void setUp() {
        initPath();
        initSparkContexts();

        // sqlContext is an inherited field; presumably populated by initSparkContexts() -- confirm in the base class.
        SparkSession session = this.sqlContext.sparkSession();
        spark_$eq(session);

        initTestDataGenerator();
        initFileSystem();
    }

    /**
     * Releases the fixtures created in {@code setUp()}.
     *
     * <p>The three cleanup steps run in the original order, but each later step is placed in a
     * {@code finally} block so that a failure while stopping Spark (or resetting the data generator)
     * no longer prevents the remaining resources from being released and leaking into the next test.
     * If more than one step throws, the exception from the last failing step is the one propagated.
     */
    @AfterEach
    public void tearDown() {
        try {
            cleanupSparkContexts();
        } finally {
            try {
                cleanupTestDataGenerator();
            } finally {
                cleanupFileSystem();
            }
        }
    }

    /**
     * Exercises a basic write/read round trip on a merge-on-read table: an initial insert batch read back with a
     * read-optimized query, an update batch read back with a snapshot query, and finally a single-row update whose
     * new value in the verification column must be visible when the table is loaded again.
     */
    @Test
    public void testMergeOnReadStorage() {
        FileSystem tableFs = FSUtils.getFs(this.basePath, spark().sparkContext().hadoopConfiguration());
        String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY();
        String allPartitionFiles = this.basePath + "/*/*/*/*";

        // Batch "001": 100 inserts that (over)write the table as merge-on-read, with inline compaction disabled.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION_OPT_KEY(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(tableFs, this.basePath, "000"));

        // Read-optimized view after the first batch: 100 rows, all stamped with the latest commit.
        Dataset readOptimized = spark().read()
            .format("org.apache.hudi")
            .option(queryTypeKey, DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
            .load(allPartitionFiles);
        Assertions.assertEquals(100L, readOptimized.count());
        Object expectedFirstCommit = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{HoodieDataSourceHelpers.latestCommit(tableFs, this.basePath)}));
        // TestMORDataSource$$anonfun$1 is compiled elsewhere; presumably it turns each Row into its commit-time string.
        Object observedFirstCommit = ((TraversableOnce) JavaConversions$.MODULE$.asScalaBuffer(readOptimized.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().collectAsList()).map(new TestMORDataSource$$anonfun$1(this), Buffer$.MODULE$.canBuildFrom())).toList();
        Assertions.assertEquals(expectedFirstCommit, observedFirstCommit);

        // Batch "002": 100 unique updates appended to the same table.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        String commitAfterUpdate = HoodieDataSourceHelpers.latestCommit(tableFs, this.basePath);

        // Snapshot view after the update: every row must carry the newest commit time.
        Dataset snapshot = spark().read()
            .format("org.apache.hudi")
            .option(queryTypeKey, DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL())
            .load(allPartitionFiles);
        Object expectedSecondCommit = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{commitAfterUpdate}));
        Object observedSecondCommit = ((TraversableOnce) JavaConversions$.MODULE$.asScalaBuffer(snapshot.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().collectAsList()).map(new TestMORDataSource$$anonfun$2(this), Buffer$.MODULE$.canBuildFrom())).toList();
        Assertions.assertEquals(expectedSecondCommit, observedSecondCommit);

        // Pick one row, overwrite its verification column and append it back to the table.
        String keyOfUpdatedRow = ((Row) snapshot.limit(1).select("_row_key", Predef$.MODULE$.wrapRefArray(new String[0])).first()).getString(0);
        snapshot.filter(functions$.MODULE$.col("_row_key").$eq$eq$eq(keyOfUpdatedRow))
            .withColumn(verificationCol(), functions$.MODULE$.lit(updatedVerificationVal()))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);

        // Reload without an explicit query type: still 100 rows, and the chosen row shows the new value.
        Dataset reloaded = spark().read().format("hudi").load(allPartitionFiles);
        Assertions.assertEquals(100L, reloaded.count());
        Row updatedRow = (Row) reloaded.filter(functions$.MODULE$.col("_row_key").$eq$eq$eq(keyOfUpdatedRow)).select(verificationCol(), Predef$.MODULE$.wrapRefArray(new String[0])).first();
        Assertions.assertEquals(updatedVerificationVal(), updatedRow.getString(0));
    }

    /**
     * Checks row counts of snapshot, incremental, read-optimized and skip-merge queries on a merge-on-read table
     * as it moves through six write batches: an initial insert, a full update, a partial update, an insert into a
     * new partition, an update of that partition and a final insert written with inline compaction enabled.
     *
     * <p>All {@code assertEquals} calls use the JUnit {@code (expected, actual)} argument order so that failure
     * messages report the right value as "expected".
     */
    @Test
    public void testCount() {
        // Read-option constants used repeatedly below.
        final String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY();
        final String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        final String incrementalQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL();
        final String readOptimizedQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL();
        final String beginInstantKey = DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME_OPT_KEY();
        final String endInstantKey = DataSourceReadOptions$.MODULE$.END_INSTANTTIME_OPT_KEY();
        final String mergeTypeKey = DataSourceReadOptions$.MODULE$.REALTIME_MERGE_OPT_KEY();
        final String skipMerge = DataSourceReadOptions$.MODULE$.REALTIME_SKIP_MERGE_OPT_VAL();
        final String allPartitionFiles = this.basePath + "/*/*/*/*";

        // Batch "001": 100 inserts that (over)write the table as merge-on-read, with inline compaction disabled.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION_OPT_KEY(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(this.fs, this.basePath, "000"));
        Dataset snapshotAfterInsert = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(100L, snapshotAfterInsert.count());

        // Batch "002": 100 unique updates; the snapshot must still hold 100 rows, all with one (newer) commit time.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        Dataset snapshotAfterUpdate = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(100L, snapshotAfterUpdate.count());
        String commit1Time = ((Row) snapshotAfterInsert.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString();
        String commit2Time = ((Row) snapshotAfterUpdate.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString();
        Assertions.assertEquals(1L, snapshotAfterUpdate.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        // Commit times are compared as strings, exactly as the original lexicographic comparison did.
        Assertions.assertTrue(commit2Time.compareTo(commit1Time) > 0);
        Assertions.assertEquals(100L, snapshotAfterUpdate.join(snapshotAfterInsert, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_hoodie_record_key"})), "left").count());

        // Incremental query bounded by ("000", commit 1]: only rows from the first commit.
        Dataset incrementalCommit1 = spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, "000").option(endInstantKey, commit1Time).load(this.basePath);
        Assertions.assertEquals(100L, incrementalCommit1.count());
        Assertions.assertEquals(1L, incrementalCommit1.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(commit1Time, ((Row) incrementalCommit1.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString());
        incrementalCommit1.show(1);

        // Incremental query bounded by (commit 1, commit 2]: only rows from the second commit.
        Dataset incrementalCommit2 = spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, commit1Time).option(endInstantKey, commit2Time).load(this.basePath);
        Assertions.assertEquals(100L, incrementalCommit2.count());
        Assertions.assertEquals(1L, incrementalCommit2.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(commit2Time, ((Row) incrementalCommit2.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString());
        incrementalCommit2.show(1);

        // Incremental query spanning both commits: the 100 rows are expected to carry the second commit time.
        Dataset incrementalBothCommits = spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, "000").option(endInstantKey, commit2Time).load(this.basePath);
        Assertions.assertEquals(100L, incrementalBothCommits.count());
        Assertions.assertEquals(1L, incrementalBothCommits.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(commit2Time, ((Row) incrementalBothCommits.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString());

        // Snapshot with the skip-merge option: 200 rows over 100 distinct record keys are expected.
        Dataset skipMergeSnapshot = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).option(mergeTypeKey, skipMerge).load(allPartitionFiles);
        Assertions.assertEquals(200L, skipMergeSnapshot.count());
        Assertions.assertEquals(100L, skipMergeSnapshot.select("_hoodie_record_key", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(200L, skipMergeSnapshot.join(snapshotAfterUpdate, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_hoodie_record_key"})), "left").count());

        // Read-optimized view is expected to still return 100 rows.
        Assertions.assertEquals(100L, spark().read().format("org.apache.hudi").option(queryTypeKey, readOptimizedQuery).load(allPartitionFiles).count());

        // Batch "003": 50 unique updates; the snapshot keeps 100 rows but now spans two commit times.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("003", Predef$.MODULE$.int2Integer(50)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        Dataset snapshotAfterPartialUpdate = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(100L, snapshotAfterPartialUpdate.count());
        Assertions.assertEquals(2L, snapshotAfterPartialUpdate.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(50L, snapshotAfterPartialUpdate.filter(functions$.MODULE$.col("_hoodie_commit_time").$greater(commit2Time)).count());
        // Rows matching the previous snapshot on both key and commit time are the 50 that were not updated.
        Assertions.assertEquals(50L, snapshotAfterPartialUpdate.join(snapshotAfterUpdate, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_hoodie_record_key", "_hoodie_commit_time"})), "inner").count());
        Assertions.assertEquals(50L, spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, commit2Time).load(this.basePath).count());
        Assertions.assertEquals(200L, spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, "000").option(mergeTypeKey, skipMerge).load(this.basePath).count());

        // Batch "004": 100 inserts from a generator restricted to partition 2020/01/10.
        HoodieTestDataGenerator newPartitionGen = new HoodieTestDataGenerator(new String[]{"2020/01/10"});
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(newPartitionGen.generateInserts("004", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        Dataset snapshotAfterNewPartition = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(200L, snapshotAfterNewPartition.count());
        Assertions.assertEquals(100L, snapshotAfterInsert.join(snapshotAfterNewPartition, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_hoodie_record_key"})), "inner").count());
        Assertions.assertEquals(150L, spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, commit2Time).load(this.basePath).count());

        // Batch "005": 50 unique updates from the new-partition generator; the total stays at 200 rows.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(newPartitionGen.generateUniqueUpdates("005", Predef$.MODULE$.int2Integer(50)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        String latestBeforeCompactingWrite = HoodieDataSourceHelpers.latestCommit(this.fs, this.basePath);
        Assertions.assertEquals(200L, spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles).count());

        // Batch "006": 2 more inserts, written with inline compaction switched on.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(newPartitionGen.generateInserts("006", Predef$.MODULE$.int2Integer(2)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "true")
            .mode(SaveMode.Append)
            .save(this.basePath);
        String latestAfterCompactingWrite = HoodieDataSourceHelpers.latestCommit(this.fs, this.basePath);
        Assertions.assertEquals(102L, spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(this.basePath + "/2020/01/10/*").count());
        Assertions.assertEquals(152L, spark().read().format("org.apache.hudi").option(queryTypeKey, incrementalQuery).option(beginInstantKey, latestBeforeCompactingWrite).option(endInstantKey, latestAfterCompactingWrite).load(this.basePath).count());
    }

    /**
     * Checks that delete records written to a merge-on-read table remove rows from snapshot queries: 100 inserts,
     * then two batches of 50 delete records each, after which the snapshot is expected to be empty.
     *
     * <p>Assertions use the JUnit {@code (expected, actual)} argument order, and the commit-time comparison uses
     * {@code assertEquals} so that a failure reports both values.
     */
    @Test
    public void testPayloadDelete() {
        final String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY();
        final String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        final String allPartitionFiles = this.basePath + "/*/*/*/*";

        // Batch "001": 100 inserts that (over)write the table as merge-on-read, with inline compaction disabled.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION_OPT_KEY(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(this.fs, this.basePath, "000"));
        Dataset snapshotAfterInsert = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(100L, snapshotAfterInsert.count());

        // Batch "002": 50 unique delete records; half of the rows should disappear from the snapshot.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueDeleteRecords("002", Predef$.MODULE$.int2Integer(50)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        Dataset snapshotAfterDelete = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(50L, snapshotAfterDelete.count());
        Assertions.assertEquals(1L, snapshotAfterDelete.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());

        // The surviving rows are expected to still carry the commit time of the original insert.
        String insertCommitTime = ((Row) snapshotAfterInsert.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString();
        String survivingCommitTime = ((Row) snapshotAfterDelete.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString();
        Assertions.assertEquals(insertCommitTime, survivingCommitTime);
        Assertions.assertEquals(50L, snapshotAfterDelete.join(snapshotAfterInsert, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_hoodie_record_key"})), "left").count());

        // Skip-merge snapshot: 100 rows are expected.
        Assertions.assertEquals(100L, spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).option(DataSourceReadOptions$.MODULE$.REALTIME_MERGE_OPT_KEY(), DataSourceReadOptions$.MODULE$.REALTIME_SKIP_MERGE_OPT_VAL()).load(allPartitionFiles).count());

        // Incremental query starting after the insert commit: no rows are expected.
        Assertions.assertEquals(0L, spark().read().format("org.apache.hudi").option(queryTypeKey, DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME_OPT_KEY(), survivingCommitTime).load(this.basePath).count());

        // Batch "003": 50 more unique delete records; the snapshot should now be empty.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueDeleteRecords("003", Predef$.MODULE$.int2Integer(50)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        Assertions.assertEquals(0L, spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles).count());
    }

    /**
     * Checks column pruning and filtering on snapshot and incremental reads of a merge-on-read table after an
     * insert batch of 100 records and an update batch of 50 records, then runs the schema/type and {@code show}
     * checks on every loaded data set.
     */
    @Test
    public void testPrunedFiltered() {
        String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY();
        String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        String incrementalQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL();
        String beginInstantKey = DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME_OPT_KEY();
        String allPartitionFiles = this.basePath + "/*/*/*/*";

        // Batch "001": 100 inserts that (over)write the table as merge-on-read, with inline compaction disabled.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION_OPT_KEY(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);

        Dataset<Row> snapshotAfterInsert = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        String insertCommitTime = ((Row) snapshotAfterInsert.select("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).head()).get(0).toString();
        Assertions.assertEquals(100L, snapshotAfterInsert.count());

        // Selecting nested fields yields the leaf names as column names.
        String[] prunedColumns = snapshotAfterInsert
            .select("fare.amount", Predef$.MODULE$.wrapRefArray(new String[]{"fare.currency", "tip_history", "_hoodie_commit_seqno"}))
            .orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.desc("_hoodie_commit_seqno")}))
            .columns();
        Assertions.assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", Predef$.MODULE$.refArrayOps(prunedColumns).mkString(","));

        // Batch "002": 50 unique updates.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(50)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);

        // Four views of the table after the update batch.
        Dataset<Row> snapshotAfterUpdate = spark().read().format("org.apache.hudi")
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionFiles);
        Dataset<Row> incrementalFromStart = spark().read().format("org.apache.hudi")
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, "000")
            .load(this.basePath);
        Dataset<Row> incrementalSkipMerge = spark().read().format("org.apache.hudi")
            .option(queryTypeKey, incrementalQuery)
            .option(DataSourceReadOptions$.MODULE$.REALTIME_MERGE_OPT_KEY(), DataSourceReadOptions$.MODULE$.REALTIME_SKIP_MERGE_OPT_VAL())
            .option(beginInstantKey, "000")
            .load(this.basePath);
        Dataset<Row> incrementalAfterInsert = spark().read().format("org.apache.hudi")
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, insertCommitTime)
            .load(this.basePath);

        // Pruned projections: the filter on _hoodie_commit_time is applied after that column was projected away.
        Assertions.assertEquals(50L, snapshotAfterUpdate.select("_hoodie_commit_seqno", Predef$.MODULE$.wrapRefArray(new String[]{"fare.amount", "fare.currency", "tip_history"})).filter(functions$.MODULE$.col("_hoodie_commit_time").$greater(insertCommitTime)).count());
        Assertions.assertEquals(50L, incrementalFromStart.select("_hoodie_commit_seqno", Predef$.MODULE$.wrapRefArray(new String[]{"fare.amount", "fare.currency", "tip_history"})).filter(functions$.MODULE$.col("_hoodie_commit_time").$greater(insertCommitTime)).count());
        Assertions.assertEquals(50L, incrementalAfterInsert.select("_hoodie_commit_seqno", Predef$.MODULE$.wrapRefArray(new String[]{"fare.amount", "fare.currency", "tip_history"})).count());
        Assertions.assertEquals(150L, incrementalSkipMerge.select("_hoodie_commit_seqno", Predef$.MODULE$.wrapRefArray(new String[]{"fare.amount", "fare.currency", "tip_history"})).count());

        verifySchemaAndTypes(snapshotAfterInsert);
        verifySchemaAndTypes(snapshotAfterUpdate);
        verifySchemaAndTypes(incrementalFromStart);
        verifySchemaAndTypes(incrementalAfterInsert);
        verifySchemaAndTypes(incrementalSkipMerge);

        verifyShow(snapshotAfterInsert);
        verifyShow(snapshotAfterUpdate);
        verifyShow(incrementalFromStart);
        verifyShow(incrementalAfterInsert);
        verifyShow(incrementalSkipMerge);
    }

    /**
     * Enables Spark's vectorized Parquet reader, writes an insert batch and an update batch generated from a flat
     * "shortTripRec" schema, and checks that snapshot reads return 100 rows whose typed getters agree with the
     * generic values.
     */
    @Test
    public void testVectorizedReader() {
        spark().conf().set("spark.sql.parquet.enableVectorizedReader", true);
        Assertions.assertTrue(new StringOps(Predef$.MODULE$.augmentString(spark().conf().get("spark.sql.parquet.enableVectorizedReader"))).toBoolean());

        // Avro schema handed to the data generator for both batches.
        String shortTripSchema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY();
        String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        String allPartitionFiles = this.basePath + "/*/*/*/*";

        // Batch "001": 100 inserts that (over)write the table as merge-on-read, with inline compaction disabled.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInsertsAsPerSchema("001", Predef$.MODULE$.int2Integer(100), shortTripSchema))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION_OPT_KEY(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);
        Dataset snapshotAfterInsert = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(100L, snapshotAfterInsert.count());

        // Batch "002": 50 unique updates generated from the same schema.
        spark().read()
            .json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdatesAsPerSchema("002", Predef$.MODULE$.int2Integer(50), shortTripSchema))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format("org.apache.hudi")
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(this.basePath);
        Dataset snapshotAfterUpdate = spark().read().format("org.apache.hudi").option(queryTypeKey, snapshotQuery).load(allPartitionFiles);
        Assertions.assertEquals(100L, snapshotAfterUpdate.count());

        // Read one row back through typed getters and compare against the untyped value of the same column.
        Row sample = (Row) snapshotAfterUpdate.select("fare", Predef$.MODULE$.wrapRefArray(new String[]{"driver", "_hoodie_is_deleted"})).head();
        Assertions.assertEquals(BoxesRunTime.boxToDouble(sample.getDouble(0)), sample.get(0));
        Assertions.assertEquals(sample.getString(1), sample.get(1));
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(sample.getBoolean(2)), sample.get(2));

        snapshotAfterInsert.show(1);
        snapshotAfterUpdate.show(1);
    }

    /**
     * Asserts that a pruned projection of {@code dataset} exposes the expected column names and that the values of
     * a second projection can be read through typed getters matching the generic value of each column.
     *
     * @param dataset the data set to check; it is queried for the columns {@code fare.amount},
     *                {@code fare.currency}, {@code tip_history}, {@code _hoodie_commit_seqno}, {@code begin_lat},
     *                {@code current_date}, {@code nation} and {@code _hoodie_commit_time}
     */
    public void verifySchemaAndTypes(Dataset<Row> dataset) {
        // Nested fields are expected to surface under their leaf names.
        String[] prunedColumns = dataset
            .select("fare.amount", Predef$.MODULE$.wrapRefArray(new String[]{"fare.currency", "tip_history", "_hoodie_commit_seqno"}))
            .orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.desc("_hoodie_commit_seqno")}))
            .columns();
        Assertions.assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", String.join(",", prunedColumns));

        Row newest = (Row) dataset
            .select("begin_lat", Predef$.MODULE$.wrapRefArray(new String[]{"current_date", "fare.currency", "tip_history", "nation"}))
            .orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.desc("_hoodie_commit_time")}))
            .head();

        // Each typed getter presumably throws when the column has a different type -- confirm against Spark's Row docs.
        Assertions.assertEquals(Double.valueOf(newest.getDouble(0)), newest.get(0));
        Assertions.assertEquals(Long.valueOf(newest.getLong(1)), newest.get(1));
        Assertions.assertEquals(newest.getString(2), newest.get(2));
        Assertions.assertEquals(newest.getSeq(3), newest.get(3));
        Assertions.assertEquals(newest.getStruct(4), newest.get(4));
    }

    /**
     * Calls {@code show(1)} on the full data set and on a pruned projection of it, so that both code paths are
     * executed; nothing is asserted about the printed output.
     *
     * @param dataset the data set to display
     */
    public void verifyShow(Dataset<Row> dataset) {
        dataset.show(1);

        Dataset<Row> pruned = dataset.select("_hoodie_commit_seqno", Predef$.MODULE$.wrapRefArray(new String[]{"fare.amount", "fare.currency", "tip_history"}));
        pruned.show(1);
    }
}
