package org.apache.hudi.functional.cdc;

import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.QuickstartUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import scala.Array$;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: TestCDCDataFrameSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001m4A!\u0001\u0002\u0001\u001b\t)B+Z:u\u0007\u0012\u001bE)\u0019;b\rJ\fW.Z*vSR,'BA\u0002\u0005\u0003\r\u0019Gm\u0019\u0006\u0003\u000b\u0019\t!BZ;oGRLwN\\1m\u0015\t9\u0001\"\u0001\u0003ik\u0012L'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001\u001dA\u0011q\u0002E\u0007\u0002\u0005%\u0011\u0011C\u0001\u0002\u0012\u0011>|G-[3D\t\u000e#Vm\u001d;CCN,\u0007\"B\n\u0001\t\u0003!\u0012A\u0002\u001fj]&$h\bF\u0001\u0016!\ty\u0001\u0001C\u0003\u0018\u0001\u0011\u0005\u0001$\u0001\fuKN$8iT,ECR\f7k\\;sG\u0016<&/\u001b;f)\tIr\u0004\u0005\u0002\u001b;5\t1DC\u0001\u001d\u0003\u0015\u00198-\u00197b\u0013\tq2D\u0001\u0003V]&$\b\"\u0002\u0011\u0017\u0001\u0004\t\u0013a\u00037pO\u001eLgnZ'pI\u0016\u0004\"A\t\u0015\u000e\u0003\rR!a\u0001\u0013\u000b\u0005\u00152\u0013!\u0002;bE2,'BA\u0014\u0007\u0003\u0019\u0019w.\\7p]&\u0011\u0011f\t\u0002!\u0011>|G-[3D\t\u000e\u001bV\u000f\u001d9mK6,g\u000e^1m\u0019><w-\u001b8h\u001b>$W\r\u000b\u0003\u0017W]B\u0004C\u0001\u00176\u001b\u0005i#B\u0001\u00180\u0003!\u0001(o\u001c<jI\u0016\u0014(B\u0001\u00192\u0003\u0019\u0001\u0018M]1ng*\u0011!gM\u0001\bUV\u0004\u0018\u000e^3s\u0015\t!$\"A\u0003kk:LG/\u0003\u00027[\tQQI\\;n'>,(oY3\u0002\u000bY\fG.^3$\u0003\u0005B#A\u0006\u001e\u0011\u0005mbT\"A\u0018\n\u0005uz#!\u0005)be\u0006lW\r^3sSj,G\rV3ti\")q\b\u0001C\u0001\u0001\u00061B/Z:u\u001b>\u0013F)\u0019;b'>,(oY3Xe&$X\r\u0006\u0002\u001a\u0003\")\u0001E\u0010a\u0001C!\"ahK\u001c9Q\tq$\bC\u0003F\u0001\u0011\u0005a)A\u0013uKN$H)\u0019;b'>,(oY3Xe&$XmV5uQB\u000b'\u000f^5uS>tg)[3mIR\u0019\u0011d\u0012)\t\u000b!#\u0005\u0019A%\u0002\u0013Q\f'\r\\3UsB,\u0007C\u0001&N\u001d\tQ2*\u0003\u0002M7\u00051\u0001K]3eK\u001aL!AT(\u0003\rM#(/\u001b8h\u0015\ta5\u0004C\u0003!\t\u0002\u0007\u0011\n\u000b\u0003E%^*\u0006C\u0001\u0017T\u0013\t!VFA\u0005DgZ\u001cv.\u001e:dK22a\u000b\u0017.]=\u0002\f\u0013aV\u0001 
\u0007>\u0003\u0016lX(O?^\u0013\u0016\nV#-I\u0006$\u0018m\u00182fM>\u0014XmX1gi\u0016\u0014\u0018%A-\u0002?5+%kR#`\u001f:{&+R!EY\u0011\fG/Y0cK\u001a|'/Z0bMR,'/I\u0001\\\u0003e\u0019u\nU-`\u001f:{vKU%U\u000b2\"\u0017\r^1`E\u00164wN]3\"\u0003u\u000b\u0011$T#S\u000f\u0016{vJT0S\u000b\u0006#E\u0006Z1uC~\u0013WMZ8sK\u0006\nq,A\rD\u001fBKvl\u0014(`/JKE+\u0012\u0017pa~[W-_0p]2L\u0018%A1\u000235+%kR#`\u001f:{&+R!EY=\u0004xl[3z?>tG.\u001f\u0015\u0003\tjBQ\u0001\u001a\u0001\u0005\u0002\u0015\f\u0011\u0005^3ti\u000e#5iV5uQ6+H\u000e^5CY>\u001c7n]!oI2{wMR5mKN$\"!\u00074\t\u000b\u0001\u001a\u0007\u0019A\u0011)\t\r\\s\u0007\u000f\u0015\u0003GjBQA\u001b\u0001\u0005\u0002-\f\u0001\u0004^3ti\u000e#5iV5uQ\u0006;6\u000bR'T!\u0006LHn\\1e)\tIB\u000eC\u0003!S\u0002\u0007\u0011\u0005\u000b\u0003jW]B\u0004FA5;\u0011\u0015\u0001\b\u0001\"\u0001r\u0003I!Xm\u001d;D\t\u000e\u001bE.Z1o%\u0016$\u0018-\u001b8\u0015\u0005e\u0011\b\"\u0002\u0011p\u0001\u0004\t\u0003\u0006B8,oaB#a\u001c\u001e\t\u000bY\u0004A\u0011A<\u0002YQ,7\u000f^\"E\u0007^CWM\u001c$jeN$xK]5uK\u000e{g\u000e^1j]N,\u0006o]3si\u0006sG\rR3mKR,GCA\ry\u0011\u0015\u0001S\u000f1\u0001\"Q\u0011)8f\u000e\u001d)\u0005UT\u0004")
/* loaded from: input_file:org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.class */
public class TestCDCDataFrameSuite extends HoodieCDCTestBase {
    /**
     * Verifies the change-data-capture (CDC) view of a copy-on-write table across a sequence of
     * writes, asserting the insert/update/delete counts returned by {@code cdcDataFrame} for each
     * individual commit and cumulatively from just before the first commit.
     *
     * <p>Write sequence:
     * <ol>
     *   <li>100 generated inserts with {@link SaveMode#Overwrite}; no CDC log file is expected.</li>
     *   <li>50 generated unique updates; a CDC log file is expected and its 50 records are checked
     *       against the input.</li>
     *   <li>A delete of 20 records with inline clustering triggered after every commit.</li>
     *   <li>{@code insert_overwrite_table} of 50 records; no CDC log file is expected.</li>
     *   <li>Inserts of 7 and 3 records, then 30 unique updates with automatic cleaning enabled.</li>
     *   <li>{@code bulk_insert} of 20 records; no CDC log file is expected.</li>
     * </ol>
     *
     * @param hoodieCDCSupplementalLoggingMode supplemental logging mode written to
     *     {@link HoodieTableConfig#CDC_SUPPLEMENTAL_LOGGING_MODE}
     */
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void testCOWDataSourceWrite(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        // Common write options plus the CDC supplemental logging mode under test.
        Map $plus$plus = commonOpts().$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), hoodieCDCSupplementalLoggingMode.name())})));
        // Step 1: write 100 generated inserts, replacing any existing table.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Overwrite).save(this.basePath);
        ((HoodieSparkClientTestHarness) this).metaClient = createMetaClient(spark(), this.basePath);
        Schema tableAvroSchema = new TableSchemaResolver(this.metaClient).getTableAvroSchema(false);
        // Schema of the records stored in CDC log files for the selected logging mode.
        Schema schemaBySupplementalLoggingMode = HoodieCDCUtils.schemaBySupplementalLoggingMode(hoodieCDCSupplementalLoggingMode, tableAvroSchema);
        HoodieInstant hoodieInstant = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        // The first commit is expected to produce no CDC log file.
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant));
        String timestamp = hoodieInstant.getTimestamp();
        // The begin instant is passed as (commit time - 1) so the commit itself is in range;
        // presumably cdcDataFrame treats the begin instant as exclusive.
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), 100L, 0L, 0L);
        // Step 2: write 50 generated unique updates; these must be captured in a CDC log file.
        List<HoodieRecord<?>> generateUniqueUpdates = this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(50));
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(generateUniqueUpdates)).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant2 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertTrue(hasCDCLogFile(hoodieInstant2));
        // Decode every record from the commit's CDC log files and compare them with the input.
        scala.collection.immutable.List list = (scala.collection.immutable.List) getCDCLogFile(hoodieInstant2).flatMap(new TestCDCDataFrameSuite$$anonfun$1(this, schemaBySupplementalLoggingMode), List$.MODULE$.canBuildFrom());
        // JUnit convention: expected value first, actual second.
        Assertions.assertEquals(50, list.size());
        checkCDCDataForInsertOrUpdate(hoodieCDCSupplementalLoggingMode, schemaBySupplementalLoggingMode, tableAvroSchema, list, generateUniqueUpdates, HoodieCDCOperation.UPDATE);
        String timestamp2 = hoodieInstant2.getTimestamp();
        // Step 2's inserted vs. updated counts are derived from how much the table grew,
        // rather than assuming all 50 records were updates.
        long count = spark().read().format("hudi").load(this.basePath).count() - 100;
        long j = 50 - count;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp2)).toLong() - 1).toString(), cdcDataFrame$default$2()), count, j, 0L);
        // Running cumulative counts since the first commit: j2 = updated, j3 = inserted.
        long j2 = 0 + j;
        long j3 = 0 + 100 + count;
        // Step 3: delete 20 records, with inline clustering scheduled after every commit.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.deleteRecordsToStrings(this.dataGen.generateUniqueDeletes(Predef$.MODULE$.int2Integer(20)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.DELETE_OPERATION_OPT_VAL()).option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "1").mode(SaveMode.Append).save(this.basePath);
        String timestamp3 = ((HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp();
        // Table snapshot used below to count the records replaced by the insert-overwrite.
        Dataset<Row> snapshotBeforeOverwrite = spark().read().format("hudi").load(this.basePath);
        // timestamp3 is the latest instant after step 3. With inline clustering configured it may be
        // the clustering instant rather than the delete commit, so the delete is queried as
        // "everything after timestamp2".
        assertCDCOpCnt(cdcDataFrame(timestamp2, cdcDataFrame$default$2()), 0L, 0L, 20L);
        long j4 = 0 + 20;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j3, j2, j4);
        // Both begin and end instants supplied: the range covers steps 2 and 3.
        assertCDCOpCnt(cdcDataFrame(timestamp, timestamp3), count, j, 20L);
        // Count eagerly, before the overwrite commits, so the expected delete count does not depend
        // on whether the lazily evaluated snapshot is re-resolved after the overwrite.
        long count2 = snapshotBeforeOverwrite.count();
        // Step 4: insert_overwrite_table of 50 records; no CDC log file expected.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("003", Predef$.MODULE$.int2Integer(50)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant3 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant3));
        Dataset<Row> cdcDataFrame = cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(hoodieInstant3.getTimestamp())).toLong() - 1).toString(), cdcDataFrame$default$2());
        // Every record present before the overwrite is reported as deleted.
        assertCDCOpCnt(cdcDataFrame, 50, 0L, count2);
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j3 + 50, j2, j4 + count2);
        // Step 5: two plain inserts (7 and 3 records), then 30 updates with automatic cleaning and
        // tight archival settings (keep.min.commits=4, keep.max.commits=5, clean.commits.retained=3).
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("005", Predef$.MODULE$.int2Integer(7)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Append).save(this.basePath);
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("006", Predef$.MODULE$.int2Integer(3)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Append).save(this.basePath);
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("007", Predef$.MODULE$.int2Integer(30)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option("hoodie.clean.automatic", "true").option("hoodie.keep.min.commits", "4").option("hoodie.keep.max.commits", "5").option("hoodie.clean.commits.retained", "3").mode(SaveMode.Append).save(this.basePath);
        // Uses the commits timeline, presumably to skip a clean instant that automatic cleaning may
        // have added after the 30-record update commit.
        Dataset<Row> cdcDataFrame2 = cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) this.metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get()).getTimestamp())).toLong() - 1).toString(), cdcDataFrame$default$2());
        // 60 = 50 records from the overwrite + 7 + 3 inserts; any excess was inserted by the update batch.
        long count3 = spark().read().format("hudi").load(this.basePath).count() - 60;
        long j5 = 30 - count3;
        assertCDCOpCnt(cdcDataFrame2, count3, j5, 0L);
        // NOTE(review): the cumulative expectation below no longer includes steps 1-3 or the
        // overwrite's deletes; presumably the tight archival settings archived those commits so they
        // are no longer visible to CDC queries — confirm.
        long j6 = 60 + count3;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j6, j5, 0L);
        // Step 6: bulk_insert of 20 records; no CDC log file expected.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("008", Predef$.MODULE$.int2Integer(20)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant4 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant4));
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(hoodieInstant4.getTimestamp())).toLong() - 1).toString(), cdcDataFrame$default$2()), 20L, 0L, 0L);
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j6 + 20, j5, 0L);
    }

    /**
     * Verifies the change-data-capture (CDC) view of a merge-on-read table across a sequence of
     * writes, asserting the insert/update/delete counts returned by {@code cdcDataFrame} for each
     * individual commit and cumulatively from just before the first commit.
     *
     * <p>Write sequence:
     * <ol>
     *   <li>100 generated inserts with {@link SaveMode#Overwrite}; no CDC log file is expected.</li>
     *   <li>A union of 30 unique updates and 20 inserts; a CDC log file with 50 records is
     *       expected.</li>
     *   <li>A delete of 20 records with inline compaction after every delta commit.</li>
     *   <li>{@code bulk_insert} of 100 records (inline compaction off); no CDC log file
     *       expected.</li>
     *   <li>60 unique updates with inline clustering after every commit.</li>
     *   <li>{@code insert_overwrite_table} of 70 records; no CDC log file expected.</li>
     *   <li>Inserts of 7 and 3 records.</li>
     *   <li>The first 30 rows of step 6's input written again, with automatic cleaning enabled.</li>
     * </ol>
     *
     * @param hoodieCDCSupplementalLoggingMode supplemental logging mode written to
     *     {@link HoodieTableConfig#CDC_SUPPLEMENTAL_LOGGING_MODE}
     */
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void testMORDataSourceWrite(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        // Common write options plus MOR table type and the CDC supplemental logging mode under test.
        Map $plus$plus = commonOpts().$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), hoodieCDCSupplementalLoggingMode.name())})));
        // Step 1: write 100 generated inserts, replacing any existing table.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Overwrite).save(this.basePath);
        ((HoodieSparkClientTestHarness) this).metaClient = createMetaClient(spark(), this.basePath);
        // Schema of the records stored in CDC log files for the selected logging mode.
        Schema schemaBySupplementalLoggingMode = HoodieCDCUtils.schemaBySupplementalLoggingMode(hoodieCDCSupplementalLoggingMode, new TableSchemaResolver(this.metaClient).getTableAvroSchema(false));
        HoodieInstant hoodieInstant = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant));
        String timestamp = hoodieInstant.getTimestamp();
        // The begin instant is passed as (commit time - 1) so the commit itself is in range;
        // presumably cdcDataFrame treats the begin instant as exclusive.
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), 100L, 0L, 0L);
        // Step 2: one commit mixing 30 unique updates and 20 new inserts.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(30)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).union(spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(20)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class)))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant2 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertTrue(hasCDCLogFile(hoodieInstant2));
        scala.collection.immutable.List list = (scala.collection.immutable.List) getCDCLogFile(hoodieInstant2).flatMap(new TestCDCDataFrameSuite$$anonfun$2(this, schemaBySupplementalLoggingMode), List$.MODULE$.canBuildFrom());
        // JUnit convention: expected value first, actual second. The two predicates presumably select
        // the update and insert CDC records respectively — confirm against the Scala source.
        Assertions.assertEquals(50, list.size());
        Assertions.assertEquals(30, list.count(new TestCDCDataFrameSuite$$anonfun$testMORDataSourceWrite$1(this)));
        Assertions.assertEquals(20, list.count(new TestCDCDataFrameSuite$$anonfun$testMORDataSourceWrite$2(this)));
        String timestamp2 = hoodieInstant2.getTimestamp();
        // Step 2's inserted vs. updated counts are derived from how much the table grew.
        long count = spark().read().format("hudi").load(this.basePath).count() - 100;
        long j = 50 - count;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp2)).toLong() - 1).toString(), cdcDataFrame$default$2()), count, j, 0L);
        // Running cumulative counts since the first commit: j2 = updated, j3 = inserted.
        long j2 = 0 + j;
        long j3 = 0 + 100 + count;
        // Step 3: delete 20 records, with inline compaction after every delta commit.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.deleteRecordsToStrings(this.dataGen.generateUniqueDeletes(Predef$.MODULE$.int2Integer(20)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.DELETE_OPERATION_OPT_VAL()).option("hoodie.compact.inline", "true").option("hoodie.compact.inline.max.delta.commits", "1").mode(SaveMode.Append).save(this.basePath);
        // Refresh the meta client's timeline. Kept in case cdcDataFrame reads this.metaClient's
        // cached state — NOTE(review): confirm whether this reload is required.
        this.metaClient.reloadActiveTimeline();
        // The latest instant may be the compaction rather than the delete, so the delete is queried
        // as "everything after timestamp2".
        assertCDCOpCnt(cdcDataFrame(timestamp2, cdcDataFrame$default$2()), 0L, 0L, 20L);
        long j4 = 0 + 20;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j3, j2, j4);
        // Step 4: bulk_insert of 100 records with inline compaction disabled; no CDC log file expected.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("003", Predef$.MODULE$.int2Integer(100)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option("hoodie.compact.inline", "false").option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant3 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant3));
        String timestamp3 = hoodieInstant3.getTimestamp();
        long count2 = spark().read().format("hudi").load(this.basePath).count();
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp3)).toLong() - 1).toString(), cdcDataFrame$default$2()), 100, 0L, 0L);
        long j5 = j3 + 100;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j5, j2, j4);
        // Step 5: 60 unique updates with inline clustering after every commit.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("004", Predef$.MODULE$.int2Integer(60)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "1").option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        // Refresh the meta client's timeline (see the note on step 3).
        this.metaClient.reloadActiveTimeline();
        Dataset<Row> cdcDataFrame = cdcDataFrame(timestamp3, cdcDataFrame$default$2());
        // Step 5's inserted count is the table growth since step 4; the rest were updates.
        long count3 = spark().read().format("hudi").load(this.basePath).count();
        long j6 = count3 - count2;
        long j7 = 60 - j6;
        assertCDCOpCnt(cdcDataFrame, j6, j7, 0L);
        long j8 = j5 + j6;
        long j9 = j2 + j7;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j8, j9, j4);
        // Both begin and end instants supplied: the range covers steps 3 and 4.
        assertCDCOpCnt(cdcDataFrame(timestamp2, timestamp3), 100, 0L, 20L);
        // Step 6: insert_overwrite_table of 70 records; every record present before is deleted.
        Dataset json = spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("005", Predef$.MODULE$.int2Integer(70)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        json.write().format("org.apache.hudi").options($plus$plus).option("hoodie.compact.inline", "false").option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant4 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant4));
        String timestamp4 = hoodieInstant4.getTimestamp();
        long count4 = spark().read().format("hudi").load(this.basePath).count();
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp4)).toLong() - 1).toString(), cdcDataFrame$default$2()), 70L, 0L, count3);
        long j10 = j8 + 70;
        long j11 = j4 + count3;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j10, j9, j11);
        // Step 7: plain inserts of 7 and 3 records.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("006", Predef$.MODULE$.int2Integer(7)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        long j12 = j10 + 7;
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("007", Predef$.MODULE$.int2Integer(3)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        String timestamp5 = ((HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp();
        long j13 = j12 + 3;
        // Step 8: re-write 30 rows of step 6's input (keys that already exist) with automatic cleaning.
        json.limit(30).write().format("org.apache.hudi").options($plus$plus).option("hoodie.clean.automatic", "true").option("hoodie.keep.min.commits", "16").option("hoodie.keep.max.commits", "17").option("hoodie.clean.commits.retained", "15").option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        // Refresh the meta client's timeline (see the note on step 3).
        this.metaClient.reloadActiveTimeline();
        long count5 = spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataFrame2 = cdcDataFrame(timestamp5, cdcDataFrame$default$2());
        // Table growth in step 8, excluding the 10 records from step 7, is step 8's inserted count.
        long j14 = (count5 - count4) - 10;
        assertCDCOpCnt(cdcDataFrame2, j14, 30 - j14, 0L);
        // Cumulative totals must include step 8's own inserts/updates, consistent with the
        // per-commit assertion above (previously this assumed j14 == 0).
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j13 + j14, j9 + (30 - j14), j11);
    }

    /**
     * Verifies CDC counts for partition-level operations on a partitioned table.
     *
     * <p>Write sequence:
     * <ol>
     *   <li>100 generated inserts with {@link SaveMode#Overwrite}; the per-partition record counts
     *       are captured and both {@code 2016/03/15} and {@code 2015/03/16} must be present.</li>
     *   <li>{@code insert_overwrite} of 30 records into {@code 2016/03/15}; no CDC log file is
     *       expected, and the partition's previous records are reported as deleted.</li>
     *   <li>{@code delete_partition} of {@code 2015/03/16}; no CDC log file is expected, and the
     *       partition's records are reported as deleted.</li>
     *   <li>50 generated unique updates.</li>
     * </ol>
     *
     * @param str table type written to {@code TABLE_TYPE} (COPY_ON_WRITE or MERGE_ON_READ)
     * @param str2 CDC supplemental logging mode value written to
     *     {@link HoodieTableConfig#CDC_SUPPLEMENTAL_LOGGING_MODE}
     */
    @ParameterizedTest
    @CsvSource({"COPY_ON_WRITE,data_before_after", "MERGE_ON_READ,data_before_after", "COPY_ON_WRITE,data_before", "MERGE_ON_READ,data_before", "COPY_ON_WRITE,op_key_only", "MERGE_ON_READ,op_key_only"})
    public void testDataSourceWriteWithPartitionField(String str, String str2) {
        // Common write options plus partition field "partition", table type and logging mode.
        Map $plus$plus = commonOpts().$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), "partition"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), str2)})));
        // Decompiler residue: the result is unused.
        spark().emptyDataFrame();
        // Step 1: write 100 generated inserts, replacing any existing table.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Overwrite).save(this.basePath);
        // Per-partition record counts from groupBy("partition").count(); the mapping function
        // presumably turns each row into a (partition, count) pair — confirm against the Scala source.
        Map map = Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps((Object[]) spark().read().format("hudi").load(this.basePath).groupBy("partition", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect()).map(new TestCDCDataFrameSuite$$anonfun$3(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))).toMap(Predef$.MODULE$.$conforms());
        Predef$.MODULE$.assert(map.contains("2016/03/15"));
        Predef$.MODULE$.assert(map.contains("2015/03/16"));
        ((HoodieSparkClientTestHarness) this).metaClient = createMetaClient(spark(), this.basePath);
        HoodieInstant hoodieInstant = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant));
        String timestamp = hoodieInstant.getTimestamp();
        // The begin instant is passed as (commit time - 1) so the commit itself is in range;
        // presumably cdcDataFrame treats the begin instant as exclusive.
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), 100L, 0L, 0L);
        // Step 2: insert_overwrite of 30 records into partition 2016/03/15.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInsertsForPartition("001", Predef$.MODULE$.int2Integer(30), "2016/03/15"))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OVERWRITE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant2 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant2));
        String timestamp2 = hoodieInstant2.getTimestamp();
        // The overwritten partition's previous records are reported as deleted.
        long unboxToLong = BoxesRunTime.unboxToLong(map.apply("2016/03/15"));
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp2)).toLong() - 1).toString(), cdcDataFrame$default$2()), 30, 0L, unboxToLong);
        // Running cumulative counts since the first commit: j = inserted, j2 = deleted.
        long j = 0 + 100 + 30;
        long j2 = 0 + unboxToLong;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j, 0L, j2);
        // Step 3: drop partition 2015/03/16 (no input rows are needed, hence the empty DataFrame).
        spark().emptyDataFrame().write().format("org.apache.hudi").options($plus$plus).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.DELETE_PARTITION_OPERATION_OPT_VAL()).option(DataSourceWriteOptions$.MODULE$.PARTITIONS_TO_DELETE().key(), "2015/03/16").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant hoodieInstant3 = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse(hasCDCLogFile(hoodieInstant3));
        String timestamp3 = hoodieInstant3.getTimestamp();
        // Table size after the partition drop; used to derive step 4's inserted count.
        long count = spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataFrame = cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp3)).toLong() - 1).toString(), cdcDataFrame$default$2());
        // The dropped partition's records are reported as deleted.
        long unboxToLong2 = BoxesRunTime.unboxToLong(map.apply("2015/03/16"));
        assertCDCOpCnt(cdcDataFrame, 0L, 0L, unboxToLong2);
        long j3 = j2 + unboxToLong2;
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j, 0L, j3);
        // Step 4: 50 generated unique updates. Some keys may no longer exist after steps 2-3, so the
        // inserted count is derived from table growth and the remainder counted as updates.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates("000", Predef$.MODULE$.int2Integer(50)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class))).write().format("org.apache.hudi").options($plus$plus).mode(SaveMode.Append).save(this.basePath);
        String timestamp4 = ((HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp();
        long count2 = spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataFrame2 = cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp4)).toLong() - 1).toString(), cdcDataFrame$default$2());
        long j4 = count2 - count;
        long j5 = 50 - j4;
        assertCDCOpCnt(cdcDataFrame2, j4, j5, 0L);
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1).toString(), cdcDataFrame$default$2()), j + j4, 0 + j5, j3);
        // Both begin and end instants supplied: the range covers steps 2 and 3 only.
        assertCDCOpCnt(cdcDataFrame(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(timestamp2)).toLong() - 1).toString(), timestamp3), 30, 0L, unboxToLong + unboxToLong2);
    }

    /**
     * Writes 100 inserts and then 50 unique updates with small log-data-block and log-file size
     * limits. It then decodes the CDC log files of the update commit and checks their contents.
     * Finally it checks the operation counts of a CDC incremental query that starts just before
     * the update commit.
     *
     * @param hoodieCDCSupplementalLoggingMode the CDC supplemental logging mode under test
     */
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void testCDCWithMultiBlocksAndLogFiles(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        // OP_KEY_ONLY gets smaller size limits than the other modes. Presumably its CDC records are
        // smaller, and the test still wants several data blocks and several log files — confirm.
        boolean opKeyOnly = hoodieCDCSupplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY;
        int dataBlockMaxSize = opKeyOnly ? 256 : 2048;
        int logFileMaxSize = opKeyOnly ? 1024 : 5120;
        Map writeOptions = commonOpts().$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), hoodieCDCSupplementalLoggingMode.name()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.logfile.data.block.max.size"), String.valueOf(dataBlockMaxSize)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.logfile.max.size"), String.valueOf(logFileMaxSize))
        })));

        // Commit "000": 100 inserts, overwriting whatever is at basePath.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write().format("org.apache.hudi").options(writeOptions).mode(SaveMode.Overwrite).save(this.basePath);
        ((HoodieSparkClientTestHarness) this).metaClient = createMetaClient(spark(), this.basePath);
        Schema tableAvroSchema = new TableSchemaResolver(this.metaClient).getTableAvroSchema(false);
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(hoodieCDCSupplementalLoggingMode, tableAvroSchema);

        // Commit "001": 50 unique updates, appended.
        List<HoodieRecord<?>> updates = this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(50));
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(updates)).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write().format("org.apache.hudi").options(writeOptions).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant updateInstant = (HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get();

        // Flat-map every CDC log file of the update commit through anonfun$4 (defined outside this
        // chunk; presumably it decodes the file's records with cdcSchema — confirm).
        scala.collection.immutable.List cdcRecords = (scala.collection.immutable.List) getCDCLogFile(updateInstant).flatMap(new TestCDCDataFrameSuite$$anonfun$4(this, cdcSchema), List$.MODULE$.canBuildFrom());
        Assertions.assertEquals(50, cdcRecords.size());
        checkCDCDataForInsertOrUpdate(hoodieCDCSupplementalLoggingMode, cdcSchema, tableAvroSchema, cdcRecords, updates, HoodieCDCOperation.UPDATE);

        // Rows beyond the initial 100 are counted as inserts; the rest of the 50 as updates.
        // The begin instant is set one below the update commit's timestamp, presumably because
        // the begin bound is exclusive — confirm.
        long insertCount = spark().read().format("hudi").load(this.basePath).count() - 100;
        String beginInstant = String.valueOf(Long.parseLong(updateInstant.getTimestamp()) - 1);
        assertCDCOpCnt(cdcDataFrame(beginInstant, cdcDataFrame$default$2()), insertCount, 50 - insertCount, 0L);
    }

    /**
     * Upserts three inserted rows through {@code AWSDmsAvroPayload} with CDC enabled. It then
     * upserts a single row for id "3" with {@code Op = "D"}, and checks the snapshot row count
     * after each write (3, then 2).
     *
     * @param hoodieCDCSupplementalLoggingMode the CDC supplemental logging mode under test; it is
     *     written into the {@code hoodie.table.cdc.supplemental.logging.mode} option
     */
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void testCDCWithAWSDMSPayload(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        // The logging mode comes from the test parameter, so each parameterized run actually
        // exercises a different mode (a hard-coded value would make all runs identical).
        Map writeOptions = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.name"), "test"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.recordkey.field"), "id"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.precombine.field"), "replicadmstimestamp"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.keygenerator.class"), "org.apache.hudi.keygen.NonpartitionedKeyGenerator"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.partitionpath.field"), ""),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.payload.class"), "org.apache.hudi.common.model.AWSDmsAvroPayload"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.cdc.enabled"), "true"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.cdc.supplemental.logging.mode"), hoodieCDCSupplementalLoggingMode.name())
        }));
        StructType schema = StructType$.MODULE$.apply(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new StructField[]{
            new StructField("id", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("Op", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("replicadmstimestamp", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("code", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())
        })));

        // First batch: three inserts (Op = "I") with distinct ids.
        Seq inserts = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple4[]{
            new Tuple4("1", "I", "2023-06-14 15:46:06.953746", "A"),
            new Tuple4("2", "I", "2023-06-14 15:46:07.953746", "B"),
            new Tuple4("3", "I", "2023-06-14 15:46:08.953746", "C")
        }));
        spark().createDataFrame((List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) inserts.map(new TestCDCDataFrameSuite$$anonfun$5(this), Seq$.MODULE$.canBuildFrom())).asJava(), schema)
            .write().format("org.apache.hudi").option("hoodie.datasource.write.operation", "upsert").options(writeOptions).mode("append").save(this.basePath);
        Assertions.assertEquals(3L, spark().read().format("org.apache.hudi").load(this.basePath).count());

        // Second batch: id "3" with Op = "D" and a later replicadmstimestamp; the snapshot is
        // expected to drop to two rows.
        Seq deletes = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple4[]{
            new Tuple4("3", "D", "2023-06-14 15:47:09.953746", "B")
        }));
        spark().createDataFrame((List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) deletes.map(new TestCDCDataFrameSuite$$anonfun$6(this), Seq$.MODULE$.canBuildFrom())).asJava(), schema)
            .write().format("org.apache.hudi").option("hoodie.datasource.write.operation", "upsert").options(writeOptions).mode("append").save(this.basePath);
        Assertions.assertEquals(2L, spark().read().format("org.apache.hudi").load(this.basePath).count());
    }

    /**
     * Checks the lifecycle of CDC log files under automatic cleaning that retains one commit.
     *
     * <p>The CDC log files written by the first upsert (commit "001") must exist right after that
     * upsert. They must no longer exist after two further upserts ("002" and "003").
     *
     * @param hoodieCDCSupplementalLoggingMode the CDC supplemental logging mode under test; it is
     *     also appended to the table name
     */
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void testCDCCleanRetain(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        String loggingModeName = hoodieCDCSupplementalLoggingMode.name();
        Map cdcOptions = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.cdc.enabled"), "true"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.cdc.supplemental.logging.mode"), loggingModeName),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.insert.shuffle.parallelism"), "4"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.upsert.shuffle.parallelism"), "4"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.bulkinsert.shuffle.parallelism"), "2"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.delete.shuffle.parallelism"), "1"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.recordkey.field"), "_row_key"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.datasource.write.precombine.field"), "timestamp"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.name"), new StringBuilder().append("hoodie_test").append(loggingModeName).toString()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.clean.automatic"), "true"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.clean.commits.retained"), "1")
        }));

        // Commit "000": 100 inserts, overwriting whatever is at basePath.
        spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
            .write().format("org.apache.hudi").options(cdcOptions).mode(SaveMode.Overwrite).save(this.basePath);
        ((HoodieSparkClientTestHarness) this).metaClient = createMetaClient(spark(), this.basePath);

        // Every later commit is the same shape: 50 unique updates written as an upsert.
        java.util.function.Consumer<String> upsertFiftyUpdates = commitTime ->
            spark().read().json(spark().sparkContext().parallelize(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings(this.dataGen.generateUniqueUpdates(commitTime, Predef$.MODULE$.int2Integer(50)))).asScala()).toList(), 2, ClassTag$.MODULE$.apply(String.class)))
                .write().format("org.apache.hudi").options(cdcOptions).option("hoodie.datasource.write.operation", "upsert").mode(SaveMode.Append).save(this.basePath);

        upsertFiftyUpdates.accept("001");
        scala.collection.immutable.List<String> firstUpsertCdcFiles = getCDCLogFile((HoodieInstant) this.metaClient.reloadActiveTimeline().lastInstant().get());
        Assertions.assertTrue(isFilesExistInFileSystem(firstUpsertCdcFiles));

        for (String commitTime : new String[]{"002", "003"}) {
            upsertFiftyUpdates.accept(commitTime);
        }
        Assertions.assertFalse(isFilesExistInFileSystem(firstUpsertCdcFiles));
    }

    /**
     * Writes two upsert batches into a MOR table using {@code AWSDmsAvroPayload} with CDC enabled.
     * The very first batch already contains an update ("U") and a delete ("D") for keys it also
     * inserts. The test then checks CDC incremental queries over the first commit, the second
     * commit, and both commits together.
     *
     * @param hoodieCDCSupplementalLoggingMode the CDC supplemental logging mode under test; it is
     *     also appended to the table name
     */
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void testCDCWhenFirstWriteContainsUpsertAndDelete(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        StructType schema = StructType$.MODULE$.apply(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new StructField[]{
            new StructField("_id", StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            new StructField("Op", StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            new StructField("replicadmstimestamp", StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            new StructField("code", StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            new StructField("partition", StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4())
        })));
        String loggingModeName = hoodieCDCSupplementalLoggingMode.name();
        String hudiTableName = new StringBuilder().append(this.tableName).append(loggingModeName).toString();

        // Both batches are written with identical MOR / upsert / AWS DMS / CDC options; only the rows differ.
        java.util.function.Consumer<Row[]> upsertRows = rows -> {
            SparkContext sparkContext = spark().sparkContext();
            spark().createDataFrame(sparkContext.parallelize(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(rows)), sparkContext.parallelize$default$2(), ClassTag$.MODULE$.apply(Row.class)), schema)
                .write().format("hudi")
                .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
                .options(QuickstartUtils.getQuickstartWriteConfigs())
                .option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD_OPT_KEY(), "_id")
                .option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD_OPT_KEY(), "replicadmstimestamp")
                .option(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
                .option(HoodieWriteConfig.TBL_NAME.key(), hudiTableName)
                .option("hoodie.datasource.write.operation", "upsert")
                .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
                .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
                .option("hoodie.table.cdc.enabled", "true")
                .option("hoodie.table.cdc.supplemental.logging.mode", loggingModeName)
                .mode(SaveMode.Append)
                .save(this.basePath);
        };

        // CDC-format incremental query between the given begin and end instants.
        java.util.function.BiFunction<String, String, Dataset<Row>> readCdc = (beginInstant, endInstant) ->
            spark().read().format("hudi")
                .option("hoodie.datasource.query.type", "incremental")
                .option("hoodie.datasource.read.begin.instanttime", beginInstant)
                .option("hoodie.datasource.read.end.instanttime", endInstant)
                .option("hoodie.datasource.query.incremental.format", "cdc")
                .load(this.basePath);

        // First commit: key 1 is inserted then updated, key 2 is inserted then deleted.
        upsertRows.accept(new Row[]{
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"1", "I", "2023-06-14 15:46:06.953746", "A", "A"})),
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"1", "U", "2023-06-20 15:46:06.953746", "A", "A"})),
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"2", "I", "2023-06-14 15:46:06.953746", "A", "A"})),
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"2", "D", "2023-06-20 15:46:06.953746", "A", "A"}))
        });
        // Second commit: updates to keys 1 and 2, plus a new key 3.
        upsertRows.accept(new Row[]{
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"1", "U", "2023-06-14 15:46:06.953746", "A", "A"})),
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"2", "U", "2023-06-20 15:46:06.953746", "A", "A"})),
            Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"3", "I", "2023-06-20 15:46:06.953746", "A", "A"}))
        });

        HoodieTableMetaClient metaClientForTable = createMetaClient(spark(), this.basePath);
        String firstCommitTime = ((HoodieInstant) metaClientForTable.reloadActiveTimeline().firstInstant().get()).getTimestamp();
        String lastCommitTime = ((HoodieInstant) metaClientForTable.reloadActiveTimeline().lastInstant().get()).getTimestamp();

        // The expected counts (1 + 3 == 4) assume the begin instant is exclusive and the end
        // instant is inclusive.
        Dataset<Row> firstCommitCdc = readCdc.apply("0", firstCommitTime);
        assertCDCOpCnt(firstCommitCdc, 1L, 0L, 0L);
        Assertions.assertEquals(1L, firstCommitCdc.count());

        Dataset<Row> secondCommitCdc = readCdc.apply(firstCommitTime, lastCommitTime);
        assertCDCOpCnt(secondCommitCdc, 2L, 1L, 0L);
        Assertions.assertEquals(3L, secondCommitCdc.count());

        Dataset<Row> allCommitsCdc = readCdc.apply("0", lastCommitTime);
        assertCDCOpCnt(allCommitsCdc, 3L, 1L, 0L);
        Assertions.assertEquals(4L, allCommitsCdc.count());
    }
}
