package org.apache.hudi.functional;

import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.SparkDatasetMixin;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BooleanType$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import scala.Array$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.JavaConversions$;
import scala.collection.JavaConverters$;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;

/* compiled from: TestMORDataSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%h\u0001B\u0001\u0003\u0001-\u0011\u0011\u0003V3ti6{%\u000bR1uCN{WO]2f\u0015\t\u0019A!\u0001\u0006gk:\u001cG/[8oC2T!!\u0002\u0004\u0002\t!,H-\u001b\u0006\u0003\u000f!\ta!\u00199bG\",'\"A\u0005\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001a!\u0003\u0005\u0002\u000e!5\taB\u0003\u0002\u0010\t\u0005IA/Z:ukRLGn]\u0005\u0003#9\u0011A\u0003S8pI&,7\t\\5f]R$Vm\u001d;CCN,\u0007CA\n\u0015\u001b\u0005!\u0011BA\u000b\u0005\u0005E\u0019\u0006/\u0019:l\t\u0006$\u0018m]3u\u001b&D\u0018N\u001c\u0005\u0006/\u0001!\t\u0001G\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003e\u0001\"A\u0007\u0001\u000e\u0003\tAq\u0001\b\u0001A\u0002\u0013\u0005Q$A\u0003ta\u0006\u00148.F\u0001\u001f!\ty2%D\u0001!\u0015\t\t#%A\u0002tc2T!\u0001\b\u0004\n\u0005\u0011\u0002#\u0001D*qCJ\\7+Z:tS>t\u0007b\u0002\u0014\u0001\u0001\u0004%\taJ\u0001\ngB\f'o[0%KF$\"\u0001\u000b\u0018\u0011\u0005%bS\"\u0001\u0016\u000b\u0003-\nQa]2bY\u0006L!!\f\u0016\u0003\tUs\u0017\u000e\u001e\u0005\b_\u0015\n\t\u00111\u0001\u001f\u0003\rAH%\r\u0005\u0007c\u0001\u0001\u000b\u0015\u0002\u0010\u0002\rM\u0004\u0018M]6!\u0011\u001d\u0019\u0004A1A\u0005\nQ\n1\u0001\\8h+\u0005)\u0004C\u0001\u001c:\u001b\u00059$B\u0001\u001d\u0007\u0003\u0015awn\u001a\u001bk\u0013\tQtG\u0001\u0004M_\u001e<WM\u001d\u0005\u0007y\u0001\u0001\u000b\u0011B\u001b\u0002\t1|w\r\t\u0005\b}\u0001\u0011\r\u0011\"\u0001@\u0003)\u0019w.\\7p]>\u0003Ho]\u000b\u0002\u0001B!\u0011I\u0012%I\u001b\u0005\u0011%BA\"E\u0003%IW.\\;uC\ndWM\u0003\u0002FU\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u001d\u0013%aA'baB\u0011\u0011JT\u0007\u0002\u0015*\u00111\nT\u0001\u0005Y\u0006twMC\u0001N\u0003\u0011Q\u0017M^1\n\u0005=S%AB*ue&tw\r\u0003\u0004R\u0001\u0001\u0006I\u0001Q\u0001\fG>lWn\u001c8PaR\u001c\b\u0005C\u0004T\u0001\t\u0007I\u0011\u0001+\u0002\u001fY,'/\u001b4jG\u0006$\u0018n\u001c8D_2,\u0012!\u0016\t\u0003-fs!!K,\n\u0005aS\u0013A\u0002)sK\u0012,g-\u0003\u0002P5*\u0011\u0001L\u000b\u0005\u00079\u0002\u0001\u000b\u0011B+\u0002!Y,'/\u
001b4jG\u0006$\u0018n\u001c8D_2\u0004\u0003b\u00020\u0001\u0005\u0004%\t\u0001V\u0001\u0017kB$\u0017\r^3e-\u0016\u0014\u0018NZ5dCRLwN\u001c,bY\"1\u0001\r\u0001Q\u0001\nU\u000bq#\u001e9eCR,GMV3sS\u001aL7-\u0019;j_:4\u0016\r\u001c\u0011\t\u000b\t\u0004A\u0011I2\u0002\u000bM,G/\u00169\u0015\u0003!B#!Y3\u0011\u0005\u0019lW\"A4\u000b\u0005!L\u0017aA1qS*\u0011!n[\u0001\bUV\u0004\u0018\u000e^3s\u0015\ta\u0007\"A\u0003kk:LG/\u0003\u0002oO\nQ!)\u001a4pe\u0016,\u0015m\u00195\t\u000bA\u0004A\u0011I2\u0002\u0011Q,\u0017M\u001d#po:D#a\u001c:\u0011\u0005\u0019\u001c\u0018B\u0001;h\u0005%\te\r^3s\u000b\u0006\u001c\u0007\u000eC\u0003w\u0001\u0011\u00051-A\u0005uKN$8i\\;oi\"\u0012Q\u000f\u001f\t\u0003MfL!A_4\u0003\tQ+7\u000f\u001e\u0005\u0006y\u0002!\taY\u0001\u0012i\u0016\u001cH\u000fU1zY>\fG\rR3mKR,\u0007FA>y\u0011\u0015y\b\u0001\"\u0001d\u0003I!Xm\u001d;QeVtW\r\u001a$jYR,'/\u001a3)\u0005yD\bBBA\u0003\u0001\u0011\u00051-\u0001\u000buKN$h+Z2u_JL'0\u001a3SK\u0006$WM\u001d\u0015\u0004\u0003\u0007A\bBBA\u0006\u0001\u0011\u00051-A\u000fuKN$\bK]3D_6\u0014\u0017N\\3GS2,GMR8s%\u0016\fG-T(SQ\r\tI\u0001\u001f\u0005\b\u0003#\u0001A\u0011BA\n\u0003%9(/\u001b;f\t\u0006$\u0018\rF\u0002)\u0003+A\u0001\"a\u0006\u0002\u0010\u0001\u0007\u0011\u0011D\u0001\u0005I\u0006$\u0018\r\u0005\u0007*\u00037\ty\"VA\u0010\u0003?\t)#C\u0002\u0002\u001e)\u0012a\u0001V;qY\u0016,\u0004cA\u0015\u0002\"%\u0019\u00111\u0005\u0016\u0003\u0007%sG\u000fE\u0002*\u0003OI1!!\u000b+\u0005\u001d\u0011un\u001c7fC:Dq!!\f\u0001\t\u0013\ty#A\u0006dQ\u0016\u001c7.\u00118to\u0016\u0014Hc\u0001\u0015\u00022!A\u00111GA\u0016\u0001\u0004\tI\"\u0001\u0004fqB,7\r\u001e\u0005\b\u0003o\u0001A\u0011AA\u001d\u0003Q1XM]5gsN\u001b\u0007.Z7b\u0003:$G+\u001f9fgR\u0019\u0001&a\u000f\t\u0011\u0005u\u0012Q\u0007a\u0001\u0003\u007f\t!\u0001\u001a4\u0011\t\u0005\u0005\u0013Q\f\b\u0005\u0003\u0007\nIF\u0004\u0003\u0002F\u0005]c\u0002BA$\u0003+rA!!\u0013\u0002T9!\u00111JA)\u001b\t\tiEC\u0002\u0002P)\ta\u0001\u0010:p_Rt\u0014\"A\u0005\n\u0005\u001dA\u0
011B\u0001\u000f\u0007\u0013\t\t#%C\u0002\u0002\\\u0001\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002`\u0005\u0005$!\u0003#bi\u00064%/Y7f\u0015\r\tY\u0006\t\u0005\b\u0003K\u0002A\u0011AA4\u0003)1XM]5gsNCwn\u001e\u000b\u0004Q\u0005%\u0004\u0002CA\u001f\u0003G\u0002\r!a\u0010\t\u000f\u00055\u0004\u0001\"\u0001\u0002p\u0005!C/Z:u#V,'/_'P%^KG\u000f\u001b\"bg\u0016\u0004\u0016\r\u001e5B]\u00124\u0015\u000e\\3J]\u0012,\u0007\u0010F\u0003)\u0003c\n)\b\u0003\u0005\u0002t\u0005-\u0004\u0019AA\u0013\u0003=\u0001\u0018M\u001d;ji&|g.\u00128d_\u0012,\u0007\u0002CA<\u0003W\u0002\r!!\n\u0002#%\u001cX*\u001a;bI\u0006$\u0018-\u00128bE2,G\r\u000b\u0005\u0002l\u0005m\u00141RAG!\u0011\ti(a\"\u000e\u0005\u0005}$\u0002BAA\u0003\u0007\u000b\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0004\u0003\u000bK\u0017A\u00029be\u0006l7/\u0003\u0003\u0002\n\u0006}$!C\"tmN{WO]2f\u0003\u00151\u0018\r\\;fY!\ty)a%\u0002\u0018\u0006m\u0015EAAI\u0003)!(/^3-M\u0006d7/Z\u0011\u0003\u0003+\u000b\u0011\u0002\u001e:vK2\"(/^3\"\u0005\u0005e\u0015A\u00034bYN,G\u0006\u001e:vK\u0006\u0012\u0011QT\u0001\fM\u0006d7/\u001a\u0017gC2\u001cX\r\u000b\u0003\u0002l\u0005\u0005\u0006\u0003BAR\u0003Kk!!a!\n\t\u0005\u001d\u00161\u0011\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\bbBAV\u0001\u0011\u0005\u0011QV\u0001\u0016i\u0016\u001cH/T(S!\u0006\u0014H/\u001b;j_:\u0004&/\u001e8f)\u0015A\u0013qVAY\u0011!\t\u0019(!+A\u0002\u0005\u0015\u0002\u0002CAZ\u0003S\u0003\r!!\n\u0002%!Lg/Z*us2,\u0007+\u0019:uSRLwN\u001c\u0015\t\u0003S\u000bY(a#\u000282B\u0011\u0011XA_\u0003\u0003\f)-\t\u0002\u0002<\u0006YAO];fY\u00012\u0017\r\\:fC\t\ty,A\u0006gC2\u001cX\r\f\u0011ueV,\u0017EAAb\u000311\u0017\r\\:fY\u00012\u0017\r\\:fC\t\t9-\u0001\u0006ueV,G\u0006\t;sk\u0016DC!!+\u0002\"\"1\u0011Q\u001a\u0001\u0005\u0002\r\fq\u0004^3tiJ+\u0017\r\u001a'pO>sG._'fe\u001e,wJ\u001c*fC\u0012$\u0016M\u00197fQ\r\tY\r\u001f\u0005\u0007\u0003'\u0004A\u0011A2\u0002?Q,7\u000f\u001e+f[B4\u0015\u000e\\3t\u00072,\u0017M\u001c$pe\u000ecWo\u001d;fe&tw\rK\u0002\u0002RbDa!!7\u000
1\t\u0003\u0019\u0017A\b;fgR\u001cE.^:uKJLgnZ(o\u001dVdG.\u00192mK\u000e{G.^7oQ\r\t9\u000e\u001f\u0005\u0007\u0003?\u0004A\u0011A2\u0002-Q,7\u000f\u001e%p_\u0012LW-S:EK2,G/\u001a3N\u001fJC3!!8y\u0011\u0019\t)\u000f\u0001C\u0001G\u0006yC/Z:u!J,h.\u001a)beRLG/[8o\r>\u0014H+[7fgR\fW\u000e\u001d\"bg\u0016$7*Z=HK:,'/\u0019;pe\"\u001a\u00111\u001d=")
/* loaded from: input_file:org/apache/hudi/functional/TestMORDataSource.class */
public class TestMORDataSource extends HoodieClientTestBase implements SparkDatasetMixin {
    // Spark session used by the tests; assigned through spark_$eq(...) in setUp().
    private SparkSession spark;
    // Logger instance, exposed only through the private log() accessor.
    private final Logger log;
    // Scala immutable option map; the visible tests pass it (or an extension of it) as the
    // base .options(...) of their Hudi writes.
    private final Map<String, String> commonOpts;
    // Exposed through verificationCol(); the value is assigned outside this part of the file.
    private final String verificationCol;
    // Exposed through updatedVerificationVal(); the value is assigned outside this part of the file.
    private final String updatedVerificationVal;

