package org.apache.hudi.integ.testsuite.streaming;

import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.Trigger;
import scala.Predef$;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;

/* compiled from: StructuredStreamingSinkTestWriter.scala */
/* loaded from: input_file:org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter$.class */
public final class StructuredStreamingSinkTestWriter$ {
    public static StructuredStreamingSinkTestWriter$ MODULE$;
    private final Logger org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log;
    private boolean validationComplete;

    static {
        new StructuredStreamingSinkTestWriter$();
    }

    public Logger org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log() {
        return this.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log;
    }

    public boolean validationComplete() {
        return this.validationComplete;
    }

    public void validationComplete_$eq(boolean z) {
        this.validationComplete = z;
    }

    public void waitUntilCondition() {
        waitUntilCondition(300000L, 500L);
    }

    public void waitUntilCondition(long j, long j2) {
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= j || validationComplete()) {
                return;
            }
            org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info(new StringBuilder(30).append("Waiting for ").append(j2).append(". Total wait time ").append(j4).toString());
            Thread.sleep(j2);
            j3 = j4 + j2;
        }
    }

    public void triggerStreaming(SparkSession sparkSession, String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8) {
        sparkSession.streams().addListener(shutdownListener$1(sparkSession, sparkSession, str2, str3, str6, str7));
        org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info("Starting to consume from source and writing to hudi ");
        sparkSession.readStream().option("spark.sql.streaming.schemaInference", "true").option("maxFilesPerTrigger", "1").schema(sparkSession.read().format("parquet").load(str2).schema()).parquet(str2).writeStream().format("org.apache.hudi").option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), str).option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), str8).option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), str7).option(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key(), str6).option(HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key(), false).option(DataSourceWriteOptions$.MODULE$.STREAMING_IGNORE_FAILED_BATCH().key(), false).option(DataSourceWriteOptions$.MODULE$.STREAMING_RETRY_CNT().key(), 0L).option("hoodie.table.name", str5).option("hoodie.compact.inline.max.delta.commits", "2").option("checkpointLocation", str4).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(30000L)).start(str3);
    }

    public final void org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$validate$1(SparkSession sparkSession, String str, String str2, String str3, String str4) {
        org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info("Validation starting");
        Dataset load = sparkSession.read().format("parquet").load(str);
        Dataset load2 = sparkSession.read().format("hudi").load(str2);
        load.registerTempTable("inputTbl");
        load2.registerTempTable("hudiTbl");
        Predef$.MODULE$.assert(sparkSession.sql(new StringBuilder(39).append("select count(distinct ").append(str3).append(", ").append(str4).append(") from inputTbl").toString()).count() == sparkSession.sql(new StringBuilder(38).append("select count(distinct ").append(str3).append(", ").append(str4).append(") from hudiTbl").toString()).count());
        validationComplete_$eq(true);
        org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info("Validation complete");
    }

    private static final StreamingQueryListener shutdownListener$1(final SparkSession sparkSession, final SparkSession sparkSession2, final String str, final String str2, final String str3, final String str4) {
        return new StreamingQueryListener(sparkSession, sparkSession2, str, str2, str3, str4) { // from class: org.apache.hudi.integ.testsuite.streaming.StructuredStreamingSinkTestWriter$$anon$1
            private final SparkSession spark$2;
            private final SparkSession spark$1;
            private final String inputPath$1;
            private final String hudiPath$1;
            private final String partitionPathField$1;
            private final String recordKeyField$1;

            public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
                StructuredStreamingSinkTestWriter$.MODULE$.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info(new StringBuilder(15).append("Query started: ").append(queryStartedEvent.id()).toString());
            }

            public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
                StructuredStreamingSinkTestWriter$.MODULE$.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info(new StringBuilder(45).append("Query terminated! ").append(queryTerminatedEvent.id()).append(". Validating input and hudi").toString());
                StructuredStreamingSinkTestWriter$.MODULE$.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$validate$1(this.spark$1, this.inputPath$1, this.hudiPath$1, this.partitionPathField$1, this.recordKeyField$1);
                StructuredStreamingSinkTestWriter$.MODULE$.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info("Data Validation complete");
            }

            public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
                if (queryProgressEvent.progress().numInputRows() == 0) {
                    StructuredStreamingSinkTestWriter$.MODULE$.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log().info("Stopping spark structured streaming as we have reached the end");
                    new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(this.spark$2.streams().active())).foreach(streamingQuery -> {
                        streamingQuery.stop();
                        return BoxedUnit.UNIT;
                    });
                }
            }

            {
                this.spark$2 = sparkSession;
                this.spark$1 = sparkSession2;
                this.inputPath$1 = str;
                this.hudiPath$1 = str2;
                this.partitionPathField$1 = str3;
                this.recordKeyField$1 = str4;
            }
        };
    }

    private StructuredStreamingSinkTestWriter$() {
        MODULE$ = this;
        this.org$apache$hudi$integ$testsuite$streaming$StructuredStreamingSinkTestWriter$$log = LogManager.getLogger(getClass());
        this.validationComplete = false;
    }
}
