package org.apache.hudi.functional;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: TestStructuredStreaming.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055d\u0001B\b\u0011\u0001eAQ\u0001\t\u0001\u0005\u0002\u0005Bq\u0001\n\u0001C\u0002\u0013%Q\u0005\u0003\u0004-\u0001\u0001\u0006IA\n\u0005\b[\u0001\u0001\r\u0011\"\u0001/\u0011\u001d1\u0004\u00011A\u0005\u0002]Ba\u0001\u0011\u0001!B\u0013y\u0003bB!\u0001\u0005\u0004%\tA\u0011\u0005\u0007'\u0002\u0001\u000b\u0011B\"\t\u000bQ\u0003A\u0011I+\t\u000b\u0005\u0004A\u0011I+\t\u000b\u0019\u0004A\u0011A+\t\u000b-\u0004A\u0011\u00027\t\u0019\u0005\u0005\u0004\u0001%A\u0001\u0002\u0003%\t!a\u0019\t\u0019\u0005\u001d\u0004\u0001%A\u0001\u0002\u0003%\t!!\u001b\u0003/Q+7\u000f^*ueV\u001cG/\u001e:fIN#(/Z1nS:<'BA\t\u0013\u0003)1WO\\2uS>t\u0017\r\u001c\u0006\u0003'Q\tA\u0001[;eS*\u0011QCF\u0001\u0007CB\f7\r[3\u000b\u0003]\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\u000e\u0011\u0005mqR\"\u0001\u000f\u000b\u0005u\u0011\u0012!\u0003;fgR,H/\u001b7t\u0013\tyBD\u0001\u000bI_>$\u0017.Z\"mS\u0016tG\u000fV3ti\n\u000b7/Z\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\t\u0002\"a\t\u0001\u000e\u0003A\t1\u0001\\8h+\u00051\u0003CA\u0014+\u001b\u0005A#BA\u0015\u0015\u0003\u0015awn\u001a\u001bk\u0013\tY\u0003F\u0001\u0004M_\u001e<WM]\u0001\u0005Y><\u0007%A\u0003ta\u0006\u00148.F\u00010!\t\u0001D'D\u00012\u0015\t\u00114'A\u0002tc2T!!\f\u000b\n\u0005U\n$\u0001D*qCJ\\7+Z:tS>t\u0017!C:qCJ\\w\fJ3r)\tAd\b\u0005\u0002:y5\t!HC\u0001<\u0003\u0015\u00198-\u00197b\u0013\ti$H\u0001\u0003V]&$\bbB 
\u0006\u0003\u0003\u0005\raL\u0001\u0004q\u0012\n\u0014AB:qCJ\\\u0007%\u0001\u0006d_6lwN\\(qiN,\u0012a\u0011\t\u0005\t&[5*D\u0001F\u0015\t1u)A\u0005j[6,H/\u00192mK*\u0011\u0001JO\u0001\u000bG>dG.Z2uS>t\u0017B\u0001&F\u0005\ri\u0015\r\u001d\t\u0003\u0019Fk\u0011!\u0014\u0006\u0003\u001d>\u000bA\u0001\\1oO*\t\u0001+\u0001\u0003kCZ\f\u0017B\u0001*N\u0005\u0019\u0019FO]5oO\u0006Y1m\\7n_:|\u0005\u000f^:!\u0003\u0015\u0019X\r^+q)\u0005A\u0004FA\u0005X!\tAv,D\u0001Z\u0015\tQ6,A\u0002ba&T!\u0001X/\u0002\u000f),\b/\u001b;fe*\u0011aLF\u0001\u0006UVt\u0017\u000e^\u0005\u0003Af\u0013!BQ3g_J,W)Y2i\u0003!!X-\u0019:E_^t\u0007F\u0001\u0006d!\tAF-\u0003\u0002f3\nI\u0011I\u001a;fe\u0016\u000b7\r[\u0001\u0018i\u0016\u001cHo\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001eD#a\u00035\u0011\u0005aK\u0017B\u00016Z\u0005\u0011!Vm\u001d;\u0002/]\f\u0017\u000e\u001e+jY2\fE\u000f\\3bgRt5i\\7nSR\u001cH#C7qs\u0006-\u0011qBA\n!\tId.\u0003\u0002pu\t\u0019\u0011J\u001c;\t\u000bEd\u0001\u0019\u0001:\u0002\u0005\u0019\u001c\bCA:x\u001b\u0005!(BA9v\u0015\t1H#\u0001\u0004iC\u0012|w\u000e]\u0005\u0003qR\u0014!BR5mKNK8\u000f^3n\u0011\u0015QH\u00021\u0001|\u0003%!\u0018M\u00197f!\u0006$\b\u000eE\u0002}\u0003\u000fq1!`A\u0002!\tq((D\u0001��\u0015\r\t\t\u0001G\u0001\u0007yI|w\u000e\u001e 
\n\u0007\u0005\u0015!(\u0001\u0004Qe\u0016$WMZ\u0005\u0004%\u0006%!bAA\u0003u!1\u0011Q\u0002\u0007A\u00025\f!B\\;n\u0007>lW.\u001b;t\u0011\u0019\t\t\u0002\u0004a\u0001[\u0006YA/[7f_V$8+Z2t\u0011\u0019\t)\u0002\u0004a\u0001[\u0006)2\u000f\\3faN+7m]!gi\u0016\u0014X)Y2i%Vt\u0007&\u0002\u0007\u0002\u001a\u0005E\u0002#B\u001d\u0002\u001c\u0005}\u0011bAA\u000fu\t1A\u000f\u001b:poN\u0004B!!\t\u0002,9!\u00111EA\u0014\u001d\rq\u0018QE\u0005\u0002w%\u0019\u0011\u0011\u0006\u001e\u0002\u000fA\f7m[1hK&!\u0011QFA\u0018\u0005QIe\u000e^3seV\u0004H/\u001a3Fq\u000e,\u0007\u000f^5p]*\u0019\u0011\u0011\u0006\u001e2\ryY\u00181GA0c%\u0019\u0013QGA\u001f\u0003+\ny$\u0006\u0003\u00028\u0005eR#A>\u0005\u000f\u0005m\u0002D1\u0001\u0002F\t\tA+\u0003\u0003\u0002@\u0005\u0005\u0013a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$\u0013GC\u0002\u0002Di\na\u0001\u001e5s_^\u001c\u0018\u0003BA$\u0003\u001b\u00022!OA%\u0013\r\tYE\u000f\u0002\b\u001d>$\b.\u001b8h!\u0011\ty%!\u0015\u000f\u0007e\n9#\u0003\u0003\u0002T\u0005=\"!\u0003+ie><\u0018M\u00197fc%\u0019\u0013qKA-\u00037\n\u0019ED\u0002:\u00033J1!a\u0011;c\u0015\u0011\u0013HOA/\u0005\u0015\u00198-\u00197bc\r1\u0013qD\u0001\u0013aJ|G/Z2uK\u0012$#-Y:f!\u0006$\b\u000eF\u0002L\u0003KBqaP\u0007\u0002\u0002\u0003\u0007!%\u0001\u0007qe>$Xm\u0019;fI\u001227\u000fF\u0002s\u0003WBqa\u0010\b\u0002\u0002\u0003\u0007!\u0005")
/* loaded from: input_file:org/apache/hudi/functional/TestStructuredStreaming.class */
public class TestStructuredStreaming extends HoodieClientTestBase {
    // Logger obtained for the runtime class of this test instance.
    private final Logger log = LogManager.getLogger(getClass());
    // Spark session used by the test; starts as null and is assigned in setUp() through spark_$eq.
    private SparkSession spark = null;
    // Immutable Scala map of writer options shared by the test: insert/upsert shuffle parallelism ("4"),
    // record-key field ("_row_key"), partition-path field ("partition"), precombine field ("timestamp")
    // and table name ("hoodie_test"). The verbose form is the decompiled shape of a Scala Map(k -> v, ...) literal.
    private final Map<String, String> commonOpts = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.insert.shuffle.parallelism"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.upsert.shuffle.parallelism"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD_OPT_KEY()), "_row_key"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD_OPT_KEY()), "partition"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD_OPT_KEY()), "timestamp"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("hoodie.table.name"), "hoodie_test")}));

    /**
     * Synthetic accessor that reads the {@code basePath} field of the given instance.
     * Presumably emitted by the Scala compiler so closures can reach the inherited member.
     *
     * @param testStructuredStreaming instance whose {@code basePath} is read
     * @return the value of that instance's {@code basePath} field
     */
    public /* synthetic */ String protected$basePath(TestStructuredStreaming testStructuredStreaming) {
        final String inheritedBasePath = testStructuredStreaming.basePath;
        return inheritedBasePath;
    }

    /**
     * Synthetic accessor that reads the {@code fs} field of the given instance.
     * Presumably emitted by the Scala compiler so closures can reach the inherited member.
     *
     * @param testStructuredStreaming instance whose {@code fs} is read
     * @return the value of that instance's {@code fs} field
     */
    public /* synthetic */ FileSystem protected$fs(TestStructuredStreaming testStructuredStreaming) {
        final FileSystem inheritedFs = testStructuredStreaming.fs;
        return inheritedFs;
    }

    /**
     * Accessor for this test's logger field.
     *
     * @return the {@code log} field
     */
    private Logger log() {
        return log;
    }

    /**
     * Accessor for the Spark session field.
     *
     * @return the current value of the {@code spark} field; {@code null} until it has been assigned
     */
    public SparkSession spark() {
        return spark;
    }

    /**
     * Setter for the Spark session field (the name is the JVM encoding of a Scala {@code spark_=} setter).
     *
     * @param sparkSession value stored into the {@code spark} field
     */
    public void spark_$eq(SparkSession sparkSession) {
        spark = sparkSession;
    }

    /**
     * Accessor for the shared writer options.
     *
     * @return the {@code commonOpts} map held by this instance
     */
    public Map<String, String> commonOpts() {
        return commonOpts;
    }

    /**
     * Per-test fixture setup. Runs the inherited initialisers in a fixed order and, once the
     * Spark contexts exist, stores the session obtained from {@code sqlContext} via {@link #spark_$eq}.
     */
    @BeforeEach
    public void setUp() {
        this.initPath();
        this.initSparkContexts();
        // The session is taken from sqlContext only after initSparkContexts() has run.
        final SparkSession session = this.sqlContext.sparkSession();
        this.spark_$eq(session);
        this.initTestDataGenerator();
        this.initFileSystem();
    }

    /**
     * Per-test fixture teardown: invokes the inherited cleanup helpers for the Spark contexts,
     * the test data generator and the file system, in that order.
     */
    @AfterEach
    public void tearDown() {
        this.cleanupSparkContexts();
        this.cleanupTestDataGenerator();
        this.cleanupFileSystem();
    }

    @Test
    public void testStructuredStreaming() {
        this.fs.delete(new Path(this.basePath), true);
        String sb = new StringBuilder(7).append(this.basePath).append("/source").toString();
        String sb2 = new StringBuilder(5).append(this.basePath).append("/dest").toString();
        this.fs.mkdirs(new Path(sb));
        Dataset json = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        Dataset json2 = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUpdates("001", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        long count = json2.select("_row_key", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count();
        Dataset json3 = spark().readStream().schema(json.schema()).json(sb);
        Await$.MODULE$.result(Future$.MODULE$.sequence(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Future[]{Future$.MODULE$.apply(() -> {
            Predef$.MODULE$.println("streaming starting");
            json3.writeStream().format("org.apache.hudi").options(this.commonOpts()).trigger(new ProcessingTime(100L)).option("checkpointLocation", new StringBuilder(11).append(this.protected$basePath(this)).append("/checkpoint").toString()).outputMode(OutputMode.Append()).start(sb2).awaitTermination(10000L);
            Predef$.MODULE$.println("streaming ends");
        }, ExecutionContext$Implicits$.MODULE$.global()), Future$.MODULE$.apply(() -> {
            json.coalesce(1).write().mode(SaveMode.Append).json(sb);
            int waitTillAtleastNCommits = this.waitTillAtleastNCommits(this.protected$fs(this), sb2, 1, 120, 5);
            Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(this.protected$fs(this), sb2, "000"));
            String latestCommit = HoodieDataSourceHelpers.latestCommit(this.protected$fs(this), sb2);
            Predef$.MODULE$.assert(this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(sb2).append("/*/*/*/*").toString()).count() == 100);
            json2.coalesce(1).write().mode(SaveMode.Append).json(sb);
            this.waitTillAtleastNCommits(this.protected$fs(this), sb2, waitTillAtleastNCommits + 1, 120, 5);
            String latestCommit2 = HoodieDataSourceHelpers.latestCommit(this.protected$fs(this), sb2);
            Assertions.assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(this.protected$fs(this), sb2, "000").size());
            Assertions.assertEquals(100L, this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(sb2).append("/*/*/*/*").toString()).count());
            String str = (String) HoodieDataSourceHelpers.listCommitsSince(this.protected$fs(this), sb2, "000").get(0);
            Dataset load = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME_OPT_KEY(), "000").option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME_OPT_KEY(), str).load(sb2);
            Assertions.assertEquals(100L, load.count());
            Row[] rowArr = (Row[]) load.groupBy("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect();
            Assertions.assertEquals(1, rowArr.length);
            Assertions.assertEquals(str, rowArr[0].get(0));
            Dataset load2 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME_OPT_KEY(), latestCommit).load(sb2);
            Assertions.assertEquals(count, load2.count());
            Row[] rowArr2 = (Row[]) load2.groupBy("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect();
            Assertions.assertEquals(1, rowArr2.length);
            Assertions.assertEquals(latestCommit2, rowArr2[0].get(0));
        }, ExecutionContext$Implicits$.MODULE$.global())})), Seq$.MODULE$.canBuildFrom(), ExecutionContext$Implicits$.MODULE$.global()), Duration$.MODULE$.Inf());
    }

    /**
     * Polls the completed commit/compaction timeline of a table until it holds at least the
     * requested number of instants, or the timeout elapses.
     *
     * <p>Each poll is followed by a sleep, including the poll that succeeds, so the caller
     * always observes the same pacing. A {@link TableNotFoundException} is treated as
     * "table not created yet" and simply triggers another poll.
     *
     * @param fileSystem file system handed to {@code HoodieDataSourceHelpers}
     * @param str        path of the table to inspect
     * @param i          minimum number of completed instants to wait for
     * @param i2         overall timeout, in seconds
     * @param i3         pause after every poll, in seconds
     * @return the number of completed instants seen by the poll that satisfied the condition
     * @throws InterruptedException  if the thread is interrupted while sleeping between polls
     * @throws IllegalStateException if the timeout elapses before enough instants appear
     */
    private int waitTillAtleastNCommits(FileSystem fileSystem, String str, int i, int i2, int i3) throws InterruptedException {
        final long startMillis = System.currentTimeMillis();
        // Widen to long before multiplying so large second counts cannot overflow int.
        final long timeoutMillis = i2 * 1000L;
        final long sleepMillis = i3 * 1000L;

        long nowMillis = startMillis;
        while (nowMillis - startMillis < timeoutMillis) {
            try {
                HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fileSystem, str);
                // Arrays.toString renders the instants themselves; appending the array directly
                // would only log its identity hash.
                log().info("Timeline :" + java.util.Arrays.toString(timeline.getInstants().toArray()));
                final int completedInstants = timeline.countInstants();
                if (completedInstants >= i) {
                    // The finally block below still runs, so the post-poll sleep is preserved.
                    return completedInstants;
                }
            } catch (TableNotFoundException e) {
                log().info("Got table not found exception. Retrying");
            } finally {
                Thread.sleep(sleepMillis);
            }
            nowMillis = System.currentTimeMillis();
        }
        throw new IllegalStateException("Timed-out waiting for " + i + " commits to appear in " + str);
    }
}
