package org.apache.hudi.functional.cdc;

import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.QueryTest$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Add;
import org.apache.spark.sql.catalyst.expressions.Add$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.If;
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.execution.streaming.MemoryStream$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: TestCDCStreamingSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001y2Aa\u0001\u0003\u0001\u001f!)A\u0003\u0001C\u0001+!)q\u0003\u0001C\u00011\t)B+Z:u\u0007\u0012\u001b5\u000b\u001e:fC6LgnZ*vSR,'BA\u0003\u0007\u0003\r\u0019Gm\u0019\u0006\u0003\u000f!\t!BZ;oGRLwN\\1m\u0015\tI!\"\u0001\u0003ik\u0012L'BA\u0006\r\u0003\u0019\t\u0007/Y2iK*\tQ\"A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001!A\u0011\u0011CE\u0007\u0002\t%\u00111\u0003\u0002\u0002\u0012\u0011>|G-[3D\t\u000e#Vm\u001d;CCN,\u0017A\u0002\u001fj]&$h\bF\u0001\u0017!\t\t\u0002!\u0001\u0007dI\u000e\u001cFO]3b[&tw\r\u0006\u0002\u001a?A\u0011!$H\u0007\u00027)\tA$A\u0003tG\u0006d\u0017-\u0003\u0002\u001f7\t!QK\\5u\u0011\u0015\u0001#\u00011\u0001\"\u0003-awnZ4j]\u001elu\u000eZ3\u0011\u0005\tBS\"A\u0012\u000b\u0005\u0015!#BA\u0013'\u0003\u0015!\u0018M\u00197f\u0015\t9\u0003\"\u0001\u0004d_6lwN\\\u0005\u0003S\r\u0012\u0001\u0005S8pI&,7\tR\"TkB\u0004H.Z7f]R\fG\u000eT8hO&tw-T8eK\"\"!aK\u001c9!\taS'D\u0001.\u0015\tqs&\u0001\u0005qe>4\u0018\u000eZ3s\u0015\t\u0001\u0014'\u0001\u0004qCJ\fWn\u001d\u0006\u0003eM\nqA[;qSR,'O\u0003\u00025\u0019\u0005)!.\u001e8ji&\u0011a'\f\u0002\u000b\u000b:,XnU8ve\u000e,\u0017!\u0002<bYV,7%A\u0011)\u0005\tQ\u0004CA\u001e=\u001b\u0005y\u0013BA\u001f0\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f\u001e")
/* loaded from: input_file:org/apache/hudi/functional/cdc/TestCDCStreamingSuite.class */
public class TestCDCStreamingSuite extends HoodieCDCTestBase {
    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    public void cdcStreaming(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        Map apply = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.insert.shuffle.parallelism"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.upsert.shuffle.parallelism"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.bulkinsert.shuffle.parallelism"), "2"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.delete.shuffle.parallelism"), "1")}));
        SparkSession spark = spark();
        String sb = new StringBuilder(22).append(this.basePath).append("/user_to_country_table").toString();
        String sb2 = new StringBuilder(28).append(this.basePath).append("/country_to_population_table").toString();
        final TestCDCStreamingSuite testCDCStreamingSuite = null;
        spark.implicits().localSeqToDatasetHolder(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple3[]{new Tuple3(BoxesRunTime.boxToInteger(1), "US", "1000"), new Tuple3(BoxesRunTime.boxToInteger(2), "US", "1000"), new Tuple3(BoxesRunTime.boxToInteger(3), "China", "1000"), new Tuple3(BoxesRunTime.boxToInteger(4), "Singapore", "1000")})), spark.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(TestCDCStreamingSuite.class.getClassLoader()), new TypeCreator(testCDCStreamingSuite) { // from class: org.apache.hudi.functional.cdc.TestCDCStreamingSuite$$typecreator5$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple3"), new $colon.colon(mirror.staticClass("scala.Int").asType().toTypeConstructor(), new $colon.colon(mirror.staticClass("java.lang.String").asType().toTypeConstructor(), new $colon.colon(mirror.staticClass("java.lang.String").asType().toTypeConstructor(), Nil$.MODULE$))));
            }
        }))).toDF(Predef$.MODULE$.wrapRefArray(new String[]{"userid", "country", "ts"})).write().format("hudi").options(apply).option(HoodieTableConfig.CDC_ENABLED.key(), "true").option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(), hoodieCDCSupplementalLoggingMode.name()).option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), "userid").option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), "ts").option(HoodieWriteConfig.TBL_NAME.key(), "user_to_country").save(sb);
        final TestCDCStreamingSuite testCDCStreamingSuite2 = null;
        spark.implicits().localSeqToDatasetHolder(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple3[]{new Tuple3("US", BoxesRunTime.boxToInteger(200), "1000"), new Tuple3("China", BoxesRunTime.boxToInteger(50), "1000"), new Tuple3("Singapore", BoxesRunTime.boxToInteger(20), "1000")})), spark.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(TestCDCStreamingSuite.class.getClassLoader()), new TypeCreator(testCDCStreamingSuite2) { // from class: org.apache.hudi.functional.cdc.TestCDCStreamingSuite$$typecreator13$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple3"), new $colon.colon(mirror.staticClass("java.lang.String").asType().toTypeConstructor(), new $colon.colon(mirror.staticClass("scala.Int").asType().toTypeConstructor(), new $colon.colon(mirror.staticClass("java.lang.String").asType().toTypeConstructor(), Nil$.MODULE$))));
            }
        }))).toDF(Predef$.MODULE$.wrapRefArray(new String[]{"country", "population", "ts"})).write().format("hudi").options(apply).option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), "country").option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), "ts").option(HoodieWriteConfig.TBL_NAME.key(), "country_to_population").save(sb2);
        HoodieTableMetaClient createMetaClient = createMetaClient(spark(), sb);
        final TestCDCStreamingSuite testCDCStreamingSuite3 = null;
        MemoryStream memoryStream = new MemoryStream(100, spark().sqlContext(), MemoryStream$.MODULE$.$lessinit$greater$default$3(), spark.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(TestCDCStreamingSuite.class.getClassLoader()), new TypeCreator(testCDCStreamingSuite3) { // from class: org.apache.hudi.functional.cdc.TestCDCStreamingSuite$$typecreator17$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple3"), new $colon.colon(mirror.staticClass("scala.Int").asType().toTypeConstructor(), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), Nil$.MODULE$))));
            }
        })));
        StreamingQuery start = memoryStream.toDS().toDF(Predef$.MODULE$.wrapRefArray(new String[]{"userid", "country", "ts"})).writeStream().format("hudi").foreachBatch((dataset, obj) -> {
            $anonfun$cdcStreaming$1(apply, sb, dataset, BoxesRunTime.unboxToLong(obj));
            return BoxedUnit.UNIT;
        }).start();
        Expression expr = functions$.MODULE$.typedLit(BoxesRunTime.boxToInteger(-1), package$.MODULE$.universe().TypeTag().Int()).expr();
        Expression expr2 = functions$.MODULE$.typedLit(BoxesRunTime.boxToInteger(1), package$.MODULE$.universe().TypeTag().Int()).expr();
        Expression expr3 = functions$.MODULE$.typedLit(BoxesRunTime.boxToInteger(0), package$.MODULE$.universe().TypeTag().Int()).expr();
        If r0 = new If(functions$.MODULE$.isnull(functions$.MODULE$.col("bcountry")).expr(), expr3, expr);
        If r02 = new If(functions$.MODULE$.isnull(functions$.MODULE$.col("acountry")).expr(), expr3, expr2);
        StreamingQuery start2 = spark().readStream().format("hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key(), DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL()).load(sb).writeStream().format("hudi").foreachBatch((dataset2, obj2) -> {
            $anonfun$cdcStreaming$2(this, sb2, r0, r02, apply, dataset2, BoxesRunTime.unboxToLong(obj2));
            return BoxedUnit.UNIT;
        }).start();
        memoryStream.addData(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple3[]{new Tuple3(BoxesRunTime.boxToInteger(3), "US", "1100"), new Tuple3(BoxesRunTime.boxToInteger(4), "US", "1100"), new Tuple3(BoxesRunTime.boxToInteger(5), "US", "1100")})));
        start.processAllAvailable();
        start2.processAllAvailable();
        Predef$.MODULE$.assert(spark().read().format("hudi").load(sb).where("country = 'US'").count() == 5);
        Dataset<Row> cdcDataFrame = cdcDataFrame(sb, BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) createMetaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1).toString(), null);
        cdcDataFrame.show(false);
        assertCDCOpCnt(cdcDataFrame, 1L, 2L, 0L);
        QueryTest$.MODULE$.checkAnswer(spark().read().format("hudi").load(sb2).select("country", Predef$.MODULE$.wrapRefArray(new String[]{"population"})).sort("country", Predef$.MODULE$.wrapRefArray(new String[0])), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"China", BoxesRunTime.boxToInteger(50)})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"Singapore", BoxesRunTime.boxToInteger(20)})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"US", BoxesRunTime.boxToInteger(205)}))})), QueryTest$.MODULE$.checkAnswer$default$3());
        memoryStream.addData(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple3[]{new Tuple3(BoxesRunTime.boxToInteger(3), "Singapore", "1200"), new Tuple3(BoxesRunTime.boxToInteger(7), "Canada", "1200"), new Tuple3(BoxesRunTime.boxToInteger(8), "Singapore", "1200")})));
        start.processAllAvailable();
        start2.processAllAvailable();
        Dataset<Row> cdcDataFrame2 = cdcDataFrame(sb, BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) createMetaClient.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1).toString(), null);
        cdcDataFrame2.show(false);
        assertCDCOpCnt(cdcDataFrame2, 2L, 1L, 0L);
        QueryTest$.MODULE$.checkAnswer(spark().read().format("hudi").load(sb2).select("country", Predef$.MODULE$.wrapRefArray(new String[]{"population"})).sort("country", Predef$.MODULE$.wrapRefArray(new String[0])), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"Canada", BoxesRunTime.boxToInteger(1)})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"China", BoxesRunTime.boxToInteger(50)})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"Singapore", BoxesRunTime.boxToInteger(22)})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"US", BoxesRunTime.boxToInteger(204)}))})), QueryTest$.MODULE$.checkAnswer$default$3());
        start.stop();
        start2.stop();
    }

    public static final /* synthetic */ void $anonfun$cdcStreaming$1(Map map, String str, Dataset dataset, long j) {
        dataset.write().format("hudi").options(map).option(HoodieTableConfig.CDC_ENABLED.key(), "true").option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), "userid").option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), "ts").option(HoodieWriteConfig.TBL_NAME.key(), "user_to_country").mode(SaveMode.Append).save(str);
    }

    public static final /* synthetic */ void $anonfun$cdcStreaming$2(TestCDCStreamingSuite testCDCStreamingSuite, String str, If r21, If r22, Map map, Dataset dataset, long j) {
        dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.get_json_object(functions$.MODULE$.col("before"), "$.country").as("bcountry"), functions$.MODULE$.get_json_object(functions$.MODULE$.col("after"), "$.country").as("acountry"), functions$.MODULE$.get_json_object(functions$.MODULE$.col("after"), "$.ts").as("ts")})).withColumn("bcnt", new Column(r21)).withColumn("acnt", new Column(r22)).select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.explode(functions$.MODULE$.array(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("bcountry").as("country"), functions$.MODULE$.col("bcnt").as("cnt"), functions$.MODULE$.col("ts")})), functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("acountry").as("country"), functions$.MODULE$.col("acnt").as("cnt"), functions$.MODULE$.col("ts")}))})))})).select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("col.country").as("country"), functions$.MODULE$.col("col.cnt").as("cnt"), functions$.MODULE$.col("col.ts").as("ts")})).where("country is not null").groupBy("country", Predef$.MODULE$.wrapRefArray(new String[0])).agg(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cnt"), "sum"), Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("ts"), "max")})).join(testCDCStreamingSuite.spark().read().format("hudi").load(str), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"country"})), "left").select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("country"), new Column(new Add(functions$.MODULE$.col("sum(cnt)").expr(), new If(functions$.MODULE$.isnull(functions$.MODULE$.col("population")).expr(), Literal$.MODULE$.apply(BoxesRunTime.boxToInteger(0)), functions$.MODULE$.col("population").expr()), Add$.MODULE$.apply$default$3())).as("population"), functions$.MODULE$.col("max(ts)").as("ts")})).write().format("hudi").options(map).option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), "country").option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), "ts").option(HoodieWriteConfig.TBL_NAME.key(), "country_to_population").mode(SaveMode.Append).save(str);
    }
}