    /**
     * Forwards to the {@code SparkDatasetMixin} implementation class, passing this instance
     * together with the given session and record list, and returns its result unchanged.
     *
     * @param sparkSession session handed to the mixin implementation
     * @param list records handed to the mixin implementation
     * @return the dataset produced by the mixin implementation
     */
    @Override // org.apache.hudi.SparkDatasetMixin
    public Dataset<Row> toDataset(SparkSession sparkSession, List<HoodieRecord<?>> list) {
        return SparkDatasetMixin.Cclass.toDataset(this, sparkSession, list);
    }

    /**
     * Accessor for the Spark session field.
     *
     * @return the current value of the {@code spark} field
     */
    public SparkSession spark() {
        return spark;
    }

    /**
     * Setter for the Spark session field (Scala-style {@code spark_=} name).
     *
     * @param sparkSession the session to store in the {@code spark} field
     */
    public void spark_$eq(SparkSession sparkSession) {
        this.spark = sparkSession;
    }

    /**
     * Accessor for the logger field.
     *
     * @return the value of the {@code log} field
     */
    private Logger log() {
        return log;
    }

    /**
     * Accessor for the shared option map.
     *
     * @return the value of the {@code commonOpts} field
     */
    public Map<String, String> commonOpts() {
        return commonOpts;
    }

    /**
     * Accessor for the verification column field.
     *
     * @return the value of the {@code verificationCol} field
     */
    public String verificationCol() {
        return verificationCol;
    }

    /**
     * Accessor for the updated verification value field.
     *
     * @return the value of the {@code updatedVerificationVal} field
     */
    public String updatedVerificationVal() {
        return updatedVerificationVal;
    }

