package org.apache.hudi.functional;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: TestSparkDataSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ee\u0001B\u0001\u0003\u0001-\u00111\u0003V3tiN\u0003\u0018M]6ECR\f7k\\;sG\u0016T!a\u0001\u0003\u0002\u0015\u0019,hn\u0019;j_:\fGN\u0003\u0002\u0006\r\u0005!\u0001.\u001e3j\u0015\t9\u0001\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0013\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0004\t\u0003\u001bAi\u0011A\u0004\u0006\u0003\u001f\u0011\t\u0011\u0002^3tiV$\u0018\u000e\\:\n\u0005Eq!\u0001I*qCJ\\7\t\\5f]R4UO\\2uS>t\u0017\r\u001c+fgRD\u0015M\u001d8fgNDQa\u0005\u0001\u0005\u0002Q\ta\u0001P5oSRtD#A\u000b\u0011\u0005Y\u0001Q\"\u0001\u0002\t\u000fa\u0001!\u0019!C\u00013\u0005Y\u0001/\u0019:bY2,G.[:n+\u0005Q\u0002CA\u000e!\u001b\u0005a\"BA\u000f\u001f\u0003\u0011a\u0017M\\4\u000b\u0003}\tAA[1wC&\u0011\u0011\u0005\b\u0002\b\u0013:$XmZ3s\u0011\u0019\u0019\u0003\u0001)A\u00055\u0005a\u0001/\u0019:bY2,G.[:nA!9Q\u0005\u0001b\u0001\n\u00031\u0013AC2p[6|gn\u00149ugV\tq\u0005\u0005\u0003)]E\ndBA\u0015-\u001b\u0005Q#\"A\u0016\u0002\u000bM\u001c\u0017\r\\1\n\u00055R\u0013A\u0002)sK\u0012,g-\u0003\u00020a\t\u0019Q*\u00199\u000b\u00055R\u0003C\u0001\u00153\u0013\t\u0019\u0004G\u0001\u0004TiJLgn\u001a\u0005\u0007k\u0001\u0001\u000b\u0011B\u0014\u0002\u0017\r|W.\\8o\u001fB$8\u000f\t\u0005\u0006o\u0001!\t\u0001O\u0001\ri\u0016\u001cHoQ8sK\u001acwn\u001e\u000b\u0005sqr\u0004\t\u0005\u0002*u%\u00111H\u000b\u0002\u0005+:LG\u000fC\u0003>m\u0001\u0007\u0011'A\u0005uC\ndW\rV=qK\")qH\u000ea\u0001c\u0005Y1.Z=HK:\u001cE.Y:t\u0011\u0015\te\u00071\u00012\u0003%Ig\u000eZ3y)f\u0004X\r\u000b\u00047\u0007>\u0003VL\u0018\t\u0003\t6k\u0011!\u0012\u0006\u0003\r\u001e\u000b\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0003\u0011&\u000ba\u0001]1sC6\u001c(B\u0001&L\u0003\u001dQW\u000f]5uKJT!\u0001\u0014\u0005\u0002\u000b),h.\u001b;\n\u00059+%!C\"tmN{WO]2f\u0003\u00151\u0018\r\\;fY\u0019\t6+V,Z7\u0006\n!+A\u001fD\u001fBKvl\u0014(`/JKE+\u0012?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]MKW\u000e\u001d7f\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?C\u0019>
{U*I\u0001U\u0003y\u001au\nU-`\u001f:{vKU%U\u000br|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgfU5na2,7*Z=HK:,'/\u0019;per\u001c\u0016*\u0014)M\u000b\u0006\na+\u0001'D\u001fBKvl\u0014(`/JKE+\u0012?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]9{g\u000e]1si&$\u0018n\u001c8fI.+\u0017pR3oKJ\fGo\u001c:}\u000f2{%)\u0011'`\u00052{u*T\u0011\u00021\u0006iT*\u0012*H\u000b~{ej\u0018*F\u0003\u0012cxN]4/CB\f7\r[3/QV$\u0017NL6fs\u001e,gNL*j[BdWmS3z\u000f\u0016tWM]1u_Jd(\tT(P\u001b\u0006\n!,\u0001 N\u000bJ;UiX(O?J+\u0015\t\u0012?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]MKW\u000e\u001d7f\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?T\u00136\u0003F*R\u0011\u00029\u0006aU*\u0012*H\u000b~{ej\u0018*F\u0003\u0012cxN]4/CB\f7\r[3/QV$\u0017NL6fs\u001e,gN\f(p]B\f'\u000f^5uS>tW\rZ&fs\u001e+g.\u001a:bi>\u0014Hp\u0012'P\u0005\u0006cuL\u0011'P\u001f6\u000b\u0011\u0002Z3mS6LG/\u001a:\u001d\u0003qD#A\u000e1\u0011\u0005\u0005\u0014W\"A$\n\u0005\r<%!\u0005)be\u0006lW\r^3sSj,G\rV3ti\")Q\r\u0001C\u0001M\u0006)B/Z:u\u00136lW\u000f^1cY\u0016,6/\u001a:GY><H#B\u001dhQ*\\\u0007\"B\u001fe\u0001\u0004\t\u0004\"B5e\u0001\u0004\t\u0014!C8qKJ\fG/[8o\u0011\u0015yD\r1\u00012\u0011\u0015\tE\r1\u00012Q\u0019!7iT7^=2za\u000e\u001d:umbTHP`A\u0001\u0003\u000b\tI!I\u0001p\u0003\u0011\u001bu\nU-`\u001f:{vKU%U\u000brLgn]3sir|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgfU5na2,7*Z=HK:,'/\u0019;per\u0014EjT(NC\u0005\t\u0018!R\"P!f{vJT0X%&#V\t`5og\u0016\u0014H\u000f`8sO:\n\u0007/Y2iK:BW\u000fZ5/W\u0016Lx-\u001a8/'&l\u0007\u000f\\3LKf<UM\\3sCR|'\u000f`*J\u001bBcU)I\u0001t\u0003M\u001bu\nU-`\u001f:{vKU%U\u000brLgn]3sir|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgFT8oa\u0006\u0014H/\u001b;j_:,GmS3z\u000f\u0016tWM]1u_Jdx\tT(C\u00032{&\tT(P\u001b\u0006\nQ/\u0001#N\u000bJ;UiX(O?J+\u0015\t\u0012?j]N,'\u000f\u001e?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]MKW\u000e\u001d7f\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?C\u0019
>{U*I\u0001x\u0003\u0015kUIU$F?>suLU#B\trLgn]3sir|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgfU5na2,7*Z=HK:,'/\u0019;per\u001c\u0016*\u0014)M\u000b\u0006\n\u00110A*N\u000bJ;UiX(O?J+\u0015\t\u0012?j]N,'\u000f\u001e?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]9{g\u000e]1si&$\u0018n\u001c8fI.+\u0017pR3oKJ\fGo\u001c:}\u000f2{%)\u0011'`\u00052{u*T\u0011\u0002w\u0006I5i\u0014)Z?>sul\u0016*J)\u0016c(-\u001e7l?&t7/\u001a:uy>\u0014xML1qC\u000eDWM\f5vI&t3.Z=hK:t3+[7qY\u0016\\U-_$f]\u0016\u0014\u0018\r^8sy\ncujT'\"\u0003u\f!jQ(Q3~{ejX,S\u0013R+EPY;mW~Kgn]3sir|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgfU5na2,7*Z=HK:,'/\u0019;per\u001c\u0016*\u0014)M\u000b\u0006\nq0\u0001-D\u001fBKvl\u0014(`/JKE+\u0012?ck2\\w,\u001b8tKJ$Hp\u001c:h]\u0005\u0004\u0018m\u00195f]!,H-\u001b\u0018lKf<WM\u001c\u0018O_:\u0004\u0018M\u001d;ji&|g.\u001a3LKf<UM\\3sCR|'\u000f`$M\u001f\n\u000bEj\u0018\"M\u001f>k\u0015EAA\u0002\u0003%kUIU$F?>suLU#B\tr\u0014W\u000f\\6`S:\u001cXM\u001d;}_J<g&\u00199bG\",g\u0006[;eS:ZW-_4f]:\u001a\u0016.\u001c9mK.+\u0017pR3oKJ\fGo\u001c:}\u00052{u*T\u0011\u0003\u0003\u000f\t!*T#S\u000f\u0016{vJT0S\u000b\u0006#EPY;mW~Kgn]3sir|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgfU5na2,7*Z=HK:,'/\u0019;per\u001c\u0016*\u0014)M\u000b\u0006\u0012\u00111B\u0001Y\u001b\u0016\u0013v)R0P\u001d~\u0013V)\u0011#}EVd7nX5og\u0016\u0014H\u000f`8sO:\n\u0007/Y2iK:BW\u000fZ5/W\u0016Lx-\u001a8/\u001d>t\u0007/\u0019:uSRLwN\\3e\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?H\u0019>\u0013\u0015\tT0C\u0019>{U\n\u000b\u0002eA\"9\u0011\u0011\u0003\u0001\u0005\u0002\u0005M\u0011!G2p[B\f'/Z+qI\u0006$X\r\u00124XSRD\u0007*\u001e3j\t\u001a$\u0012\"OA\u000b\u0003_\t\u0019$a\u0014\t\u0011\u0005]\u0011q\u0002a\u0001\u00033\tq!\u001b8qkR$e\r\u0005\u0004\u0002\u001c\u0005\u0015\u0012\u0011F\u0007\u0003\u0003;QA!a\b\u0002\"\u0005\u00191/\u001d7\u000b\u0007\u0005\rb!A\u0003ta\u0006\u00148.\u0003\u0003\u0002(\u0005u!a\u0002#bi\u0006\u001c
X\r\u001e\t\u0005\u00037\tY#\u0003\u0003\u0002.\u0005u!a\u0001*po\"A\u0011\u0011GA\b\u0001\u0004\tI\"\u0001\u0004ik\u0012LGI\u001a\u0005\t\u0003k\ty\u00011\u0001\u00028\u0005Q!-\u001a4pe\u0016\u0014vn^:\u0011\r\u0005e\u0012\u0011JA\u0015\u001d\u0011\tY$!\u0012\u000f\t\u0005u\u00121I\u0007\u0003\u0003\u007fQ1!!\u0011\u000b\u0003\u0019a$o\\8u}%\t1&C\u0002\u0002H)\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002L\u00055#\u0001\u0002'jgRT1!a\u0012+\u0011\u001d\t\t&a\u0004A\u0002E\nQbY8mgR{7i\\7qCJ,\u0007bBA+\u0001\u0011\u0005\u0011qK\u0001!G>l\u0007/\u0019:f\u000b:$\u0018N]3J]B,HOU8xg^KG\u000f\u001b%vI&$e\rF\u0004:\u00033\ni&a\u0018\t\u0011\u0005m\u00131\u000ba\u0001\u0003o\t\u0011\"\u001b8qkR\u0014vn^:\t\u0011\u0005E\u00121\u000ba\u0001\u00033Aq!!\u0015\u0002T\u0001\u0007\u0011\u0007C\u0004\u0002d\u0001!\t!!\u001a\u0002=\r|W\u000e]1sK\u0016sG/\u001b:f\u0013:\u0004X\u000f\u001e#g/&$\b\u000eS;eS\u00123GcB\u001d\u0002h\u0005%\u00141\u000e\u0005\t\u0003/\t\t\u00071\u0001\u0002\u001a!A\u0011\u0011GA1\u0001\u0004\tI\u0002C\u0004\u0002R\u0005\u0005\u0004\u0019A\u0019\t\u000f\u0005=\u0004\u0001\"\u0001\u0002r\u00059Bm\\'P%J+\u0017\rZ(qi&l\u0017N_3e#V,'/\u001f\u000b\bs\u0005M\u0014QOA=\u0011!\t9\"!\u001cA\u0002\u0005e\u0001bBA<\u0003[\u0002\r!M\u0001\rG>d7\u000fV8TK2,7\r\u001e\u0005\t\u0003w\ni\u00071\u0001\u0002~\u00059\u0012n]'fi\u0006$\u0017\r^1F]\u0006\u0014G.\u001a3P]J+\u0017\r\u001a\t\u0004S\u0005}\u0014bAAAU\t9!i\\8mK\u0006t\u0007bBAC\u0001\u0011\u0005\u0011qQ\u0001\u000fG>l\u0007/\u0019:f%>\u000be\u000e\u001a*U)\u001dI\u0014\u0011RAG\u0003\u001fCq!a#\u0002\u0004\u0002\u0007\u0011'\u0001\u0005cCN,\u0007+\u0019;i\u0011\u001d\t\t&a!A\u0002EB\u0001\"a\u001f\u0002\u0004\u0002\u0007\u0011Q\u0010")
/* loaded from: input_file:org/apache/hudi/functional/TestSparkDataSource.class */
public class TestSparkDataSource extends SparkClientFunctionalTestHarness {
    // Shuffle parallelism used for every write option below and for parallelizing the generated
    // input records; boxed Integer holding 4.
    private final Integer parallelism = Predef$.MODULE$.int2Integer(4);
    // Immutable Scala map of write options shared by all tests in this class: the four
    // hoodie.*.shuffle.parallelism keys are set to the string form of parallelism() (built via an
    // interpolated StringContext with two empty parts), the record key field is "_row_key", the
    // precombine field is "timestamp" and the table name is "hoodie_test".
    // The initializer calls parallelism(), so this field must stay declared after `parallelism`.
    private final Map<String, String> commonOpts = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.insert.shuffle.parallelism"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{parallelism()}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.upsert.shuffle.parallelism"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{parallelism()}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.bulkinsert.shuffle.parallelism"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{parallelism()}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.delete.shuffle.parallelism"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{parallelism()}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), "_row_key"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), "timestamp"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieWriteConfig.TBL_NAME.key()), "hoodie_test")}));

    /**
     * Accessor for the shuffle parallelism field of this test class.
     *
     * @return the boxed parallelism value held by this instance
     */
    public Integer parallelism() {
        return parallelism;
    }

    /**
     * Accessor for the write options shared by the tests in this class.
     *
     * @return the immutable Scala map held in the {@code commonOpts} field
     */
    public Map<String, String> commonOpts() {
        return commonOpts;
    }

    /**
     * Exercises a bulk insert followed by several rounds of updates against one table and checks
     * snapshot, incremental and time-travel ("as.of.instant") reads after each step. For
     * MERGE_ON_READ tables it additionally checks the read-optimized view and a write with inline
     * compaction enabled.
     *
     * @param str  table type (COPY_ON_WRITE or MERGE_ON_READ)
     * @param str2 fully qualified key generator class name
     * @param str3 index type
     */
    @ParameterizedTest
    @CsvSource(
            value = {
                "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
                "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
                "COPY_ON_WRITE|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
                "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
                "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
                "MERGE_ON_READ|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
            },
            delimiter = '|')
    public void testCoreFlow(String str, String str2, String str3) {
        final String tableType = str;
        final String keyGenClass = str2;
        final String indexType = str3;
        // Columns projected from both sides of every comparison below.
        final String colsToSelect = "_row_key, begin_lat,  begin_lon, city_to_state.LA, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare.amount, fare.currency, partition, partition_path, rider, timestamp, weight, _hoodie_is_deleted";
        // The non-partitioned key generator is paired with an empty partition path field.
        final String partitionField = NonpartitionedKeyGenerator.class.getName().equals(keyGenClass) ? "" : "partition";

        Map<String, String> writeOpts = commonOpts()
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieMetadataConfig.ENABLE.key()), String.valueOf(true)))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.KEYGENERATOR_CLASS_NAME().key()), keyGenClass))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), partitionField))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), tableType))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieIndexConfig.INDEX_TYPE.key()), indexType));
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(57069L);
        FileSystem fileSystem = HadoopFSUtils.getFs(basePath(), spark().sparkContext().hadoopConfiguration());

        // Step 1: bulk insert 10 generated records, overwriting whatever is at basePath().
        Dataset<Row> insertsDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(10)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        insertsDf.write()
                .format("org.apache.hudi")
                .options(writeOpts)
                .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL())
                .mode(SaveMode.Overwrite)
                .save(basePath());
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(fileSystem, basePath(), "000"));

        Dataset<Row> snapshotAfterInsert = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath())
                .cache();
        Assertions.assertEquals(10L, snapshotAfterInsert.count());
        compareEntireInputDfWithHudiDf(insertsDf, snapshotAfterInsert, colsToSelect);
        List<Row> rowsAfterInsert = Predef$.MODULE$.refArrayOps((Object[]) snapshotAfterInsert.collect()).toList();
        snapshotAfterInsert.unpersist(true);

        // Step 2: first round of updates (5 unique keys), appended with the default operation.
        Dataset<Row> firstUpdatesDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(5)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        firstUpdatesDf.write().format("org.apache.hudi").options(writeOpts).mode(SaveMode.Append).save(basePath());
        String firstUpdateCommit = HoodieDataSourceHelpers.latestCommit(fileSystem, basePath());

        Dataset<Row> snapshotAfterFirstUpdate = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath())
                .cache();
        Assertions.assertEquals(10L, snapshotAfterFirstUpdate.count());
        compareUpdateDfWithHudiDf(firstUpdatesDf, snapshotAfterFirstUpdate, rowsAfterInsert, colsToSelect);
        List<Row> rowsAfterFirstUpdate = Predef$.MODULE$.refArrayOps((Object[]) snapshotAfterFirstUpdate.collect()).toList();
        snapshotAfterFirstUpdate.unpersist(true);

        // Step 3: second round of updates (6 unique keys).
        Dataset<Row> secondUpdatesDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(6)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        long distinctKeysInSecondUpdate = secondUpdatesDf.select("_row_key", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count();
        secondUpdatesDf.write().format("org.apache.hudi").options(writeOpts).mode(SaveMode.Append).save(basePath());
        String secondUpdateCommit = HoodieDataSourceHelpers.latestCommit(fileSystem, basePath());
        Assertions.assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fileSystem, basePath(), "000").size());

        Dataset<Row> snapshotAfterSecondUpdate = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath())
                .cache();
        Assertions.assertEquals(10L, snapshotAfterSecondUpdate.count(), "should still be 10, since we only updated");
        compareUpdateDfWithHudiDf(secondUpdatesDf, snapshotAfterSecondUpdate, rowsAfterFirstUpdate, colsToSelect);
        snapshotAfterSecondUpdate.unpersist(true);

        // Incremental read bounded to the very first commit: expects exactly the initial inserts.
        String firstCommit = (String) HoodieDataSourceHelpers.listCommitsSince(fileSystem, basePath(), "000").get(0);
        Dataset<Row> firstIncrementalDf = spark().read()
                .format("org.apache.hudi")
                .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                .option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), "000")
                .option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), firstCommit)
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath());
        Assertions.assertEquals(10L, firstIncrementalDf.count(), "should have pulled 10 initial inserts");
        Row[] firstCommitTimes = (Row[]) firstIncrementalDf.groupBy("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect();
        Assertions.assertEquals(1, firstCommitTimes.length);
        Assertions.assertEquals(firstCommit, firstCommitTimes[0].get(0));

        // Step 4: third round of updates (8 unique keys), written before the second incremental read.
        Dataset<Row> thirdUpdatesDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("003", Predef$.MODULE$.int2Integer(8)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        thirdUpdatesDf.write().format("org.apache.hudi").options(writeOpts).mode(SaveMode.Append).save(basePath());

        // Incremental read between the first and second update commits.
        Dataset<Row> secondIncrementalDf = spark().read()
                .format("org.apache.hudi")
                .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                .option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), firstUpdateCommit)
                .option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), secondUpdateCommit)
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath());
        Assertions.assertEquals(distinctKeysInSecondUpdate, secondIncrementalDf.count(), "should have pulled 6 records");
        Row[] secondCommitTimes = (Row[]) secondIncrementalDf.groupBy("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect();
        Assertions.assertEquals(1, secondCommitTimes.length);
        Assertions.assertEquals(secondUpdateCommit, secondCommitTimes[0].get(0));

        // Time-travel read as of the first update commit must match the rows captured back then.
        Dataset<Row> timeTravelDf = spark().read()
                .format("org.apache.hudi")
                .option("as.of.instant", firstUpdateCommit)
                .load(basePath())
                .cache();
        Assertions.assertEquals(10L, timeTravelDf.count());
        compareEntireInputRowsWithHudiDf(rowsAfterFirstUpdate, timeTravelDf, colsToSelect);
        timeTravelDf.unpersist(true);

        if (tableType.equals("MERGE_ON_READ")) {
            doMORReadOptimizedQuery(insertsDf, colsToSelect, true);
            List<Row> rowsBeforeCompaction = Predef$.MODULE$.refArrayOps((Object[]) spark().read()
                    .format("org.apache.hudi")
                    .option(HoodieMetadataConfig.ENABLE.key(), true)
                    .load(basePath())
                    .collect()).toList();
            Assertions.assertEquals(10, rowsBeforeCompaction.length());

            // Fourth round of updates (4 unique keys), written with inline compaction switched on.
            Dataset<Row> fourthUpdatesDf = spark().read().json(
                    spark().sparkContext().parallelize(
                            ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("004", Predef$.MODULE$.int2Integer(4)))).asScala()).toList(),
                            Predef$.MODULE$.Integer2int(parallelism()),
                            ClassTag$.MODULE$.apply(String.class)))
                    .cache();
            fourthUpdatesDf.write()
                    .format("org.apache.hudi")
                    .options(writeOpts)
                    .option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
                    .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
                    .mode(SaveMode.Append)
                    .save(basePath());
            Dataset<Row> snapshotAfterCompaction = spark().read()
                    .format("org.apache.hudi")
                    .option(HoodieMetadataConfig.ENABLE.key(), true)
                    .load(basePath())
                    .cache();
            compareUpdateDfWithHudiDf(fourthUpdatesDf, snapshotAfterCompaction, rowsBeforeCompaction, colsToSelect);
            fourthUpdatesDf.unpersist(true);
            snapshotAfterCompaction.unpersist(true);
            compareROAndRT(basePath(), colsToSelect, true);
        }

        insertsDf.unpersist(true);
        firstUpdatesDf.unpersist(true);
        secondUpdatesDf.unpersist(true);
        thirdUpdatesDf.unpersist(true);
    }

    /**
     * Writes three batches of freshly generated inserts (10, 5 and 6 records) with the given write
     * operation and checks after the second and third batch that the snapshot read contains exactly
     * the union of everything written so far.
     *
     * @param str  table type (COPY_ON_WRITE or MERGE_ON_READ)
     * @param str2 write operation passed as the OPERATION option (insert or bulk_insert)
     * @param str3 fully qualified key generator class name
     * @param str4 index type
     */
    @ParameterizedTest
    @CsvSource(
            value = {
                "COPY_ON_WRITE|insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
                "COPY_ON_WRITE|insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
                "COPY_ON_WRITE|insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
                "MERGE_ON_READ|insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
                "MERGE_ON_READ|insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
                "MERGE_ON_READ|insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
                "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
                "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
                "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
                "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
                "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
                "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
            },
            delimiter = '|')
    public void testImmutableUserFlow(String str, String str2, String str3, String str4) {
        final String tableType = str;
        final String operation = str2;
        final String keyGenClass = str3;
        final String indexType = str4;
        // Columns projected from both sides of every comparison below.
        final String colsToSelect = "_row_key, begin_lat,  begin_lon, city_to_state.LA, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare.amount, fare.currency, partition, partition_path, rider, timestamp, weight, _hoodie_is_deleted";
        // The non-partitioned key generator is paired with an empty partition path field.
        final String partitionField = NonpartitionedKeyGenerator.class.getName().equals(keyGenClass) ? "" : "partition";

        Map<String, String> writeOpts = commonOpts()
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieMetadataConfig.ENABLE.key()), String.valueOf(true)))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.KEYGENERATOR_CLASS_NAME().key()), keyGenClass))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), partitionField))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), tableType))
                .$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieIndexConfig.INDEX_TYPE.key()), indexType));
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(57069L);
        FileSystem fileSystem = HadoopFSUtils.getFs(basePath(), spark().sparkContext().hadoopConfiguration());

        // Batch 1: 10 inserts, overwriting whatever is at basePath().
        Dataset<Row> firstBatchDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(10)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        firstBatchDf.write()
                .format("org.apache.hudi")
                .options(writeOpts)
                .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation)
                .mode(SaveMode.Overwrite)
                .save(basePath());
        Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(fileSystem, basePath(), "000"));
        long countAfterFirstBatch = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath())
                .count();
        Assertions.assertEquals(10L, countAfterFirstBatch);

        // Batch 2: 5 more inserts, appended.
        Dataset<Row> secondBatchDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(5)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        secondBatchDf.write()
                .format("org.apache.hudi")
                .options(writeOpts)
                .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation)
                .mode(SaveMode.Append)
                .save(basePath());
        Dataset<Row> snapshotAfterSecondBatch = spark().read()
                .format("hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath())
                .cache();
        Assertions.assertEquals(15L, snapshotAfterSecondBatch.count());
        compareEntireInputDfWithHudiDf(secondBatchDf.union(firstBatchDf), snapshotAfterSecondBatch, colsToSelect);
        snapshotAfterSecondBatch.unpersist(true);

        // Batch 3: 6 more inserts, appended.
        Dataset<Row> thirdBatchDf = spark().read().json(
                spark().sparkContext().parallelize(
                        ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(dataGen.generateInserts("002", Predef$.MODULE$.int2Integer(6)))).asScala()).toList(),
                        Predef$.MODULE$.Integer2int(parallelism()),
                        ClassTag$.MODULE$.apply(String.class)))
                .cache();
        thirdBatchDf.write()
                .format("org.apache.hudi")
                .options(writeOpts)
                .option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation)
                .mode(SaveMode.Append)
                .save(basePath());
        Assertions.assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fileSystem, basePath(), "000").size());
        Dataset<Row> snapshotAfterThirdBatch = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .load(basePath())
                .cache();
        Assertions.assertEquals(21L, snapshotAfterThirdBatch.count());
        compareEntireInputDfWithHudiDf(secondBatchDf.union(firstBatchDf).union(thirdBatchDf), snapshotAfterThirdBatch, colsToSelect);
        snapshotAfterThirdBatch.unpersist(true);

        firstBatchDf.unpersist(true);
        secondBatchDf.unpersist(true);
        thirdBatchDf.unpersist(true);
    }

    /**
     * Verifies a Hudi snapshot taken after an update batch.
     *
     * <p>Two assertions are made on the projected columns: every row of the update batch is found
     * in the snapshot, and every snapshot row that is not part of the update batch is found in the
     * rows captured before the update.
     *
     * <p>Registers the temp views {@code hudiTbl}, {@code inputTbl} and {@code beforeTbl},
     * replacing any existing views with those names.
     *
     * @param dataset  the update batch that was written
     * @param dataset2 the snapshot read back from Hudi; the five Hudi metadata columns listed
     *                 below are dropped before comparing
     * @param list     rows collected from the table before the update, in {@code dataset2}'s schema
     * @param str      comma-separated column list spliced into the {@code select} statements
     */
    public void compareUpdateDfWithHudiDf(Dataset<Row> dataset, Dataset<Row> dataset2, List<Row> list, String str) {
        String[] hoodieMetaColumns = {
            HoodieRecord.RECORD_KEY_METADATA_FIELD,
            HoodieRecord.PARTITION_PATH_METADATA_FIELD,
            HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
            HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            HoodieRecord.FILENAME_METADATA_FIELD
        };
        // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
        dataset2.drop(Predef$.MODULE$.wrapRefArray(hoodieMetaColumns)).createOrReplaceTempView("hudiTbl");
        dataset.createOrReplaceTempView("inputTbl");
        // The pre-update rows are rebuilt with the snapshot's full schema, metadata columns included.
        spark().createDataFrame((java.util.List) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava(), dataset2.schema())
                .createOrReplaceTempView("beforeTbl");

        Dataset<Row> hudiSelected = spark().sqlContext().sql("select " + str + " from hudiTbl");
        Dataset<Row> inputSelected = spark().sqlContext().sql("select " + str + " from inputTbl");
        Dataset<Row> beforeSelected = spark().sqlContext().sql("select " + str + " from beforeTbl");

        Assertions.assertEquals(inputSelected.count(), hudiSelected.intersect(inputSelected).count());
        // Exact long comparison, matching the other comparators here, instead of float-with-delta.
        Assertions.assertEquals(0L, hudiSelected.except(inputSelected).except(beforeSelected).count());
    }

    /**
     * Row-list variant of {@link #compareEntireInputDfWithHudiDf}: turns the given rows into a
     * DataFrame using the Hudi DataFrame's schema and delegates the comparison.
     *
     * @param list    expected rows, in {@code dataset}'s schema
     * @param dataset the DataFrame read back from Hudi
     * @param str     comma-separated column list to compare on
     */
    public void compareEntireInputRowsWithHudiDf(List<Row> list, Dataset<Row> dataset, String str) {
        Dataset<Row> expectedDf = spark().createDataFrame(
                (java.util.List) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava(),
                dataset.schema());
        compareEntireInputDfWithHudiDf(expectedDf, dataset, str);
    }

    /**
     * Verifies that a Hudi DataFrame and an input DataFrame hold the same rows on the projected
     * columns: every input row is found in the Hudi DataFrame, and the Hudi DataFrame has no row
     * that is missing from the input.
     *
     * <p>Registers the temp views {@code hudiTbl} and {@code inputTbl}, replacing any existing
     * views with those names.
     *
     * @param dataset  the expected input data
     * @param dataset2 the DataFrame read back from Hudi; the five Hudi metadata columns listed
     *                 below are dropped before comparing
     * @param str      comma-separated column list spliced into the {@code select} statements
     */
    public void compareEntireInputDfWithHudiDf(Dataset<Row> dataset, Dataset<Row> dataset2, String str) {
        String[] hoodieMetaColumns = {
            HoodieRecord.RECORD_KEY_METADATA_FIELD,
            HoodieRecord.PARTITION_PATH_METADATA_FIELD,
            HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
            HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            HoodieRecord.FILENAME_METADATA_FIELD
        };
        // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
        dataset2.drop(Predef$.MODULE$.wrapRefArray(hoodieMetaColumns)).createOrReplaceTempView("hudiTbl");
        dataset.createOrReplaceTempView("inputTbl");

        Dataset<Row> hudiSelected = spark().sqlContext().sql("select " + str + " from hudiTbl");
        Dataset<Row> inputSelected = spark().sqlContext().sql("select " + str + " from inputTbl");

        Assertions.assertEquals(inputSelected.count(), hudiSelected.intersect(inputSelected).count());
        Assertions.assertEquals(0L, hudiSelected.except(inputSelected).count());
    }

    /**
     * Loads the table at {@code basePath()} with the read-optimized query type and compares the
     * result against the given input DataFrame.
     *
     * @param dataset the expected data
     * @param str     comma-separated column list to compare on
     * @param z       value passed for the metadata-table enable option on the read
     */
    public void doMORReadOptimizedQuery(Dataset<Row> dataset, String str, boolean z) {
        Dataset<Row> readOptimizedDf = spark().read()
                .format("org.apache.hudi")
                .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
                .option(HoodieMetadataConfig.ENABLE.key(), z)
                .load(basePath());
        compareEntireInputDfWithHudiDf(dataset, readOptimizedDf, str);
    }

    /**
     * Loads the table twice, once with the read-optimized query type and once with the default
     * query type, and asserts on the projected columns that every read-optimized row is also
     * present in the default read.
     *
     * <p>Registers the temp views {@code hudiTbl1} and {@code hudiTbl2}, replacing any existing
     * views with those names.
     *
     * @param str  base path of the table to load
     * @param str2 comma-separated column list spliced into the {@code select} statements
     * @param z    value passed for the metadata-table enable option on both reads
     */
    public void compareROAndRT(String str, String str2, boolean z) {
        Dataset<Row> readOptimizedDf = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), z)
                .option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
                .load(str);
        Dataset<Row> defaultQueryDf = spark().read()
                .format("org.apache.hudi")
                .option(HoodieMetadataConfig.ENABLE.key(), z)
                .load(str);

        // Hudi metadata columns are dropped from both sides before comparing.
        String[] hoodieMetaColumns = {
            HoodieRecord.RECORD_KEY_METADATA_FIELD,
            HoodieRecord.PARTITION_PATH_METADATA_FIELD,
            HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
            HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            HoodieRecord.FILENAME_METADATA_FIELD
        };
        // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
        readOptimizedDf.drop(Predef$.MODULE$.wrapRefArray(hoodieMetaColumns)).createOrReplaceTempView("hudiTbl1");
        defaultQueryDf.drop(Predef$.MODULE$.wrapRefArray(hoodieMetaColumns)).createOrReplaceTempView("hudiTbl2");

        Dataset<Row> readOptimizedSelected = spark().sqlContext().sql("select " + str2 + " from hudiTbl1");
        Dataset<Row> defaultQuerySelected = spark().sqlContext().sql("select " + str2 + " from hudiTbl2");

        Assertions.assertEquals(readOptimizedSelected.count(), readOptimizedSelected.intersect(defaultQuerySelected).count());
        Assertions.assertEquals(0L, readOptimizedSelected.except(defaultQuerySelected).count());
    }
}