    /**
     * Per-test fixture setup, run by JUnit before each test method.
     * Sets the table name to "hoodie_test", calls the inherited initializers in the order
     * shown, and stores the session obtained from {@code sqlContext} via {@code spark_$eq}.
     */
    @BeforeEach
    public void setUp() {
        setTableName("hoodie_test");
        initPath();
        initSparkContexts();
        // Reads sqlContext right after initSparkContexts(); presumably that call is what
        // populates sqlContext, so the order matters — TODO confirm against the harness.
        spark_$eq(this.sqlContext.sparkSession());
        initTestDataGenerator();
        initFileSystem();
    }

    /**
     * Per-test fixture teardown, run by JUnit after each test method.
     * Calls the inherited cleanup routines for the Spark contexts, the test data generator
     * and the file system, in that order.
     */
    @AfterEach
    public void tearDown() {
        cleanupSparkContexts();
        cleanupTestDataGenerator();
        cleanupFileSystem();
    }

    /**
     * Writes six generated batches ("001" through "006") to a merge-on-read table and, after
     * each step, checks row counts returned by snapshot, incremental, skip-merge and
     * read-optimized queries.
     *
     * <p>All {@code assertEquals} calls use JUnit's {@code (expected, actual)} order so that a
     * failure message reports the values the right way round.
     */
    @Test
    public void testCount() {
        final String hudiFormat = "org.apache.hudi";
        final String commitTimeCol = "_hoodie_commit_time";
        final String recordKeyCol = "_hoodie_record_key";
        final String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE().key();
        final String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        final String incrementalQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL();
        final String beginInstantKey = DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key();
        final String endInstantKey = DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key();
        final String mergeTypeKey = DataSourceReadOptions$.MODULE$.REALTIME_MERGE().key();
        final String skipMerge = DataSourceReadOptions$.MODULE$.REALTIME_SKIP_MERGE_OPT_VAL();
        final String allPartitionsGlob = basePath + "/*/*/*/*";

        // Batch "001": 100 inserts, written with Overwrite and inline compaction switched off.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(basePath);
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"));

        Dataset<Row> firstSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(100L, firstSnapshot.count());

        // Batch "002": 100 unique updates appended with the common options only.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(100)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> secondSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(100L, secondSnapshot.count());

        // Commit times as seen through the first and the second snapshot dataset.
        String firstCommitTime = firstSnapshot.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString();
        String secondCommitTime = secondSnapshot.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString();
        Assertions.assertEquals(1L, secondSnapshot.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        // Plain lexicographic comparison of the two commit-time strings.
        Assertions.assertTrue(secondCommitTime.compareTo(firstCommitTime) > 0);
        Assertions.assertEquals(100L, secondSnapshot.join(firstSnapshot, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{recordKeyCol})), "left").count());

        // Incremental query bounded by ("000", first commit time].
        Dataset<Row> incrementalToFirst = spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, "000")
            .option(endInstantKey, firstCommitTime)
            .load(basePath);
        Assertions.assertEquals(100L, incrementalToFirst.count());
        Assertions.assertEquals(1L, incrementalToFirst.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(firstCommitTime, incrementalToFirst.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString());
        incrementalToFirst.show(1);

        // Incremental query bounded by (first commit time, second commit time].
        Dataset<Row> incrementalFirstToSecond = spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, firstCommitTime)
            .option(endInstantKey, secondCommitTime)
            .load(basePath);
        Assertions.assertEquals(100L, incrementalFirstToSecond.count());
        Assertions.assertEquals(1L, incrementalFirstToSecond.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(secondCommitTime, incrementalFirstToSecond.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString());
        incrementalFirstToSecond.show(1);

        // Incremental query bounded by ("000", second commit time].
        Dataset<Row> incrementalToSecond = spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, "000")
            .option(endInstantKey, secondCommitTime)
            .load(basePath);
        Assertions.assertEquals(100L, incrementalToSecond.count());
        Assertions.assertEquals(1L, incrementalToSecond.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(secondCommitTime, incrementalToSecond.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString());

        // An incremental window of "000".."001" is expected to return nothing.
        Assertions.assertEquals(0L, spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, "000")
            .option(endInstantKey, "001")
            .load(basePath)
            .count());

        // Snapshot query with the skip-merge option: both written versions of every key show up.
        Dataset<Row> unmergedSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .option(mergeTypeKey, skipMerge)
            .load(allPartitionsGlob);
        Assertions.assertEquals(200L, unmergedSnapshot.count());
        Assertions.assertEquals(100L, unmergedSnapshot.select(recordKeyCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(200L, unmergedSnapshot.join(secondSnapshot, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{recordKeyCol})), "left").count());

        // Read-optimized query over the same glob.
        Assertions.assertEquals(100L, spark().read().format(hudiFormat)
            .option(queryTypeKey, DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
            .load(allPartitionsGlob)
            .count());

        // Batch "003": 50 unique updates.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("003", Predef$.MODULE$.int2Integer(50)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> thirdSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(100L, thirdSnapshot.count());
        Assertions.assertEquals(2L, thirdSnapshot.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());
        Assertions.assertEquals(50L, thirdSnapshot.filter(functions$.MODULE$.col(commitTimeCol).$greater(secondCommitTime)).count());
        Assertions.assertEquals(50L, thirdSnapshot.join(secondSnapshot, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{recordKeyCol, commitTimeCol})), "inner").count());

        // Open-ended incremental query starting at the second commit time.
        Assertions.assertEquals(50L, spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, secondCommitTime)
            .load(basePath)
            .count());

        // Open-ended incremental query from "000" with the skip-merge option.
        Assertions.assertEquals(200L, spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, "000")
            .option(mergeTypeKey, skipMerge)
            .load(basePath)
            .count());

        // Batch "004": 100 inserts from a generator restricted to partition "2020/01/10".
        HoodieTestDataGenerator singlePartitionGen = new HoodieTestDataGenerator(new String[]{"2020/01/10"});
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(singlePartitionGen.generateInserts("004", Predef$.MODULE$.int2Integer(100)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> fourthSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(200L, fourthSnapshot.count());
        Assertions.assertEquals(100L, firstSnapshot.join(fourthSnapshot, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{recordKeyCol})), "inner").count());
        Assertions.assertEquals(150L, spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, secondCommitTime)
            .load(basePath)
            .count());

        // Batch "005": 50 unique updates from the single-partition generator.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(singlePartitionGen.generateUniqueUpdates("005", Predef$.MODULE$.int2Integer(50)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);
        String commitBeforeCompactingWrite = HoodieDataSourceHelpers.latestCommit(fs, basePath);
        Assertions.assertEquals(200L, spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob)
            .count());

        // Batch "006": 2 inserts written with "hoodie.compact.inline" set to "true".
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(singlePartitionGen.generateInserts("006", Predef$.MODULE$.int2Integer(2)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .option("hoodie.compact.inline", "true")
            .mode(SaveMode.Append)
            .save(basePath);
        String commitAfterCompactingWrite = HoodieDataSourceHelpers.latestCommit(fs, basePath);

        // Snapshot of the single partition only, then the incremental window between the two
        // latest-commit markers captured above.
        Assertions.assertEquals(102L, spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(basePath + "/2020/01/10/*")
            .count());
        Assertions.assertEquals(2L, spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, commitBeforeCompactingWrite)
            .option(endInstantKey, commitAfterCompactingWrite)
            .load(basePath)
            .count());
    }

    /**
     * Inserts 100 records, then appends two batches of 50 generated delete records each, and
     * checks the snapshot, skip-merge and incremental row counts after each step.
     *
     * <p>Assertions use JUnit's {@code (expected, actual)} order, and the commit-time equality
     * check uses {@code assertEquals} so that a failure reports both values.
     */
    @Test
    public void testPayloadDelete() {
        final String hudiFormat = "org.apache.hudi";
        final String commitTimeCol = "_hoodie_commit_time";
        final String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE().key();
        final String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        final String allPartitionsGlob = basePath + "/*/*/*/*";

        // Batch "001": 100 inserts, written with Overwrite and inline compaction switched off.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(basePath);
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"));

        Dataset<Row> snapshotAfterInsert = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(100L, snapshotAfterInsert.count());

        // Batch "002": 50 generated delete records.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateUniqueDeleteRecords("002", Predef$.MODULE$.int2Integer(50)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> snapshotAfterDelete = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(50L, snapshotAfterDelete.count());
        Assertions.assertEquals(1L, snapshotAfterDelete.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count());

        // The surviving rows must still carry the commit time seen through the first snapshot.
        String commitTimeBeforeDelete = snapshotAfterInsert.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString();
        String commitTimeAfterDelete = snapshotAfterDelete.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString();
        Assertions.assertEquals(commitTimeBeforeDelete, commitTimeAfterDelete);
        Assertions.assertEquals(50L, snapshotAfterDelete.join(snapshotAfterInsert, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_hoodie_record_key"})), "left").count());

        // Snapshot query with the skip-merge option.
        Assertions.assertEquals(100L, spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .option(DataSourceReadOptions$.MODULE$.REALTIME_MERGE().key(), DataSourceReadOptions$.MODULE$.REALTIME_SKIP_MERGE_OPT_VAL())
            .load(allPartitionsGlob)
            .count());

        // Open-ended incremental query starting at the commit time read above.
        Assertions.assertEquals(0L, spark().read().format(hudiFormat)
            .option(queryTypeKey, DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL())
            .option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), commitTimeAfterDelete)
            .load(basePath)
            .count());

        // Batch "003": another 50 generated delete records; the snapshot is then expected empty.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateUniqueDeleteRecords("003", Predef$.MODULE$.int2Integer(50)))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);
        Assertions.assertEquals(0L, spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob)
            .count());
    }

    /**
     * Exercises column pruning and filtering on snapshot and incremental reads: projects nested
     * and meta columns, filters on commit time, runs the schema and show verifiers on every
     * dataset, and finally appends updates generated with timestamp argument -1 and checks
     * that no row ends up with rider 'rider-003'.
     */
    @Test
    public void testPrunedFiltered() {
        final String hudiFormat = "org.apache.hudi";
        final String commitTimeCol = "_hoodie_commit_time";
        final String commitSeqCol = "_hoodie_commit_seqno";
        final String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE().key();
        final String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        final String incrementalQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL();
        final String beginInstantKey = DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key();
        final String allPartitionsGlob = basePath + "/*/*/*/*";
        // Extra columns selected next to the commit sequence number in the count checks.
        final String[] fareAndTipCols = {"fare.amount", "fare.currency", "tip_history"};

        // Common options plus the partition-path field mapped to "partition_path".
        Map writeOpts = commonOpts().$plus(
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()),
                "partition_path"));

        // Batch "001": 100 inserts using DefaultHoodieRecordPayload as the payload class.
        List<HoodieRecord<?>> initialInserts = dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(100));
        toDataset(spark(), initialInserts)
            .write()
            .format(hudiFormat)
            .options(writeOpts)
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.PAYLOAD_CLASS_NAME().key(), DefaultHoodieRecordPayload.class.getName())
            .mode(SaveMode.Overwrite)
            .save(basePath);

        Dataset<Row> firstSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        String firstCommitTime = firstSnapshot.select(commitTimeCol, Predef$.MODULE$.wrapRefArray(new String[0])).head().get(0).toString();
        Assertions.assertEquals(100L, firstSnapshot.count());

        // Nested fields are projected under their leaf names.
        String[] projectedColumns = firstSnapshot
            .select("fare.amount", Predef$.MODULE$.wrapRefArray(new String[]{"fare.currency", "tip_history", "_hoodie_commit_seqno"}))
            .orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.desc("_hoodie_commit_seqno")}))
            .columns();
        Assertions.assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", Predef$.MODULE$.refArrayOps(projectedColumns).mkString(","));

        // Batch "002": 50 unique updates.
        toDataset(spark(), dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(50)))
            .write()
            .format(hudiFormat)
            .options(writeOpts)
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> secondSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Dataset<Row> incrementalFromStart = spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, "000")
            .load(basePath);
        Dataset<Row> incrementalFromStartUnmerged = spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(DataSourceReadOptions$.MODULE$.REALTIME_MERGE().key(), DataSourceReadOptions$.MODULE$.REALTIME_SKIP_MERGE_OPT_VAL())
            .option(beginInstantKey, "000")
            .load(basePath);
        Dataset<Row> incrementalFromFirstCommit = spark().read().format(hudiFormat)
            .option(queryTypeKey, incrementalQuery)
            .option(beginInstantKey, firstCommitTime)
            .load(basePath);

        // Row counts on pruned projections, with and without a commit-time filter.
        long updatedInSnapshot = secondSnapshot
            .select(commitSeqCol, Predef$.MODULE$.wrapRefArray(fareAndTipCols))
            .filter(functions$.MODULE$.col(commitTimeCol).$greater(firstCommitTime))
            .count();
        Assertions.assertEquals(50L, updatedInSnapshot);
        long updatedInIncremental = incrementalFromStart
            .select(commitSeqCol, Predef$.MODULE$.wrapRefArray(fareAndTipCols))
            .filter(functions$.MODULE$.col(commitTimeCol).$greater(firstCommitTime))
            .count();
        Assertions.assertEquals(50L, updatedInIncremental);
        Assertions.assertEquals(50L, incrementalFromFirstCommit.select(commitSeqCol, Predef$.MODULE$.wrapRefArray(fareAndTipCols)).count());
        Assertions.assertEquals(150L, incrementalFromStartUnmerged.select(commitSeqCol, Predef$.MODULE$.wrapRefArray(fareAndTipCols)).count());

        // Schema checks first, then show checks, each over the same five datasets in order.
        verifySchemaAndTypes(firstSnapshot);
        verifySchemaAndTypes(secondSnapshot);
        verifySchemaAndTypes(incrementalFromStart);
        verifySchemaAndTypes(incrementalFromFirstCommit);
        verifySchemaAndTypes(incrementalFromStartUnmerged);
        verifyShow(firstSnapshot);
        verifyShow(secondSnapshot);
        verifyShow(incrementalFromStart);
        verifyShow(incrementalFromFirstCommit);
        verifyShow(incrementalFromStartUnmerged);

        // Batch "003": updates of the initial inserts generated with timestamp argument -1.
        toDataset(spark(), dataGen.generateUpdatesWithTS("003", initialInserts, -1))
            .write()
            .format(hudiFormat)
            .options(writeOpts)
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> thirdSnapshot = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        verifyShow(thirdSnapshot);
        Assertions.assertEquals(100L, thirdSnapshot.count());
        Assertions.assertEquals(0L, thirdSnapshot.filter("rider = 'rider-003'").count());
    }

    /**
     * Turns on "spark.sql.parquet.enableVectorizedReader", writes 100 inserts and 50 updates
     * generated from a flat trip schema, and checks the snapshot counts plus that the typed
     * Row getters agree with the untyped {@code get} for a double, a string and a boolean
     * column.
     */
    @Test
    public void testVectorizedReader() {
        final String vectorizedReaderKey = "spark.sql.parquet.enableVectorizedReader";
        final String hudiFormat = "org.apache.hudi";
        final String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE().key();
        final String snapshotQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL();
        final String allPartitionsGlob = basePath + "/*/*/*/*";
        // Avro schema handed to the data generator for both the insert and the update batch.
        final String shortTripSchema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

        // Enable the setting and confirm the session reports it back as true.
        spark().conf().set(vectorizedReaderKey, true);
        Assertions.assertTrue(new StringOps(Predef$.MODULE$.augmentString(spark().conf().get(vectorizedReaderKey))).toBoolean());

        // Batch "001": 100 inserts following the schema above.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateInsertsAsPerSchema("001", Predef$.MODULE$.int2Integer(100), shortTripSchema))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(basePath);

        Dataset<Row> snapshotAfterInsert = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(100L, snapshotAfterInsert.count());

        // Batch "002": 50 unique updates following the same schema.
        spark().read()
            .json(spark().sparkContext().parallelize(
                JavaConversions$.MODULE$.asScalaBuffer(
                    RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdatesAsPerSchema("002", Predef$.MODULE$.int2Integer(50), shortTripSchema))).toList(),
                2,
                ClassTag$.MODULE$.apply(String.class)))
            .write()
            .format(hudiFormat)
            .options(commonOpts())
            .mode(SaveMode.Append)
            .save(basePath);

        Dataset<Row> snapshotAfterUpdate = spark().read().format(hudiFormat)
            .option(queryTypeKey, snapshotQuery)
            .load(allPartitionsGlob);
        Assertions.assertEquals(100L, snapshotAfterUpdate.count());

        // Typed getters must match the untyped values at positions 0 (fare), 1 (driver) and
        // 2 (_hoodie_is_deleted) of the projected row.
        Row firstRow = snapshotAfterUpdate
            .select("fare", Predef$.MODULE$.wrapRefArray(new String[]{"driver", "_hoodie_is_deleted"}))
            .head();
        Assertions.assertEquals(BoxesRunTime.boxToDouble(firstRow.getDouble(0)), firstRow.get(0));
        Assertions.assertEquals(firstRow.getString(1), firstRow.get(1));
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(firstRow.getBoolean(2)), firstRow.get(2));

        snapshotAfterInsert.show(1);
        snapshotAfterUpdate.show(1);
    }

    /**
     * Exercises precombine-field ordering for snapshot reads of a MOR table.
     *
     * Each step writes one row for key {@code 1} through {@link #writeData} and then verifies
     * what is read back through {@link #checkAnswer}. The tuple layout is
     * (id, name, value, version, _hoodie_is_deleted); {@code version} is the precombine field.
     * Writes whose version is lower than the stored one (including a delete) are expected to
     * leave the visible row unchanged.
     */
    @Test
    public void testPreCombineFiledForReadMOR() {
        // Initial write: the row must be read back exactly as written.
        Tuple5<Object, String, Object, Object, Object> initialRow = new Tuple5<>(BoxesRunTime.boxToInteger(1), "a0", BoxesRunTime.boxToInteger(10), BoxesRunTime.boxToInteger(100), BoxesRunTime.boxToBoolean(false));
        writeData(initialRow);
        checkAnswer(initialRow);

        // Version 99 < 100: the stored row is expected to stay as it was.
        Tuple5<Object, String, Object, Object, Object> staleUpdate = new Tuple5<>(BoxesRunTime.boxToInteger(1), "a0", BoxesRunTime.boxToInteger(12), BoxesRunTime.boxToInteger(99), BoxesRunTime.boxToBoolean(false));
        writeData(staleUpdate);
        checkAnswer(initialRow);

        // Version 101 > 100: the new row replaces the old one.
        Tuple5<Object, String, Object, Object, Object> latestRow = new Tuple5<>(BoxesRunTime.boxToInteger(1), "a0", BoxesRunTime.boxToInteger(12), BoxesRunTime.boxToInteger(101), BoxesRunTime.boxToBoolean(false));
        writeData(latestRow);
        checkAnswer(latestRow);

        // Version 98 < 101: ignored.
        Tuple5<Object, String, Object, Object, Object> olderUpdate = new Tuple5<>(BoxesRunTime.boxToInteger(1), "a0", BoxesRunTime.boxToInteger(14), BoxesRunTime.boxToInteger(98), BoxesRunTime.boxToBoolean(false));
        writeData(olderUpdate);
        checkAnswer(latestRow);

        // A delete carrying version 97 < 101 is ignored as well.
        Tuple5<Object, String, Object, Object, Object> staleDelete = new Tuple5<>(BoxesRunTime.boxToInteger(1), "a0", BoxesRunTime.boxToInteger(16), BoxesRunTime.boxToInteger(97), BoxesRunTime.boxToBoolean(true));
        writeData(staleDelete);
        checkAnswer(latestRow);

        // Version 96 < 101: ignored.
        Tuple5<Object, String, Object, Object, Object> oldestUpdate = new Tuple5<>(BoxesRunTime.boxToInteger(1), "a0", BoxesRunTime.boxToInteger(18), BoxesRunTime.boxToInteger(96), BoxesRunTime.boxToBoolean(false));
        writeData(oldestUpdate);
        checkAnswer(latestRow);
    }

    /**
     * Appends a single row to the Hudi table at {@code basePath}.
     *
     * The tuple is turned into a one-row DataFrame with the columns
     * {@code id, name, value, version, _hoodie_is_deleted} and written with
     * {@link SaveMode#Append} as a MOR table using {@link DefaultHoodieRecordPayload},
     * record key {@code id}, precombine field {@code version}, an empty partition path field
     * and {@link NonpartitionedKeyGenerator}, on top of {@code commonOpts()}.
     *
     * @param tuple5 the row to write, as (id, name, value, version, _hoodie_is_deleted)
     */
    private void writeData(Tuple5<Object, String, Object, Object, Object> tuple5) {
        SparkSession spark = spark();
        // The anonymous TypeCreator below is compiler-generated: it describes the type
        // scala.Tuple5[Int, String, Int, Int, Boolean] for the product encoder.
        spark.implicits().localSeqToDatasetHolder(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple5[]{tuple5})), spark.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(TestMORDataSource.class.getClassLoader()), new TypeCreator(this) { // from class: org.apache.hudi.functional.TestMORDataSource$$typecreator5$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple5"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Int").asType().toTypeConstructor(), universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), mirror.staticClass("scala.Int").asType().toTypeConstructor(), mirror.staticClass("scala.Int").asType().toTypeConstructor(), mirror.staticClass("scala.Boolean").asType().toTypeConstructor()})));
            }
        }))).toDF(Predef$.MODULE$.wrapRefArray(new String[]{"id", "name", "value", "version", "_hoodie_is_deleted"})).write().format("org.apache.hudi").options(commonOpts()).option(DataSourceWriteOptions$.MODULE$.PAYLOAD_CLASS_NAME().key(), DefaultHoodieRecordPayload.class.getCanonicalName()).option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL()).option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), "id").option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), "version").option(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key(), "").option(DataSourceWriteOptions$.MODULE$.KEYGENERATOR_CLASS_NAME().key(), NonpartitionedKeyGenerator.class.getName()).mode(SaveMode.Append).save(this.basePath);
    }

    /**
     * Reads the table back through a one-level glob under {@code basePath} and compares the
     * result with the expected tuple.
     *
     * When the tuple's fifth element (the delete flag) is {@code true}, the read result must be
     * empty; if it is not, the first row found is printed before the assertion fails. Otherwise
     * the first row, projected to (id, name, value, version, _hoodie_is_deleted), must equal
     * the tuple.
     *
     * @param tuple5 the expected row, as (id, name, value, version, _hoodie_is_deleted)
     */
    private void checkAnswer(Tuple5<Object, String, Object, Object, Object> tuple5) {
        Dataset readDf = spark().read().format("org.apache.hudi").load(this.basePath + "/*");
        boolean expectDeleted = BoxesRunTime.unboxToBoolean(tuple5._5());
        if (expectDeleted) {
            // Print the offending row to help diagnose the failure that follows.
            if (!readDf.isEmpty()) {
                Predef$.MODULE$.println("Found df " + ((Row) readDf.collectAsList().get(0)).mkString(","));
            }
            Assertions.assertTrue(readDf.isEmpty());
        } else {
            Row expectedRow = Row$.MODULE$.apply(tuple5.productIterator().toSeq());
            Row actualRow = ((Row[]) readDf.select("id", Predef$.MODULE$.wrapRefArray(new String[]{"name", "value", "version", "_hoodie_is_deleted"})).take(1))[0];
            Assertions.assertEquals(expectedRow, actualRow);
        }
    }

    /**
     * Checks that nested and meta columns can be projected and that typed getters agree with
     * the untyped {@code Row.get}.
     *
     * First asserts the column names produced by selecting
     * {@code fare.amount, fare.currency, tip_history, _hoodie_commit_seqno}; then takes the
     * head row of a second projection (ordered by {@code _hoodie_commit_time} descending) and
     * compares each typed accessor with {@code get(i)} for the same index.
     *
     * @param dataset the DataFrame to verify
     */
    public void verifySchemaAndTypes(Dataset<Row> dataset) {
        Dataset<Row> nestedProjection = dataset
            .select("fare.amount", Predef$.MODULE$.wrapRefArray(new String[]{"fare.currency", "tip_history", "_hoodie_commit_seqno"}))
            .orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.desc("_hoodie_commit_seqno")}));
        String projectedColumns = Predef$.MODULE$.refArrayOps(nestedProjection.columns()).mkString(",");
        Assertions.assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", projectedColumns);

        Dataset<Row> typedProjection = dataset
            .select("begin_lat", Predef$.MODULE$.wrapRefArray(new String[]{"current_date", "fare.currency", "tip_history", "nation"}))
            .orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.desc("_hoodie_commit_time")}));
        Row sample = (Row) typedProjection.head();

        // Each typed getter must return the same value as the generic accessor.
        Assertions.assertEquals(BoxesRunTime.boxToDouble(sample.getDouble(0)), sample.get(0));
        Assertions.assertEquals(sample.getDate(1), sample.get(1));
        Assertions.assertEquals(sample.getString(2), sample.get(2));
        Assertions.assertEquals(sample.getSeq(3), sample.get(3));
        Assertions.assertEquals(sample.getAs(4), sample.get(4));
    }

    /**
     * Calls {@code show(1)} on the full DataFrame and on a projection of
     * {@code _hoodie_commit_seqno, fare.amount, fare.currency, tip_history}.
     *
     * @param dataset the DataFrame to display
     */
    public void verifyShow(Dataset<Row> dataset) {
        final int rowsToShow = 1;
        dataset.show(rowsToShow);

        Dataset<Row> projection = dataset.select("_hoodie_commit_seqno", Predef$.MODULE$.wrapRefArray(new String[]{"fare.amount", "fare.currency", "tip_history"}));
        projection.show(rowsToShow);
    }

    /**
     * Queries a MOR table through the base path with a partition filter, through an explicit
     * partition sub-path, and incrementally.
     *
     * @param z  value written to the URL_ENCODE_PARTITIONING write option; also selects the
     *           URL-encoded form of the partition sub-path used for the path-based read
     * @param z2 value passed to {@code HoodieMetadataConfig.ENABLE} on writes and snapshot reads
     */
    @ParameterizedTest
    @CsvSource({"true,false", "true,true", "false,true", "false,false"})
    public void testQueryMORWithBasePathAndFileIndex(boolean z, boolean z2) {
        final int recordCount = 20;

        // First commit: overwrite the table with recordCount inserts.
        List firstBatch = this.dataGen.generateInsertsContainsAllPartitions("000", Predef$.MODULE$.int2Integer(recordCount));
        Dataset firstInput = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(firstBatch)), 2, ClassTag$.MODULE$.apply(String.class)));
        firstInput.write().format("hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.URL_ENCODE_PARTITIONING().key(), z)
            .option(HoodieMetadataConfig.ENABLE.key(), z2)
            .mode(SaveMode.Overwrite)
            .save(this.basePath);
        String firstCommitTime = HoodieDataSourceHelpers.latestCommit(this.fs, this.basePath);

        // The predicate class is defined elsewhere; presumably it matches records in the
        // 2016/03/15 partition, given how the count is used below (confirm).
        int expectedInPartition = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(firstBatch).asScala()).count(new TestMORDataSource$$anonfun$1(this));

        // Query the partition through a filter on the base path.
        long countByFilter = spark().read().format("hudi")
            .option(HoodieMetadataConfig.ENABLE.key(), z2)
            .load(this.basePath)
            .filter("partition = '2016/03/15'")
            .count();
        Assertions.assertEquals(expectedInPartition, countByFilter);

        // Query the partition through its path under the base path.
        String partitionPath = z ? "2016%2F03%2F15" : "2016/03/15";
        long countByPath = spark().read().format("hudi")
            .option(HoodieMetadataConfig.ENABLE.key(), z2)
            .load(this.basePath + "/" + partitionPath)
            .count();
        Assertions.assertEquals(expectedInPartition, countByPath);

        // Second commit: append recordCount + 1 inserts.
        List secondBatch = this.dataGen.generateInsertsContainsAllPartitions("000", Predef$.MODULE$.int2Integer(recordCount + 1));
        Dataset secondInput = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(secondBatch)), 2, ClassTag$.MODULE$.apply(String.class)));
        secondInput.write().format("hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.URL_ENCODE_PARTITIONING().key(), z)
            .option(HoodieMetadataConfig.ENABLE.key(), z2)
            .mode(SaveMode.Append)
            .save(this.basePath);

        // Incremental query on the plain base path, starting from the first commit.
        long incrementalCount = spark().read().format("org.apache.hudi")
            .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL())
            .option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), firstCommitTime)
            .load(this.basePath)
            .count();
        Assertions.assertEquals(recordCount + 1, incrementalCount);
    }

    /**
     * Writes 100 inserts spread over five date partitions and checks row counts returned by
     * several partition predicates (equality, range, inequality, LIKE, OR, substr).
     *
     * @param z  value written to the URL_ENCODE_PARTITIONING write option
     * @param z2 value written to the HIVE_STYLE_PARTITIONING write option
     */
    @ParameterizedTest
    @CsvSource({"true, false", "false, true", "false, false", "true, true"})
    public void testMORPartitionPrune(boolean z, boolean z2) {
        String[] partitions = {"2021/03/01", "2021/03/02", "2021/03/03", "2021/03/04", "2021/03/05"};
        List records = new HoodieTestDataGenerator(partitions).generateInsertsContainsAllPartitions("000", Predef$.MODULE$.int2Integer(100));
        Dataset inputDf = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(records)), 2, ClassTag$.MODULE$.apply(String.class)));

        // Map keyed by partition string; the values are read as ints below. The mapping
        // function lives in a class defined elsewhere - presumably a per-partition record
        // count (confirm).
        Map countsByPartition = Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(partitions).map(new TestMORDataSource$$anonfun$2(this, records), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))).toMap(Predef$.MODULE$.$conforms());

        inputDf.write().format("hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.URL_ENCODE_PARTITIONING().key(), z)
            .option(DataSourceWriteOptions$.MODULE$.HIVE_STYLE_PARTITIONING().key(), z2)
            .mode(SaveMode.Overwrite)
            .save(this.basePath);

        int inMarch01 = BoxesRunTime.unboxToInt(countsByPartition.apply("2021/03/01"));
        int inMarch02 = BoxesRunTime.unboxToInt(countsByPartition.apply("2021/03/02"));
        int inMarch03 = BoxesRunTime.unboxToInt(countsByPartition.apply("2021/03/03"));
        int inMarch05 = BoxesRunTime.unboxToInt(countsByPartition.apply("2021/03/05"));
        int total = records.size();

        // Equality.
        Assertions.assertEquals(inMarch01,
            spark().read().format("hudi").load(this.basePath).filter("partition = '2021/03/01'").count());
        // Open range that contains only 2021/03/02.
        Assertions.assertEquals(inMarch02,
            spark().read().format("hudi").load(this.basePath).filter("partition > '2021/03/01' and partition < '2021/03/03'").count());
        // Inequality.
        Assertions.assertEquals(total - inMarch01,
            spark().read().format("hudi").load(this.basePath).filter("partition != '2021/03/01'").count());
        // Prefix LIKE.
        Assertions.assertEquals(inMarch03,
            spark().read().format("hudi").load(this.basePath).filter("partition like '2021/03/03%'").count());
        // LIKE pattern that matches every partition written.
        Assertions.assertEquals(total,
            spark().read().format("hudi").load(this.basePath).filter("partition like '%2021/03/%'").count());
        // Disjunction of two partitions.
        Assertions.assertEquals(inMarch01 + inMarch05,
            spark().read().format("hudi").load(this.basePath).filter("partition = '2021/03/01' or partition = '2021/03/05'").count());
        // Function applied to the partition column.
        Assertions.assertEquals(inMarch03,
            spark().read().format("hudi").load(this.basePath).filter("substr(partition, 9, 10) = '03'").count());
    }

    /**
     * Writes 20 inserts to a MOR table using the INMEMORY index type, asserts via
     * {@code DataSourceTestUtils.isLogFileOnly} that the table holds only log files, and
     * checks that all 20 rows are readable.
     */
    @Test
    public void testReadLogOnlyMergeOnReadTable() {
        initMetaClient(HoodieTableType.MERGE_ON_READ);

        Dataset inputDf = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInsertsContainsAllPartitions("000", Predef$.MODULE$.int2Integer(20)))), 2, ClassTag$.MODULE$.apply(String.class)));
        inputDf.write().format("hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.INMEMORY.toString())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);

        // The table must consist of log files only.
        Assertions.assertTrue(DataSourceTestUtils.isLogFileOnly(this.basePath));

        long readCount = spark().read().format("hudi").load(this.basePath).count();
        Assertions.assertEquals(20L, readCount);
    }

    /**
     * Bulk-inserts 1000 records into a MOR table with inline clustering enabled and asserts
     * that the {@code .hoodie/.temp} directory under the base path is empty afterwards.
     */
    @Test
    public void testTempFilesCleanForClustering() {
        Dataset inputDf = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(1000)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        inputDf.write().format("org.apache.hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option("hoodie.clustering.inline", "true")
            .option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon")
            .mode(SaveMode.Overwrite)
            .save(this.basePath);

        // List the temp folder through the file system resolved from the Spark Hadoop config.
        Path tempDir = new Path(this.basePath, ".hoodie/.temp");
        boolean tempDirEmpty = Predef$.MODULE$.refArrayOps(tempDir.getFileSystem(spark().sparkContext().hadoopConfiguration()).listStatus(tempDir)).isEmpty();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(true), BoxesRunTime.boxToBoolean(tempDirEmpty));
    }

    /**
     * Bulk-inserts 1000 records with inline clustering sorted on a struct column that is null
     * for part of the rows. The test has no explicit assertions; it passes when the write
     * completes without throwing.
     */
    @Test
    public void testClusteringOnNullableColumn() {
        Dataset baseDf = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(1000)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));

        // cluster_id: null (typed as string) where end_lon < 0.2, otherwise the row key.
        Column clusterId = functions$.MODULE$.when(functions$.MODULE$.expr("end_lon < 0.2 "), functions$.MODULE$.lit((Object) null).cast("string"))
            .otherwise(functions$.MODULE$.col("_row_key"));
        // struct_cluster_col: null where end_lon < 0.1, otherwise struct(cluster_id, _row_key).
        Column structClusterCol = functions$.MODULE$.when(functions$.MODULE$.expr("end_lon < 0.1"), functions$.MODULE$.lit((Object) null))
            .otherwise(functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("cluster_id"), functions$.MODULE$.col("_row_key")})));

        Dataset inputDf = baseDf
            .withColumn("cluster_id", clusterId)
            .withColumn("struct_cluster_col", structClusterCol);

        inputDf.write().format("org.apache.hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .option("hoodie.clustering.inline", "true")
            .option("hoodie.clustering.inline.max.commits", "1")
            .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
            .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
            .option("hoodie.clustering.plan.strategy.sort.columns", "struct_cluster_col")
            .mode(SaveMode.Overwrite)
            .save(this.basePath);
    }

    /**
     * Verifies that rows re-written with {@code _hoodie_is_deleted = true} disappear from
     * snapshot reads of a MOR table: 100 rows are bulk-inserted, 2 of them are appended again
     * with the delete flag set, and 98 rows are expected afterwards.
     */
    @Test
    public void testHoodieIsDeletedMOR() {
        final int totalRecords = 100;
        final int recordsToDelete = 2;
        String shortTripSchema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        String snapshotGlob = this.basePath + "/*/*/*/*";

        // Initial bulk insert with inline compaction turned off.
        Dataset inputDf = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInsertsAsPerSchema("000", Predef$.MODULE$.int2Integer(totalRecords), shortTripSchema))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        inputDf.write().format("org.apache.hudi")
            .options(commonOpts())
            .option("hoodie.compact.inline", "false")
            .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Overwrite)
            .save(this.basePath);

        Dataset snapshotBefore = spark().read().format("org.apache.hudi")
            .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL())
            .load(snapshotGlob);
        Assertions.assertEquals(totalRecords, snapshotBefore.count());

        // Take a few rows, drop the columns picked by the filter class (defined elsewhere;
        // presumably the Hudi meta columns - confirm), and set the delete flag.
        Dataset rowsToDelete = snapshotBefore.limit(recordsToDelete);
        Object[] droppedColumns = (Object[]) Predef$.MODULE$.refArrayOps(rowsToDelete.columns()).filter(new TestMORDataSource$$anonfun$3(this));
        rowsToDelete.drop(Predef$.MODULE$.wrapRefArray(droppedColumns))
            .withColumn("_hoodie_is_deleted", functions$.MODULE$.lit(BoxesRunTime.boxToBoolean(true)).cast(BooleanType$.MODULE$))
            .write().format("org.apache.hudi")
            .options(commonOpts())
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .mode(SaveMode.Append)
            .save(this.basePath);

        long remaining = spark().read().format("org.apache.hudi")
            .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_SNAPSHOT_OPT_VAL())
            .load(snapshotGlob)
            .count();
        Assertions.assertEquals(totalRecords - recordsToDelete, remaining);
    }

    /**
     * Verifies partition filtering on a MOR table written with
     * {@code TimestampBasedKeyGenerator} (input date format {@code yyyy-MM-dd}, output
     * {@code yyyy/MM/dd}) for snapshot, read-optimized and incremental queries.
     *
     * Three commits are made: 50 inserts for 2022-01-01, 60 inserts for 2022-01-02, then 20
     * unique updates generated by the second data generator. Filters are expressed against the
     * original {@code partition} column values.
     *
     * All assertions follow the JUnit {@code assertEquals(expected, actual)} argument order so
     * that a failure message labels the two values correctly.
     */
    @Test
    public void testPrunePartitionForTimestampBasedKeyGenerator() {
        // Common options plus MOR table type and the timestamp-based key generator settings.
        Tuple2[] keyGenOptions = {
            new Tuple2<>("hoodie.compact.inline", "false"),
            new Tuple2<>(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL()),
            new Tuple2<>(DataSourceWriteOptions$.MODULE$.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.TimestampBasedKeyGenerator"),
            new Tuple2<>("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING"),
            new Tuple2<>("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd"),
            new Tuple2<>("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00"),
            new Tuple2<>("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd")
        };
        Map writeOpts = commonOpts().$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(keyGenOptions)));

        // Commit 1: 50 inserts for 2022-01-01.
        spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(new HoodieTestDataGenerator(new String[]{"2022-01-01"}).generateInserts("001", Predef$.MODULE$.int2Integer(50)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write().format("org.apache.hudi").options(writeOpts).mode(SaveMode.Overwrite).save(this.basePath);
        ((HoodieClientTestHarness) this).metaClient = HoodieTableMetaClient.builder().setBasePath(this.basePath).setConf(spark().sessionState().newHadoopConf()).build();
        String timestamp = ((HoodieInstant) this.metaClient.getActiveTimeline().lastInstant().get()).getTimestamp();

        // Commit 2: 60 inserts for 2022-01-02.
        HoodieTestDataGenerator secondDayGen = new HoodieTestDataGenerator(new String[]{"2022-01-02"});
        spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(secondDayGen.generateInserts("002", Predef$.MODULE$.int2Integer(60)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write().format("org.apache.hudi").options(writeOpts).mode(SaveMode.Append).save(this.basePath);
        String timestamp2 = ((HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp();

        // Commit 3: 20 unique updates from the second generator.
        spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings(secondDayGen.generateUniqueUpdates("003", Predef$.MODULE$.int2Integer(20)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write().format("org.apache.hudi").options(writeOpts).mode(SaveMode.Append).save(this.basePath);
        String timestamp3 = ((HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp();

        // Snapshot query: row counts per commit time, then per partition.
        Dataset snapshotDf = spark().read().format("hudi").load(this.basePath);
        Assertions.assertEquals(50L, snapshotDf.where("_hoodie_commit_time = '" + timestamp + "'").count());
        Assertions.assertEquals(40L, snapshotDf.where("_hoodie_commit_time = '" + timestamp2 + "'").count());
        Assertions.assertEquals(20L, snapshotDf.where("_hoodie_commit_time = '" + timestamp3 + "'").count());
        Assertions.assertEquals(50L, snapshotDf.where("partition = '2022-01-01'").count());
        Assertions.assertEquals(60L, snapshotDf.where("partition = '2022-01-02'").count());

        // Read-optimized query.
        Dataset readOptimizedDf = spark().read().format("hudi")
            .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
            .load(this.basePath);
        Assertions.assertEquals(50L, readOptimizedDf.where("partition = '2022-01-01'").count());
        Assertions.assertEquals(60L, readOptimizedDf.where("partition = '2022-01-02'").count());

        // Incremental query with begin = second commit and end = third commit.
        Dataset incrementalDf = spark().read().format("hudi")
            .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL())
            .option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), timestamp2)
            .option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), timestamp3)
            .load(this.basePath);
        Assertions.assertEquals(0L, incrementalDf.where("partition = '2022-01-01'").count());
        Assertions.assertEquals(20L, incrementalDf.where("partition = '2022-01-02'").count());
    }

    public TestMORDataSource() {
        SparkDatasetMixin.Cclass.$init$(this);
        this.spark = null;
        this.log = LogManager.getLogger(TestMORDataSource.class);
        this.commonOpts = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.insert.shuffle.parallelism"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.upsert.shuffle.parallelism"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), "_row_key"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), "partition"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), "timestamp"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieWriteConfig.TBL_NAME.key()), "hoodie_test")}));
        this.verificationCol = "driver";
        this.updatedVerificationVal = "driver_update";
    }
}
