/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.python;

import java.io.File;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import org.apache.spark.SparkException;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.SparkThrowable;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.IntegratedUDFTestUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2;
import org.apache.spark.sql.execution.datasources.v2.python.PythonMicroBatchStream;
import org.apache.spark.sql.execution.datasources.v2.python.PythonStreamingSourceOffset;
import org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource;
import org.apache.spark.sql.execution.python.PythonDataSourceSuiteBase;
import org.apache.spark.sql.execution.streaming.CommitLog;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.execution.streaming.MemoryStream$;
import org.apache.spark.sql.execution.streaming.OffsetSeqLog;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.Tag;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.Span$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.ArrayOps;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnce;
import scala.collection.StringOps$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005%3Aa\u0003\u0007\u00013!)a\u0004\u0001C\u0001?!9\u0011\u0005\u0001b\u0001\n\u0003\u0011\u0003BB\u0017\u0001A\u0003%1\u0005C\u0003/\u0001\u0011Eq\u0006C\u0003<\u0001\u0011Eq\u0006C\u0003=\u0001\u0011Eq\u0006C\u0003>\u0001\u0011Eq\u0006C\u0003?\u0001\u0011Eq\u0006C\u0004@\u0001\t\u0007I\u0011\u0002!\t\r!\u0003\u0001\u0015!\u0003B\u0005y\u0001\u0016\u0010\u001e5p]N#(/Z1nS:<G)\u0019;b'>,(oY3Tk&$XM\u0003\u0002\u000e\u001d\u00051\u0001/\u001f;i_:T!a\u0004\t\u0002\u0013\u0015DXmY;uS>t'BA\t\u0013\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003'Q\tQa\u001d9be.T!!\u0006\f\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00059\u0012aA8sO\u000e\u00011C\u0001\u0001\u001b!\tYB$D\u0001\r\u0013\tiBBA\rQsRDwN\u001c#bi\u0006\u001cv.\u001e:dKN+\u0018\u000e^3CCN,\u0017A\u0002\u001fj]&$h\bF\u0001!!\tY\u0002!A\u0006xC&$H+[7f_V$X#A\u0012\u0011\u0005\u0011ZS\"A\u0013\u000b\u0005\u0019:\u0013\u0001\u00033ve\u0006$\u0018n\u001c8\u000b\u0005!J\u0013AC2p]\u000e,(O]3oi*\t!&A\u0003tG\u0006d\u0017-\u0003\u0002-K\tqa)\u001b8ji\u0016$UO]1uS>t\u0017\u0001D<bSR$\u0016.\\3pkR\u0004\u0013A\u0007;fgR$\u0015\r^1TiJ,\u0017-\u001c*fC\u0012,'oU2sSB$X#\u0001\u0019\u0011\u0005EBdB\u0001\u001a7!\t\u0019\u0014&D\u00015\u0015\t)\u0004$\u0001\u0004=e>|GOP\u0005\u0003o%\na\u0001\u0015:fI\u00164\u0017BA\u001d;\u0005\u0019\u0019FO]5oO*\u0011q'K\u0001\u001dg&l\u0007\u000f\\3ECR\f7\u000b\u001e:fC6\u0014V-\u00193feN\u001b'/\u001b9u\u0003)\u001a\u0018.\u001c9mK\u0012\u000bG/Y*ue\u0016\fWNU3bI\u0016\u0014x+\u001b;i\u000b6\u0004H/\u001f\"bi\u000eD7k\u0019:jaR\f1$\u001a:s_J$\u0015\r^1TiJ,\u0017-\u001c*fC\u0012,'oU2sSB$\u0018\u0001H:j[BdW\rR1uCN#(/Z1n/JLG/\u001a:TGJL\u0007\u000f^\u0001\u0014KJ\u0014xN\u001d#bi\u0006\u001cv.\u001e:dK:\u000bW.Z\u000b\u0002\u0003B\u0011!iR\u0007\u0002\u0007*\u0011A)R\u0001\u0005Y\u0006twMC\u0001G\u0003\u0011Q\u0017M^1\n\u0005e\u001a\u0015\u0001F3se>\u0014H)\u0019;b'>,(oY3OC6,\u0007\u0005")
public class PythonStreamingDataSourceSuite
extends PythonDataSourceSuiteBase {
    private final FiniteDuration waitTimeout = new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(15)).seconds();
    private final String errorDataSourceName;

    /**
     * Accessor for the suite's wait timeout.
     *
     * @return the value of the {@code waitTimeout} field
     */
    public FiniteDuration waitTimeout() {
        FiniteDuration timeout = this.waitTimeout;
        return timeout;
    }

    /**
     * Python source for {@code TestDataStreamReader}, a {@code DataSourceStreamReader} whose
     * {@code latestOffset} advances a counter by 2 per call and whose {@code partitions} returns
     * one {@code InputPartition} per index between the start and end offsets.
     *
     * @return the script text after {@code stripMargin} has removed the {@code |} margins
     */
    public String testDataStreamReaderScript() {
        String marginedScript = "\n      |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition\n      |\n      |class TestDataStreamReader(DataSourceStreamReader):\n      |    current = 0\n      |    def initialOffset(self):\n      |        return {\"offset\": {\"partition-1\": 0}}\n      |    def latestOffset(self):\n      |        self.current += 2\n      |        return {\"offset\": {\"partition-1\": self.current}}\n      |    def partitions(self, start: dict, end: dict):\n      |        start_index = start[\"offset\"][\"partition-1\"]\n      |        end_index = end[\"offset\"][\"partition-1\"]\n      |        return [InputPartition(i) for i in range(start_index, end_index)]\n      |    def commit(self, end: dict):\n      |        1 + 2\n      |    def read(self, partition):\n      |        yield (partition.value,)\n      |";
        String wrapped = Predef$.MODULE$.augmentString(marginedScript);
        return StringOps$.MODULE$.stripMargin$extension(wrapped);
    }

    /**
     * Python source for {@code SimpleDataStreamReader}, a {@code SimpleDataSourceStreamReader}
     * whose {@code read} returns two consecutive integers starting at the given offset together
     * with the next offset (start + 2).
     *
     * @return the script text after {@code stripMargin} has removed the {@code |} margins
     */
    public String simpleDataStreamReaderScript() {
        String marginedScript = "\n      |from pyspark.sql.datasource import SimpleDataSourceStreamReader\n      |\n      |class SimpleDataStreamReader(SimpleDataSourceStreamReader):\n      |    def initialOffset(self):\n      |        return {\"partition-1\": 0}\n      |    def read(self, start: dict):\n      |        start_idx = start[\"partition-1\"]\n      |        it = iter([(i, ) for i in range(start_idx, start_idx + 2)])\n      |        return (it, {\"partition-1\": start_idx + 2})\n      |    def readBetweenOffsets(self, start: dict, end: dict):\n      |        start_idx = start[\"partition-1\"]\n      |        end_idx = end[\"partition-1\"]\n      |        return iter([(i, ) for i in range(start_idx, end_idx)])\n      |";
        String wrapped = Predef$.MODULE$.augmentString(marginedScript);
        return StringOps$.MODULE$.stripMargin$extension(wrapped);
    }

    /**
     * Python source for a {@code SimpleDataStreamReader} variant whose {@code read} yields two
     * rows only when the start offset is a multiple of 4 and an empty iterator otherwise; the
     * returned next offset is start + 2 in both cases.
     *
     * @return the script text after {@code stripMargin} has removed the {@code |} margins
     */
    public String simpleDataStreamReaderWithEmptyBatchScript() {
        String marginedScript = "\n      |from pyspark.sql.datasource import SimpleDataSourceStreamReader\n      |\n      |class SimpleDataStreamReader(SimpleDataSourceStreamReader):\n      |    def initialOffset(self):\n      |        return {\"partition-1\": 0}\n      |    def read(self, start: dict):\n      |        start_idx = start[\"partition-1\"]\n      |        if start_idx % 4 == 0:\n      |            it = iter([(i, ) for i in range(start_idx, start_idx + 2)])\n      |        else:\n      |            it = iter([])\n      |        return (it, {\"partition-1\": start_idx + 2})\n      |    def readBetweenOffsets(self, start: dict, end: dict):\n      |        start_idx = start[\"partition-1\"]\n      |        end_idx = end[\"partition-1\"]\n      |        return iter([(i, ) for i in range(start_idx, end_idx)])\n      |";
        String wrapped = Predef$.MODULE$.augmentString(marginedScript);
        return StringOps$.MODULE$.stripMargin$extension(wrapped);
    }

    /**
     * Python source for {@code ErrorDataStreamReader}, whose {@code initialOffset},
     * {@code latestOffset}, {@code partitions} and {@code commit} methods each raise an
     * exception with a distinct message; {@code read} yields three rows per partition.
     *
     * @return the script text after {@code stripMargin} has removed the {@code |} margins
     */
    public String errorDataStreamReaderScript() {
        String marginedScript = "\n      |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition\n      |\n      |class ErrorDataStreamReader(DataSourceStreamReader):\n      |    def initialOffset(self):\n      |        raise Exception(\"error reading initial offset\")\n      |    def latestOffset(self):\n      |        raise Exception(\"error reading latest offset\")\n      |    def partitions(self, start: dict, end: dict):\n      |        raise Exception(\"error planning partitions\")\n      |    def commit(self, end: dict):\n      |        raise Exception(\"error committing offset\")\n      |    def read(self, partition):\n      |        yield (0, partition.value)\n      |        yield (1, partition.value)\n      |        yield (2, partition.value)\n      |";
        String wrapped = Predef$.MODULE$.augmentString(marginedScript);
        return StringOps$.MODULE$.stripMargin$extension(wrapped);
    }

    /**
     * Python source defining {@code SimpleDataSourceStreamWriter} and {@code SimpleDataSource}.
     * The writer's {@code write} dumps each row as one JSON line into
     * {@code <path>/<partition_id>.json}, opening the file in "w" mode when {@code overwrite}
     * is set and "a" mode otherwise.
     *
     * @return the script text after {@code stripMargin} has removed the {@code |} margins
     */
    public String simpleDataStreamWriterScript() {
        String marginedScript = "\n       |import json\n       |import uuid\n       |import os\n       |from pyspark import TaskContext\n       |from pyspark.sql.datasource import DataSource, DataSourceStreamWriter\n       |from pyspark.sql.datasource import WriterCommitMessage\n       |\n       |class SimpleDataSourceStreamWriter(DataSourceStreamWriter):\n       |    def __init__(self, options, overwrite):\n       |        self.options = options\n       |        self.overwrite = overwrite\n       |\n       |    def write(self, iterator):\n       |        context = TaskContext.get()\n       |        partition_id = context.partitionId()\n       |        path = self.options.get(\"path\")\n       |        assert path is not None\n       |        output_path = os.path.join(path, f\"{partition_id}.json\")\n       |        cnt = 0\n       |        mode = \"w\" if self.overwrite else \"a\"\n       |        with open(output_path, mode) as file:\n       |            for row in iterator:\n       |                file.write(json.dumps(row.asDict()) + \"\\n\")\n       |        return WriterCommitMessage()\n       |\n       |class SimpleDataSource(DataSource):\n       |    def schema(self) -> str:\n       |        return \"id INT\"\n       |    def streamWriter(self, schema, overwrite):\n       |        return SimpleDataSourceStreamWriter(self.options, overwrite)\n       |";
        String wrapped = Predef$.MODULE$.augmentString(marginedScript);
        return StringOps$.MODULE$.stripMargin$extension(wrapped);
    }

    /**
     * Accessor for the error data source name.
     *
     * @return the value of the {@code errorDataSourceName} field
     */
    private String errorDataSourceName() {
        String name = this.errorDataSourceName;
        return name;
    }

    public static final /* synthetic */ void $anonfun$new$5(PythonStreamingDataSourceSuite $this, CountDownLatch stopSignal1$1, Dataset df, long batchId) {
        df.cache();
        $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> df, (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L))})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L + 1L))})), (List)Nil$.MODULE$)));
        if (batchId == 10L) {
            stopSignal1$1.countDown();
            return;
        }
    }

    /**
     * Predicate applied to each progress entry of {@code q1} in {@code $anonfun$new$4}.
     *
     * @param x$1 a progress report
     * @return {@code true} when the report counts exactly two input rows
     */
    public static final /* synthetic */ boolean $anonfun$new$7(StreamingQueryProgress x$1) {
        long inputRows = x$1.numInputRows();
        return inputRows == 2L;
    }

    public static final /* synthetic */ void $anonfun$new$8(PythonStreamingDataSourceSuite $this, CountDownLatch stopSignal2$1, Dataset df, long batchId) {
        df.cache();
        $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> df, (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L))})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L + 1L))})), (List)Nil$.MODULE$)));
        if (batchId == 20L) {
            stopSignal2$1.countDown();
            return;
        }
    }

    /**
     * Predicate applied to each progress entry of {@code q2} in {@code $anonfun$new$4}.
     *
     * @param x$2 a progress report
     * @return {@code true} when the report counts exactly two input rows
     */
    public static final /* synthetic */ boolean $anonfun$new$10(StreamingQueryProgress x$2) {
        long inputRows = x$2.numInputRows();
        return inputRows == 2L;
    }

    /**
     * Test body that runs the streaming source twice against the same checkpoint directory.
     * The first query runs until its batch callback releases the latch (batch 10, see
     * {@code $anonfun$new$5}); the second is started on the same checkpoint and runs until
     * batch 20 (see {@code $anonfun$new$8}). After each run every recent progress entry is
     * asserted to report two input rows, then the query is stopped.
     *
     * NOTE(review): both latch waits are unbounded, so a failing batch callback looks like it
     * would hang here rather than fail the test; confirm whether a timed wait is wanted.
     *
     * @param $this the enclosing suite
     * @param dir   temporary directory; the checkpoint goes in its {@code checkpoint} child
     */
    public static final /* synthetic */ void $anonfun$new$4(PythonStreamingDataSourceSuite $this, File dir) {
        File checkpointDir = new File(dir.getAbsolutePath(), "checkpoint");
        Dataset streamingDf = $this.spark().readStream().format($this.dataSourceName()).load();

        // First run: start from scratch and wait for the callback to signal completion.
        CountDownLatch firstRunDone = new CountDownLatch(1);
        StreamingQuery firstRun = streamingDf.writeStream().option("checkpointLocation", checkpointDir.getAbsolutePath()).foreachBatch((Function2 & Serializable)(df, batchId) -> {
            PythonStreamingDataSourceSuite.$anonfun$new$5($this, firstRunDone, df, BoxesRunTime.unboxToLong((Object)batchId));
            return BoxedUnit.UNIT;
        }).start();
        firstRunDone.await();
        Bool firstRunRowCounts = Bool$.MODULE$.simpleMacroBool(ArrayOps$.MODULE$.forall$extension(Predef$.MODULE$.refArrayOps((Object[])firstRun.recentProgress()), (Function1 & Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)PythonStreamingDataSourceSuite.$anonfun$new$7(x$1))), "scala.Predef.refArrayOps[org.apache.spark.sql.streaming.StreamingQueryProgress](q1.recentProgress).forall(((x$1: org.apache.spark.sql.streaming.StreamingQueryProgress) => x$1.numInputRows.==(2)))", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert(firstRunRowCounts, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 217));
        firstRun.stop();
        firstRun.awaitTermination();

        // Second run: same checkpoint location, different latch and callback.
        CountDownLatch secondRunDone = new CountDownLatch(1);
        StreamingQuery secondRun = streamingDf.writeStream().option("checkpointLocation", checkpointDir.getAbsolutePath()).foreachBatch((Function2 & Serializable)(df, batchId) -> {
            PythonStreamingDataSourceSuite.$anonfun$new$8($this, secondRunDone, df, BoxesRunTime.unboxToLong((Object)batchId));
            return BoxedUnit.UNIT;
        }).start();
        secondRunDone.await();
        Bool secondRunRowCounts = Bool$.MODULE$.simpleMacroBool(ArrayOps$.MODULE$.forall$extension(Predef$.MODULE$.refArrayOps((Object[])secondRun.recentProgress()), (Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)PythonStreamingDataSourceSuite.$anonfun$new$10(x$2))), "scala.Predef.refArrayOps[org.apache.spark.sql.streaming.StreamingQueryProgress](q2.recentProgress).forall(((x$2: org.apache.spark.sql.streaming.StreamingQueryProgress) => x$2.numInputRows.==(2)))", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert(secondRunRowCounts, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 231));
        secondRun.stop();
        secondRun.awaitTermination();
    }

    /**
     * Per-batch body of the {@code foreachBatch} callback in {@code $anonfun$new$12}.
     * Even-numbered batches are checked against the rows {@code 2 * batchId} and
     * {@code 2 * batchId + 1}; odd-numbered batches are asserted to be empty. The latch is
     * counted down when batch 10 is reached.
     *
     * @param $this        the enclosing suite, used for {@code checkAnswer}
     * @param stopSignal$1 latch released once {@code batchId == 10}
     * @param df           the micro-batch contents
     * @param batchId      id of the micro-batch being checked
     */
    public static final /* synthetic */ void $anonfun$new$13(PythonStreamingDataSourceSuite $this, CountDownLatch stopSignal$1, Dataset df, long batchId) {
        df.cache();
        // The decompiler assigned both branch results to an undeclared `v0` that nothing read;
        // the branches are plain statements here.
        if (batchId % 2L == 0L) {
            Row firstRow = Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L))}));
            Row secondRow = Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L + 1L))}));
            // Scala's `::` cons class, spelled with its JVM name; the decompiler's
            // `.colon.colon` rendering is not valid Java.
            List expectedRows = new scala.collection.immutable.$colon$colon((Object)firstRow, (List)new scala.collection.immutable.$colon$colon((Object)secondRow, (List)Nil$.MODULE$));
            $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> df, (Seq<Row>)expectedRows);
        } else {
            Dataset batch = df;
            Bool batchIsEmpty = Bool$.MODULE$.unaryMacroBool((Object)batch, "isEmpty", batch.isEmpty(), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(batchIsEmpty, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 270));
        }
        if (batchId == 10L) {
            stopSignal$1.countDown();
        }
    }

    /**
     * Test body that starts one streaming query whose batches are checked by
     * {@code $anonfun$new$13}, waits for that callback to release the latch, then stops the
     * query and waits for it to terminate.
     *
     * NOTE(review): the latch wait is unbounded; confirm whether a timed wait is wanted.
     *
     * @param $this the enclosing suite
     * @param dir   temporary directory; the checkpoint goes in its {@code checkpoint} child
     */
    public static final /* synthetic */ void $anonfun$new$12(PythonStreamingDataSourceSuite $this, File dir) {
        File checkpointDir = new File(dir.getAbsolutePath(), "checkpoint");
        Dataset streamingDf = $this.spark().readStream().format($this.dataSourceName()).load();
        CountDownLatch finished = new CountDownLatch(1);
        StreamingQuery query = streamingDf.writeStream().option("checkpointLocation", checkpointDir.getAbsolutePath()).foreachBatch((Function2 & Serializable)(df, batchId) -> {
            PythonStreamingDataSourceSuite.$anonfun$new$13($this, finished, df, BoxesRunTime.unboxToLong((Object)batchId));
            return BoxedUnit.UNIT;
        }).start();
        finished.await();
        query.stop();
        query.awaitTermination();
    }

    /**
     * Test body that restarts a JSON-writing streaming query five times on one checkpoint.
     * On even iterations the commit log is purged after (latest offset batch id - 1) before the
     * restart. Each run polls every 200 ms until at least five progress entries exist, is then
     * stopped, and its last batch id is recorded. Afterwards the method asserts that the last
     * batch id exceeds 20, that the number of output rows is {@code 2 * (lastBatchId + 1)} or
     * {@code 2 * (lastBatchId + 2)}, and that the output holds the rows 0 until the row count.
     *
     * @param $this the enclosing suite
     * @param dir   temporary directory holding the {@code checkpoint} and {@code output} children
     */
    public static final /* synthetic */ void $anonfun$new$16(PythonStreamingDataSourceSuite $this, File dir) {
        String path = dir.getAbsolutePath();
        File checkpointDir = new File(path, "checkpoint");
        File outputDir = new File(path, "output");
        Dataset df = $this.spark().readStream().format($this.dataSourceName()).load();
        // Mutable holder so the loop body (a lambda) can publish the last batch id.
        IntRef lastBatchId = IntRef.create((int)0);
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> {
            if (i % 2 == 0) {
                OffsetSeqLog offsetLog = new OffsetSeqLog($this.spark(), new File(checkpointDir, "offsets").getCanonicalPath());
                CommitLog commitLog = new CommitLog($this.spark(), new File(checkpointDir, "commits").getCanonicalPath());
                commitLog.purgeAfter(((Tuple2)offsetLog.getLatest().get())._1$mcJ$sp() - 1L);
            }
            StreamingQuery q = df.writeStream().option("checkpointLocation", checkpointDir.getAbsolutePath()).format("json").start(outputDir.getAbsolutePath());
            while (q.recentProgress().length < 5) {
                Thread.sleep(200L);
            }
            q.stop();
            q.awaitTermination();
            // The decompiler emitted the lifted capture name `lastBatchId$1`, which is not
            // declared anywhere in this method; the captured holder is `lastBatchId`.
            lastBatchId.elem = (int)q.lastProgress().batchId();
        });
        int $org_scalatest_assert_macro_left = lastBatchId.elem;
        int $org_scalatest_assert_macro_right = 20;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 328));
        long rowCount = $this.spark().read().format("json").load(outputDir.getAbsolutePath()).count();
        long $org_scalatest_assert_macro_left2 = rowCount;
        int $org_scalatest_assert_macro_right2 = 2 * (lastBatchId.elem + 1);
        Bool $org_scalatest_assert_macro_left3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left2), "==", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 == (long)$org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
        // Right-hand side of the `||`: only compared against the second accepted count when the
        // first comparison failed.
        Bool bool;
        if ($org_scalatest_assert_macro_left3.value()) {
            bool = Bool$.MODULE$.simpleMacroBool(true, "", Prettifier$.MODULE$.default());
        } else {
            long $org_scalatest_assert_macro_left4 = rowCount;
            int $org_scalatest_assert_macro_right3 = 2 * (lastBatchId.elem + 2);
            bool = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left4), "==", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left4 == (long)$org_scalatest_assert_macro_right3, Prettifier$.MODULE$.default());
        }
        Bool $org_scalatest_assert_macro_right4 = bool;
        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "||", (Object)$org_scalatest_assert_macro_right4, $org_scalatest_assert_macro_left3.$bar$bar((Function0 & Serializable)() -> $org_scalatest_assert_macro_right4), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 332));
        $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.spark().read().format("json").load(outputDir.getAbsolutePath()), (Seq<Row>)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)rowCount).map((Function1 & Serializable)x$3 -> Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)x$3))}))));
    }

    /**
     * Runs {@code func} against a fresh {@link PythonMicroBatchStream} built for the error data
     * source and verifies the failure: a {@link SparkException} must be thrown, it must match
     * error class {@code PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR} with the given
     * {@code action}, and its message must contain both {@code msg} and "ErrorDataSource".
     * The stream is stopped in a {@code finally} block so it is released even when the
     * expected exception is not thrown or one of the assertions fails.
     *
     * @param action        expected value of the error's {@code action} parameter
     * @param msg           text that must appear in the exception message
     * @param func          operation to run against the stream; expected to throw
     * @param pythonDs$1    data source passed to the stream constructor
     * @param inputSchema$1 schema passed to the stream constructor
     */
    private final void testMicroBatchStreamError$1(String action, String msg, Function1 func, PythonDataSourceV2 pythonDs$1, StructType inputSchema$1) {
        PythonMicroBatchStream stream = new PythonMicroBatchStream(pythonDs$1, this.errorDataSourceName(), inputSchema$1, CaseInsensitiveStringMap.empty());
        try {
            SparkException err = (SparkException)((Object)this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> func.apply((Object)stream), ClassTag$.MODULE$.apply(SparkException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 363)));
            // The "msg" parameter is matched with a catch-all pattern; the exact text is checked below.
            this.checkErrorMatchPVals((SparkThrowable)err, "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR", (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"action"), (Object)action), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"msg"), (Object)"(.|\\n)*")})));
            String messageForMsgCheck = err.getMessage();
            Bool containsMsg = Bool$.MODULE$.binaryMacroBool((Object)messageForMsgCheck, "contains", (Object)msg, messageForMsgCheck.contains(msg), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsMsg, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 372));
            String messageForNameCheck = err.getMessage();
            String expectedSourceName = "ErrorDataSource";
            Bool containsSourceName = Bool$.MODULE$.binaryMacroBool((Object)messageForNameCheck, "contains", (Object)expectedSourceName, messageForNameCheck.contains(expectedSourceName), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsSourceName, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 373));
        } finally {
            stream.stop();
        }
    }

    /**
     * Runs {@code func} against a fresh {@link PythonMicroBatchStream} built for the error data
     * source and verifies the failure: a {@link SparkException} must be thrown, it must match
     * error class {@code PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR} with the given
     * {@code action}, and its message must contain both {@code msg} and "ErrorDataSource".
     * The stream is stopped in a {@code finally} block so it is released even when the
     * expected exception is not thrown or one of the assertions fails.
     *
     * @param action        expected value of the error's {@code action} parameter
     * @param msg           text that must appear in the exception message
     * @param func          operation to run against the stream; expected to throw
     * @param pythonDs$2    data source passed to the stream constructor
     * @param inputSchema$2 schema passed to the stream constructor
     */
    private final void testMicroBatchStreamError$2(String action, String msg, Function1 func, PythonDataSourceV2 pythonDs$2, StructType inputSchema$2) {
        PythonMicroBatchStream stream = new PythonMicroBatchStream(pythonDs$2, this.errorDataSourceName(), inputSchema$2, CaseInsensitiveStringMap.empty());
        try {
            SparkException err = (SparkException)((Object)this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> func.apply((Object)stream), ClassTag$.MODULE$.apply(SparkException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 419)));
            // The "msg" parameter is matched with a catch-all pattern; the exact text is checked below.
            this.checkErrorMatchPVals((SparkThrowable)err, "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR", (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"action"), (Object)action), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"msg"), (Object)"(.|\\n)*")})));
            String messageForMsgCheck = err.getMessage();
            Bool containsMsg = Bool$.MODULE$.binaryMacroBool((Object)messageForMsgCheck, "contains", (Object)msg, messageForMsgCheck.contains(msg), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsMsg, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 428));
            String messageForNameCheck = err.getMessage();
            String expectedSourceName = "ErrorDataSource";
            Bool containsSourceName = Bool$.MODULE$.binaryMacroBool((Object)messageForNameCheck, "contains", (Object)expectedSourceName, messageForNameCheck.contains(expectedSourceName), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsSourceName, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 429));
        } finally {
            stream.stop();
        }
    }

    public static final /* synthetic */ void $anonfun$new$29(PythonStreamingDataSourceSuite $this, CountDownLatch stopSignal$2, Dataset df, long batchId) {
        df.cache();
        $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> df, (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L))})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L + 1L))})), (List)Nil$.MODULE$)));
        if (batchId > 30L) {
            stopSignal$2.countDown();
            return;
        }
    }

    /**
     * Predicate over a streaming progress report.
     *
     * @param x$4 a progress report
     * @return {@code true} when the report counts exactly two input rows
     */
    public static final /* synthetic */ boolean $anonfun$new$31(StreamingQueryProgress x$4) {
        long inputRows = x$4.numInputRows();
        return inputRows == 2L;
    }

    public static final /* synthetic */ void $anonfun$new$33(PythonStreamingDataSourceSuite $this, CountDownLatch stopSignal$3, Dataset df, long batchId) {
        df.cache();
        $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> df, (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L))})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L + 1L))})), (List)Nil$.MODULE$)));
        if (batchId >= 2L) {
            stopSignal$3.countDown();
            return;
        }
    }

    /**
     * Predicate over a streaming progress report.
     *
     * @param x$5 a progress report
     * @return {@code true} when the report counts exactly two input rows
     */
    public static final /* synthetic */ boolean $anonfun$new$35(StreamingQueryProgress x$5) {
        long inputRows = x$5.numInputRows();
        return inputRows == 2L;
    }

    public static final /* synthetic */ void $anonfun$new$37(PythonStreamingDataSourceSuite $this, CountDownLatch stopSignal$4, Dataset df, long batchId) {
        df.cache();
        $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> df, (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L))})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)(batchId * 2L + 1L))})), (List)Nil$.MODULE$)));
        if (batchId > 30L) {
            stopSignal$4.countDown();
            return;
        }
    }

    /**
     * Predicate over a streaming progress report.
     *
     * @param x$6 a progress report
     * @return {@code true} when the report counts exactly two input rows
     */
    public static final /* synthetic */ boolean $anonfun$new$39(StreamingQueryProgress x$6) {
        long inputRows = x$6.numInputRows();
        return inputRows == 2L;
    }

    /**
     * Runs {@code func} against a fresh {@link PythonMicroBatchStream} built for the error data
     * source and verifies the failure: a {@link SparkException} must be thrown, it must match
     * error class {@code PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR} with the given
     * {@code action}, and its message must contain both {@code msg} and "ErrorDataSource".
     * The stream is stopped in a {@code finally} block so it is released even when the
     * expected exception is not thrown or one of the assertions fails.
     *
     * @param action        expected value of the error's {@code action} parameter
     * @param msg           text that must appear in the exception message
     * @param func          operation to run against the stream; expected to throw
     * @param pythonDs$3    data source passed to the stream constructor
     * @param inputSchema$3 schema passed to the stream constructor
     */
    private final void testMicroBatchStreamError$3(String action, String msg, Function1 func, PythonDataSourceV2 pythonDs$3, StructType inputSchema$3) {
        PythonMicroBatchStream stream = new PythonMicroBatchStream(pythonDs$3, this.errorDataSourceName(), inputSchema$3, CaseInsensitiveStringMap.empty());
        try {
            SparkException err = (SparkException)((Object)this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> func.apply((Object)stream), ClassTag$.MODULE$.apply(SparkException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 659)));
            // The "msg" parameter is matched with a catch-all pattern; the exact text is checked below.
            this.checkErrorMatchPVals((SparkThrowable)err, "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR", (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"action"), (Object)action), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"msg"), (Object)"(.|\\n)*")})));
            String messageForMsgCheck = err.getMessage();
            Bool containsMsg = Bool$.MODULE$.binaryMacroBool((Object)messageForMsgCheck, "contains", (Object)msg, messageForMsgCheck.contains(msg), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsMsg, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 668));
            String messageForNameCheck = err.getMessage();
            String expectedSourceName = "ErrorDataSource";
            Bool containsSourceName = Bool$.MODULE$.binaryMacroBool((Object)messageForNameCheck, "contains", (Object)expectedSourceName, messageForNameCheck.contains(expectedSourceName), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsSourceName, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 669));
        } finally {
            stream.stop();
        }
    }

    /**
     * Creates a {@code PythonMicroBatchStream} over the data source named by
     * {@code errorDataSourceName()}, applies {@code func} to it and verifies the failure.
     *
     * <p>The call is expected to throw a {@code SparkException} that matches
     * {@code PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR} with the given {@code action} (the
     * {@code msg} parameter is matched against the regex {@code (.|\n)*}), and whose message
     * contains both {@code msg} and the literal {@code "ErrorDataSource"}.
     *
     * <p>The stream is stopped in a {@code finally} block so that it is released even when
     * {@code intercept} or one of the assertions throws.
     *
     * @param action        expected value of the {@code action} error parameter
     * @param msg           text that must appear in the exception message
     * @param func          operation invoked with the freshly created stream; expected to fail
     * @param pythonDs$4    data source handed to the stream constructor
     * @param inputSchema$4 schema handed to the stream constructor
     */
    private final void testMicroBatchStreamError$4(String action, String msg, Function1 func, PythonDataSourceV2 pythonDs$4, StructType inputSchema$4) {
        PythonMicroBatchStream stream = new PythonMicroBatchStream(pythonDs$4, this.errorDataSourceName(), inputSchema$4, CaseInsensitiveStringMap.empty());
        try {
            SparkException err = (SparkException)((Object)this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> func.apply((Object)stream), ClassTag$.MODULE$.apply(SparkException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 713)));
            this.checkErrorMatchPVals((SparkThrowable)err, "PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR", (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"action"), (Object)action), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"msg"), (Object)"(.|\\n)*")})));

            // The exception message must mention the caller-supplied text ...
            String actualMessage = err.getMessage();
            String expectedFragment = msg;
            Bool containsExpectedFragment = Bool$.MODULE$.binaryMacroBool((Object)actualMessage, "contains", (Object)expectedFragment, actualMessage.contains(expectedFragment), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsExpectedFragment, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 722));

            // ... and the literal "ErrorDataSource".
            String actualMessage2 = err.getMessage();
            String expectedSourceName = "ErrorDataSource";
            Bool containsSourceName = Bool$.MODULE$.binaryMacroBool((Object)actualMessage2, "contains", (Object)expectedSourceName, actualMessage2.contains(expectedSourceName), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(containsSourceName, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 723));
        } finally {
            // Always release the stream, including when a check above fails.
            stream.stop();
        }
    }

    /**
     * Loads the contents of {@code outputDir$2} through the {@code "json"} reader format.
     *
     * @param outputDir$2 directory whose absolute path is passed to the reader
     * @return the Dataset returned by the reader's {@code load} call
     */
    private final Dataset resultDf$1(File outputDir$2) {
        String outputPath = outputDir$2.getAbsolutePath();
        return this.spark().read().format("json").load(outputPath);
    }

    public static final /* synthetic */ void $anonfun$new$58(PythonStreamingDataSourceSuite $this, String mode$1, MemoryStream inputData$1, File dir) {
        String path = dir.getAbsolutePath();
        File checkpointDir = new File(path, "checkpoint");
        checkpointDir.mkdir();
        File outputDir = new File(path, "output");
        outputDir.mkdir();
        String string = mode$1;
        String string2 = "append";
        Dataset streamDF = !(string != null ? !string.equals(string2) : string2 != null) ? inputData$1.toDF() : inputData$1.toDF().groupBy("value", (Seq)Nil$.MODULE$).count();
        StreamingQuery q = streamDF.writeStream().format($this.dataSourceName()).outputMode(mode$1).option("checkpointLocation", checkpointDir.getAbsolutePath()).start(outputDir.getAbsolutePath());
        inputData$1.addData((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3}));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout())), (JFunction0.mcV.sp & Serializable)() -> {
            String string = mode$1;
            String string2 = "append";
            if (!(string != null ? !string.equals(string2) : string2 != null)) {
                $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.resultDf$1(outputDir), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)1)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)2)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3)})), (List)Nil$.MODULE$))));
                return;
            }
            $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.resultDf$1(outputDir).select("value", (Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"count"})), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)1), BoxesRunTime.boxToInteger((int)1)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)2), BoxesRunTime.boxToInteger((int)1)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)1)})), (List)Nil$.MODULE$))));
        }, $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 774));
        inputData$1.addData((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 4}));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout())), (JFunction0.mcV.sp & Serializable)() -> {
            String string = mode$1;
            String string2 = "append";
            if (!(string != null ? !string.equals(string2) : string2 != null)) {
                $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.resultDf$1(outputDir), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)1)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)2)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)4)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)1)})), (List)Nil$.MODULE$))))));
                return;
            }
            $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.resultDf$1(outputDir).select("value", (Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"count"})), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)1), BoxesRunTime.boxToInteger((int)2)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)2), BoxesRunTime.boxToInteger((int)1)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)1)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)4), BoxesRunTime.boxToInteger((int)1)})), (List)Nil$.MODULE$)))));
        }, $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 787));
        q.stop();
        q.awaitTermination();
        Option $org_scalatest_assert_macro_left = q.exception();
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "isEmpty", $org_scalatest_assert_macro_left.isEmpty(), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 801));
    }

    /**
     * Loads the contents of {@code outputDir$3} through the {@code "json"} reader format.
     *
     * @param outputDir$3 directory whose absolute path is passed to the reader
     * @return the Dataset returned by the reader's {@code load} call
     */
    private final Dataset resultDf$2(File outputDir$3) {
        String outputPath = outputDir$3.getAbsolutePath();
        return this.spark().read().format("json").load(outputPath);
    }

    /**
     * Test body executed against a temporary directory {@code dir}.
     *
     * <p>Starts a streaming query writing {@code df$9} to the data source named by
     * {@code dataSourceName()} with a {@code ProcessingTimeTrigger(20000L)}. After each of two
     * input batches (1..3, then 4..6) it waits, for up to five times {@code waitTimeout()}, until
     * the last reported batch id reaches 1 and 2 respectively, and then checks the JSON output
     * against rows 1..3 and 1..6. Finally the query is stopped and asserted to have no exception.
     *
     * @param $this       owning suite instance
     * @param df$9        streaming Dataset to write
     * @param inputData$2 memory stream that receives the test input
     * @param dir         temporary working directory
     */
    public static final /* synthetic */ void $anonfun$new$66(PythonStreamingDataSourceSuite $this, Dataset df$9, MemoryStream inputData$2, File dir) {
        String basePath = dir.getAbsolutePath();
        File checkpointDir = new File(basePath, "checkpoint");
        checkpointDir.mkdir();
        File outputDir = new File(basePath, "output");
        outputDir.mkdir();

        StreamingQuery query = df$9.writeStream()
            .format($this.dataSourceName())
            .option("checkpointLocation", checkpointDir.getAbsolutePath())
            .trigger((Trigger)new ProcessingTimeTrigger(20000L))
            .start(outputDir.getAbsolutePath());

        // Batch of 1..3: wait for batch id >= 1, then verify the output.
        inputData$2.addData((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 3));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout().$times(5L))), (Function0 & Serializable)() -> {
            long lastBatchId = query.lastProgress().batchId();
            int minBatchId = 1;
            Bool reachedBatch = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)lastBatchId), ">=", (Object)BoxesRunTime.boxToInteger((int)minBatchId), lastBatchId >= (long)minBatchId, Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(reachedBatch, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 831));
        }, $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 830));
        $this.checkAnswer(
            (Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.resultDf$2(outputDir),
            (Seq<Row>)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 3).map((Function1 & Serializable)value -> Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)value))}))));

        // Batch of 4..6: wait for batch id >= 2, then verify the accumulated output.
        inputData$2.addData((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(4), 6));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout().$times(5L))), (Function0 & Serializable)() -> {
            long lastBatchId = query.lastProgress().batchId();
            int minBatchId = 2;
            Bool reachedBatch = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)lastBatchId), ">=", (Object)BoxesRunTime.boxToInteger((int)minBatchId), lastBatchId >= (long)minBatchId, Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(reachedBatch, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 837));
        }, $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 836));
        $this.checkAnswer(
            (Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.resultDf$2(outputDir),
            (Seq<Row>)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 6).map((Function1 & Serializable)value -> Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)value))}))));

        // Shut the query down and make sure it did not record a failure.
        query.stop();
        query.awaitTermination();
        Option queryException = query.exception();
        Bool noException = Bool$.MODULE$.unaryMacroBool((Object)queryException, "isEmpty", queryException.isEmpty(), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert(noException, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 842));
    }

    /**
     * Loads the contents of {@code outputDir$4} through the {@code "json"} reader format.
     *
     * @param outputDir$4 directory whose absolute path is passed to the reader
     * @return the Dataset returned by the reader's {@code load} call
     */
    private final Dataset metadataDf$1(File outputDir$4) {
        String outputPath = outputDir$4.getAbsolutePath();
        return this.spark().read().format("json").load(outputPath);
    }

    /**
     * Test body executed against a temporary directory {@code dir}.
     *
     * <p>Starts an append-mode streaming query from {@code inputData$3} into the data source named
     * by {@code dataSourceName()}, then feeds four batches one after another. After each of the
     * first three batches it polls the JSON content of the output directory until it equals the
     * expected list of two-column rows. After the fourth batch it polls the text file
     * {@code 3.txt} in the output directory for the single line {@code "failed in batch 3"}.
     * Finally the query is stopped and its exception message must contain {@code "invalid value"}.
     *
     * <p>NOTE(review): the meaning of the two row columns is defined by the Python writer script,
     * which is not visible here; the second column presumably tracks the number of rows per
     * batch (30, 20, 0) -- confirm against that script.
     *
     * @param $this       owning suite instance
     * @param inputData$3 memory stream used as the query source
     * @param dir         temporary working directory
     */
    public static final /* synthetic */ void $anonfun$new$74(PythonStreamingDataSourceSuite $this, MemoryStream inputData$3, File dir) {
        String path = dir.getAbsolutePath();
        File checkpointDir = new File(path, "checkpoint");
        checkpointDir.mkdir();
        File outputDir = new File(path, "output");
        outputDir.mkdir();
        StreamingQuery q = inputData$3.toDF().writeStream().format($this.dataSourceName()).outputMode("append").option("checkpointLocation", checkpointDir.getAbsolutePath()).start(outputDir.getAbsolutePath());
        // Batch 1: values 1..30; expected output rows: [3, 30].
        inputData$3.addData((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 30));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout())), (JFunction0.mcV.sp & Serializable)() -> $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.metadataDf$1(outputDir), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)30)})), (List)Nil$.MODULE$)), $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 915));
        // Batch 2: values 31..50; expected output rows: [3, 30], [3, 20].
        inputData$3.addData((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(31), 50));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout())), (JFunction0.mcV.sp & Serializable)() -> $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.metadataDf$1(outputDir), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)30)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)20)})), (List)Nil$.MODULE$))), $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 920));
        // Batch 3: an empty sequence; expected output rows: [3, 30], [3, 20], [3, 0].
        inputData$3.addData((IterableOnce)package$.MODULE$.Seq().empty());
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout())), (JFunction0.mcV.sp & Serializable)() -> $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.metadataDf$1(outputDir), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)30)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)20)})), (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)3), BoxesRunTime.boxToInteger((int)0)})), (List)Nil$.MODULE$)))), $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 926));
        // Batch 4: values 51..100; the check switches to the text file 3.txt, expected to hold "failed in batch 3".
        inputData$3.addData((IterableOnce)RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(51), 100));
        $this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)$this.waitTimeout())), (JFunction0.mcV.sp & Serializable)() -> $this.checkAnswer((Function0<Dataset<Row>>)(Function0 & Serializable)() -> $this.spark().read().text(outputDir.getAbsolutePath() + "/3.txt"), (Seq<Row>)new .colon.colon((Object)Row$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{"failed in batch 3"})), (List)Nil$.MODULE$)), $this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 933));
        q.stop();
        // Unlike the sibling tests, this one requires the query to have recorded an exception:
        // exception().get() is called unconditionally and its message is inspected.
        String $org_scalatest_assert_macro_left = ((StreamingQueryException)q.exception().get()).message();
        String $org_scalatest_assert_macro_right = "invalid value";
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains($org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 940));
    }

    /**
     * Starts and immediately stops a streaming query that writes {@code inputData$4} to the data
     * source named by {@code dataSourceName()} using output mode {@code mode$2}.
     *
     * <p>{@code checkpoint} and {@code output} sub-directories are created under {@code dir} and
     * used as the checkpoint location and the start path.
     *
     * @param $this       owning suite instance
     * @param inputData$4 memory stream used as the query source
     * @param mode$2      output mode passed to the stream writer
     * @param dir         temporary working directory
     */
    public static final /* synthetic */ void $anonfun$new$85(PythonStreamingDataSourceSuite $this, MemoryStream inputData$4, String mode$2, File dir) {
        String basePath = dir.getAbsolutePath();
        File checkpointLocation = new File(basePath, "checkpoint");
        checkpointLocation.mkdir();
        File sinkDir = new File(basePath, "output");
        sinkDir.mkdir();

        StreamingQuery query = inputData$4.toDF()
            .writeStream()
            .format($this.dataSourceName())
            .outputMode(mode$2)
            .option("checkpointLocation", checkpointLocation.getAbsolutePath())
            .start(sinkDir.getAbsolutePath());
        query.stop();
        query.awaitTermination();
    }

    /**
     * Creates a fresh integer {@code MemoryStream} and runs {@code $anonfun$new$85} with it inside
     * a temporary directory supplied by {@code withTempDir}.
     *
     * @param mode output mode forwarded to {@code $anonfun$new$85}
     */
    private final void runQuery$1(String mode) {
        MemoryStream source = MemoryStream$.MODULE$.apply(this.testImplicits().newIntEncoder(), this.sqlContext());
        Function1<File, BoxedUnit> body = (Function1<File, BoxedUnit>)(Function1 & Serializable)tempDir -> {
            PythonStreamingDataSourceSuite.$anonfun$new$85(this, source, mode, tempDir);
            return BoxedUnit.UNIT;
        };
        this.withTempDir(body);
    }

    /**
     * Test body executed against a temporary directory {@code dir}.
     *
     * <p>Runs {@code runQuery$1} with modes {@code "append"} and {@code "update"}, which must
     * complete normally. Mode {@code "complete"} must raise an {@code AnalysisException} checked
     * against {@code _LEGACY_ERROR_TEMP_3102}, and mode {@code "invalid"} must raise an
     * {@code IllegalArgumentException} whose message contains {@code "invalid"}.
     *
     * @param $this owning suite instance
     * @param dir   temporary working directory
     */
    public static final /* synthetic */ void $anonfun$new$84(PythonStreamingDataSourceSuite $this, File dir) {
        // The sub-directories are created but not referenced again in this method.
        String basePath = dir.getAbsolutePath();
        new File(basePath, "checkpoint").mkdir();
        new File(basePath, "output").mkdir();

        // Modes that must be accepted.
        $this.runQuery$1("append");
        $this.runQuery$1("update");

        // "complete" is rejected with an AnalysisException.
        AnalysisException completeModeError = (AnalysisException)$this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> $this.runQuery$1("complete"), ClassTag$.MODULE$.apply(AnalysisException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 984));
        Map expectedParams = (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"msg"), (Object)"Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets")}));
        // Remaining checkError arguments come from its compiler-generated default accessors.
        Option defaultArg3 = $this.checkError$default$3();
        boolean defaultArg5 = $this.checkError$default$5();
        SparkFunSuite.ExpectedContext[] defaultArg6 = $this.checkError$default$6();
        $this.checkError((SparkThrowable)completeModeError, "_LEGACY_ERROR_TEMP_3102", defaultArg3, expectedParams, defaultArg5, defaultArg6);

        // An unknown mode is rejected with an IllegalArgumentException mentioning "invalid".
        IllegalArgumentException invalidModeError = (IllegalArgumentException)$this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> $this.runQuery$1("invalid"), ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 993));
        String actualMessage = invalidModeError.getMessage();
        String expectedFragment = "invalid";
        Bool mentionsInvalid = Bool$.MODULE$.binaryMacroBool((Object)actualMessage, "contains", (Object)expectedFragment, actualMessage.contains(expectedFragment), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert(mentionsInvalid, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 996));
    }

    public PythonStreamingDataSourceSuite() {
        this.errorDataSourceName = "ErrorDataSource";
        this.test("Test PythonMicroBatchStream", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 153));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.testDataStreamReaderScript() + "\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def streamReader(self, schema):\n         |        return TestDataStreamReader()\n         |"));
            StructType inputSchema = StructType$.MODULE$.fromDDL("input BINARY");
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            PythonDataSourceV2 pythonDs = new PythonDataSourceV2();
            pythonDs.setShortName("SimpleDataSource");
            PythonMicroBatchStream stream = new PythonMicroBatchStream(pythonDs, this.dataSourceName(), inputSchema, CaseInsensitiveStringMap.empty());
            ObjectRef startOffset = ObjectRef.create((Object)stream.initialOffset());
            String $org_scalatest_assert_macro_left = ((Offset)startOffset.elem).json();
            String $org_scalatest_assert_macro_right = "{\"offset\": {\"partition-1\": 0}}";
            String string = $org_scalatest_assert_macro_left;
            String string2 = $org_scalatest_assert_macro_right;
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(string != null ? !string.equals(string2) : string2 != null), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 173));
            RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 50).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> {
                Offset endOffset = stream.latestOffset();
                String $org_scalatest_assert_macro_left = endOffset.json();
                String $org_scalatest_assert_macro_right = "{\"offset\": {\"partition-1\": " + 2 * i + "}}";
                String string = $org_scalatest_assert_macro_left;
                String string2 = $org_scalatest_assert_macro_right;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(string != null ? !string.equals(string2) : string2 != null), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 176));
                Object $org_scalatest_assert_macro_left2 = Predef$.MODULE$.refArrayOps((Object[])stream.planInputPartitions((Offset)startOffset$1.elem, endOffset));
                int $org_scalatest_assert_macro_right2 = 2;
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.lengthSizeMacroBool((Object)new ArrayOps($org_scalatest_assert_macro_left2), "size", (Object)BoxesRunTime.boxToInteger((int)ArrayOps$.MODULE$.size$extension($org_scalatest_assert_macro_left2)), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 177));
                stream.commit(endOffset);
                startOffset$1.elem = endOffset;
            });
            stream.stop();
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 152));
        this.test("SimpleDataSourceStreamReader run query and restart", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 185));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.simpleDataStreamReaderScript() + "\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |    def simpleStreamReader(self, schema):\n         |        return SimpleDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 199));
            this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)dir -> {
                PythonStreamingDataSourceSuite.$anonfun$new$4(this, dir);
                return BoxedUnit.UNIT;
            });
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 184));
        // Registers the test "SimpleDataSourceStreamReader read empty batch" (no tags).
        // The body builds a Python data source whose simpleStreamReader() returns the reader
        // defined by simpleDataStreamReaderWithEmptyBatchScript(), registers it with the session,
        // and then runs the streaming part of the test inside a temporary directory.
        this.test("SimpleDataSourceStreamReader read empty batch", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume): the rest of the test is only meaningful when
            // IntegratedUDFTestUtils.shouldTestPandasUDFs() is true.
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 240));
            // Python source: a DataSource subclass named dataSourceName() with schema "id INT".
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.simpleDataStreamReaderWithEmptyBatchScript() + "\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |    def simpleStreamReader(self, schema):\n         |        return SimpleDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            // Sanity check: the registration above must be visible through the data source manager.
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 254));
            // The actual streaming checks live in the synthetic $anonfun$new$12, which receives
            // the temporary directory created by withTempDir.
            this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)dir -> {
                PythonStreamingDataSourceSuite.$anonfun$new$12(this, dir);
                return BoxedUnit.UNIT;
            });
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 239));
        // Registers the test "SimpleDataSourceStreamReader read exactly once" (no tags).
        // Same setup as a regular simple-reader test, but the reader comes from
        // simpleDataStreamReaderScript(); the streaming assertions are delegated to
        // $anonfun$new$16 and run inside a temporary directory.
        this.test("SimpleDataSourceStreamReader read exactly once", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 282));
            // Python source: a DataSource subclass named dataSourceName() with schema "id INT"
            // whose simpleStreamReader() returns SimpleDataStreamReader().
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.simpleDataStreamReaderScript() + "\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |    def simpleStreamReader(self, schema):\n         |        return SimpleDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            // Sanity check: the data source must now be known to the session's data source manager.
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 296));
            // Remaining logic is in $anonfun$new$16, which is handed the temporary directory.
            this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)dir -> {
                PythonStreamingDataSourceSuite.$anonfun$new$16(this, dir);
                return BoxedUnit.UNIT;
            });
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 281));
        // Registers the test "initialOffset() method not implemented in SimpleDataSourceStreamReader".
        // The Python reader subclass has an empty body ("..."), so no reader method is overridden;
        // the test checks the error text reported for initialOffset() and latestOffset().
        this.test("initialOffset() method not implemented in SimpleDataSourceStreamReader", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 339));
            String initialOffsetNotImplementedScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |from pyspark.sql.datasource import SimpleDataSourceStreamReader\n         |class ErrorDataStreamReader(SimpleDataSourceStreamReader):\n         |    ...\n         |\n         |class " + this.errorDataSourceName() + "(DataSource):\n         |    def simpleStreamReader(self, schema):\n         |        return ErrorDataStreamReader()\n         |"));
            StructType inputSchema = StructType$.MODULE$.fromDDL("input BINARY");
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.errorDataSourceName(), initialOffsetNotImplementedScript);
            this.spark().dataSource().registerPython(this.errorDataSourceName(), dataSource);
            PythonDataSourceV2 pythonDs = new PythonDataSourceV2();
            // NOTE(review): the short name is hard-coded here while registration uses
            // errorDataSourceName(); presumably both are "ErrorDataSource" - confirm.
            pythonDs.setShortName("ErrorDataSource");
            // Calling initialOffset() on the micro-batch stream must surface the NOT_IMPLEMENTED error.
            this.testMicroBatchStreamError$1("initialOffset", "[NOT_IMPLEMENTED] initialOffset is not implemented", (Function1 & Serializable)stream -> {
                stream.initialOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
            // latestOffset() is expected to fail with the *initialOffset* message as well.
            // Presumably the simple-reader path reaches initialOffset() first when computing the
            // latest offset - TODO confirm against the Python-side wrapper.
            this.testMicroBatchStreamError$1("latestOffset", "[NOT_IMPLEMENTED] initialOffset is not implemented", (Function1 & Serializable)stream -> {
                stream.latestOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 338));
        // Registers the test "read() method throw error in SimpleDataSourceStreamReader".
        // The Python reader implements initialOffset() but its read() raises; the test expects
        // that error text to be reported when latestOffset() is called on the micro-batch stream.
        this.test("read() method throw error in SimpleDataSourceStreamReader", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 392));
            // NOTE(review): despite its name, this script DOES implement initialOffset(); the
            // failing method is read(). The variable name appears to be carried over from the
            // previous test.
            String initialOffsetNotImplementedScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |from pyspark.sql.datasource import SimpleDataSourceStreamReader\n         |class ErrorDataStreamReader(SimpleDataSourceStreamReader):\n         |    def initialOffset(self):\n         |        return {\"partition\": 1}\n         |    def read(self, start):\n         |        raise Exception(\"error reading available data\")\n         |\n         |class " + this.errorDataSourceName() + "(DataSource):\n         |    def simpleStreamReader(self, schema):\n         |        return ErrorDataStreamReader()\n         |"));
            StructType inputSchema = StructType$.MODULE$.fromDDL("input BINARY");
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.errorDataSourceName(), initialOffsetNotImplementedScript);
            this.spark().dataSource().registerPython(this.errorDataSourceName(), dataSource);
            PythonDataSourceV2 pythonDs = new PythonDataSourceV2();
            pythonDs.setShortName("ErrorDataSource");
            // latestOffset() must fail with the exception text raised by the Python read().
            this.testMicroBatchStreamError$2("latestOffset", "Exception: error reading available data", (Function1 & Serializable)stream -> {
                stream.latestOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 391));
        // Registers the test "Read from test data stream source".
        // Builds a Python data source backed by testDataStreamReaderScript(), starts a streaming
        // query with a foreachBatch sink and a 0 ms processing-time trigger, waits for the batch
        // callback to release the latch, and then checks the per-batch input row counts.
        this.test("Read from test data stream source", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 440));
            // Python source: DataSource named dataSourceName(), schema "id INT", streamReader()
            // returning TestDataStreamReader().
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.testDataStreamReaderScript() + "\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |    def streamReader(self, schema):\n         |        return TestDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 455));
            Dataset df2 = this.spark().readStream().format(this.dataSourceName()).load();
            // Latch shared with the batch callback ($anonfun$new$29), which is handed stopSignal.
            CountDownLatch stopSignal = new CountDownLatch(1);
            StreamingQuery q = df2.writeStream().foreachBatch((Function2 & Serializable)(df, batchId) -> {
                PythonStreamingDataSourceSuite.$anonfun$new$29(this, stopSignal, df, BoxesRunTime.unboxToLong((Object)batchId));
                return BoxedUnit.UNIT;
            }).trigger((Trigger)new ProcessingTimeTrigger(0L)).start();
            // NOTE(review): await() has no timeout; if the callback never counts the latch down
            // (e.g. the query fails first) this test blocks instead of failing.
            stopSignal.await();
            // Every recorded progress entry must report numInputRows == 2 (see the macro text below).
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.simpleMacroBool(ArrayOps$.MODULE$.forall$extension(Predef$.MODULE$.refArrayOps((Object[])q.recentProgress()), (Function1 & Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)PythonStreamingDataSourceSuite.$anonfun$new$31(x$4))), "scala.Predef.refArrayOps[org.apache.spark.sql.streaming.StreamingQueryProgress](q.recentProgress).forall(((x$4: org.apache.spark.sql.streaming.StreamingQueryProgress) => x$4.numInputRows.==(2)))", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 468));
            // NOTE(review): stop() is only reached when the assertion above passes.
            q.stop();
            q.awaitTermination();
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 439));
        // Registers the test "Read from test data stream source, trigger interval=20 seconds".
        // Same flow as the 0-interval variant, but the query uses ProcessingTimeTrigger(20000L)
        // and the test additionally asserts that the stopped query recorded no exception.
        this.test("Read from test data stream source, trigger interval=20 seconds", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(Function0 & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 475));
            // Python source: DataSource named dataSourceName(), schema "id INT", streamReader()
            // returning TestDataStreamReader().
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.testDataStreamReaderScript() + "\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |    def streamReader(self, schema):\n         |        return TestDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 490));
            Dataset df2 = this.spark().readStream().format(this.dataSourceName()).load();
            // Latch shared with the batch callback ($anonfun$new$33), which is handed stopSignal.
            CountDownLatch stopSignal = new CountDownLatch(1);
            StreamingQuery q = df2.writeStream().foreachBatch((Function2 & Serializable)(df, batchId) -> {
                PythonStreamingDataSourceSuite.$anonfun$new$33(this, stopSignal, df, BoxesRunTime.unboxToLong((Object)batchId));
                return BoxedUnit.UNIT;
            }).trigger((Trigger)new ProcessingTimeTrigger(20000L)).start();
            // NOTE(review): untimed await(); a query that dies before the callback releases the
            // latch would block this test rather than fail it.
            stopSignal.await();
            // Every recorded progress entry must report numInputRows == 2 (see the macro text below).
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.simpleMacroBool(ArrayOps$.MODULE$.forall$extension(Predef$.MODULE$.refArrayOps((Object[])q.recentProgress()), (Function1 & Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)PythonStreamingDataSourceSuite.$anonfun$new$35(x$5))), "scala.Predef.refArrayOps[org.apache.spark.sql.streaming.StreamingQueryProgress](q.recentProgress).forall(((x$5: org.apache.spark.sql.streaming.StreamingQueryProgress) => x$5.numInputRows.==(2)))", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 503));
            // NOTE(review): stop() is only reached when the assertion above passes.
            q.stop();
            q.awaitTermination();
            // After shutdown the query must not carry an exception (q.exception is empty).
            Option $org_scalatest_assert_macro_left = q.exception();
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "isEmpty", $org_scalatest_assert_macro_left.isEmpty(), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 506));
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 474));
        // Registers the test "Streaming data source read with custom partitions".
        // The Python script is defined inline: a RangePartition(InputPartition) carrying start/end,
        // and a TestDataStreamReader whose latestOffset() advances "offset" by 2 per call,
        // whose partitions() returns a single RangePartition, and whose read() yields one
        // (i,) tuple for each i in range(start, end).
        this.test("Streaming data source read with custom partitions", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 510));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition\n         |class RangePartition(InputPartition):\n         |    def __init__(self, start, end):\n         |        self.start = start\n         |        self.end = end\n         |\n         |class TestDataStreamReader(DataSourceStreamReader):\n         |    current = 0\n         |    def initialOffset(self):\n         |        return {\"offset\": 0}\n         |    def latestOffset(self):\n         |        self.current += 2\n         |        return {\"offset\": self.current}\n         |    def partitions(self, start: dict, end: dict):\n         |        return [RangePartition(start[\"offset\"], end[\"offset\"])]\n         |    def commit(self, end: dict):\n         |        1 + 2\n         |    def read(self, partition: RangePartition):\n         |        start, end = partition.start, partition.end\n         |        for i in range(start, end):\n         |            yield (i, )\n         |\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |\n         |    def streamReader(self, schema):\n         |        return TestDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            // Sanity check: the registration must be visible through the data source manager.
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 545));
            Dataset df2 = this.spark().readStream().format(this.dataSourceName()).load();
            // Latch shared with the batch callback ($anonfun$new$37), which is handed stopSignal.
            CountDownLatch stopSignal = new CountDownLatch(1);
            StreamingQuery q = df2.writeStream().foreachBatch((Function2 & Serializable)(df, batchId) -> {
                PythonStreamingDataSourceSuite.$anonfun$new$37(this, stopSignal, df, BoxesRunTime.unboxToLong((Object)batchId));
                return BoxedUnit.UNIT;
            }).trigger((Trigger)new ProcessingTimeTrigger(0L)).start();
            // NOTE(review): untimed await(); blocks if the callback never releases the latch.
            stopSignal.await();
            // Every recorded progress entry must report numInputRows == 2, matching the step of 2
            // used by latestOffset() in the script above.
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.simpleMacroBool(ArrayOps$.MODULE$.forall$extension(Predef$.MODULE$.refArrayOps((Object[])q.recentProgress()), (Function1 & Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)PythonStreamingDataSourceSuite.$anonfun$new$39(x$6))), "scala.Predef.refArrayOps[org.apache.spark.sql.streaming.StreamingQueryProgress](q.recentProgress).forall(((x$6: org.apache.spark.sql.streaming.StreamingQueryProgress) => x$6.numInputRows.==(2)))", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 558));
            // NOTE(review): stop() is only reached when the assertion above passes.
            q.stop();
            q.awaitTermination();
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 509));
        // Registers the test "Error creating stream reader".
        // The Python DataSource.streamReader() raises immediately; the test expects the started
        // query to terminate with a StreamingQueryException whose error class is "STREAM_FAILED"
        // and whose message contains the Python exception text.
        this.test("Error creating stream reader", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(Function0 & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 564));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |    def streamReader(self, schema):\n         |        raise Exception(\"error creating stream reader\")\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            // Start a console-sink query and wait for it; intercept requires the block to throw
            // a StreamingQueryException.
            StreamingQueryException err = (StreamingQueryException)this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> {
                StreamingQuery q = this.spark().readStream().format(this.dataSourceName()).load().writeStream().format("console").start();
                q.awaitTermination();
            }, ClassTag$.MODULE$.apply(StreamingQueryException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 578));
            // assert(err.getErrorClass == "STREAM_FAILED"), expanded into a null-safe equals.
            String $org_scalatest_assert_macro_left = err.getErrorClass();
            String $org_scalatest_assert_macro_right = "STREAM_FAILED";
            String string = $org_scalatest_assert_macro_left;
            String string2 = $org_scalatest_assert_macro_right;
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(string != null ? !string.equals(string2) : string2 != null), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 583));
            // assert(err.getMessage.contains("error creating stream reader")).
            String $org_scalatest_assert_macro_left2 = err.getMessage();
            String $org_scalatest_assert_macro_right2 = "error creating stream reader";
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "contains", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2.contains($org_scalatest_assert_macro_right2), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 584));
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 563));
        // Registers the test "Streaming data source read error".
        // The inline Python reader plans partitions normally (offsets are carried as strings and
        // converted with int(...)) but its read() raises; the test expects the query to fail with
        // a StreamingQueryException whose message contains "error reading data".
        this.test("Streaming data source read error", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(Function0 & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 588));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition\n         |class RangePartition(InputPartition):\n         |    def __init__(self, start, end):\n         |        self.start = start\n         |        self.end = end\n         |\n         |class SimpleDataStreamReader(DataSourceStreamReader):\n         |    current = 0\n         |    def initialOffset(self):\n         |        return {\"offset\": \"0\"}\n         |    def latestOffset(self):\n         |        self.current += 2\n         |        return {\"offset\": str(self.current)}\n         |    def partitions(self, start: dict, end: dict):\n         |        return [RangePartition(int(start[\"offset\"]), int(end[\"offset\"]))]\n         |    def commit(self, end: dict):\n         |        1 + 2\n         |    def read(self, partition: RangePartition):\n         |        raise Exception(\"error reading data\")\n         |\n         |\n         |class " + this.dataSourceName() + "(DataSource):\n         |    def schema(self) -> str:\n         |        return \"id INT\"\n         |\n         |    def streamReader(self, schema):\n         |        return SimpleDataStreamReader()\n         |"));
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), dataSource);
            // Sanity check: the registration must be visible through the data source manager.
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(this.spark().sessionState().dataSourceManager().dataSourceExists(this.dataSourceName()), "PythonStreamingDataSourceSuite.this.spark.sessionState.dataSourceManager.dataSourceExists(PythonStreamingDataSourceSuite.this.dataSourceName)", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 621));
            Dataset df = this.spark().readStream().format(this.dataSourceName()).load();
            // The batch callback forces evaluation with count(), which is what drives the Python
            // read() and therefore the failure. Inside the callback, df is the per-batch dataset
            // parameter (the name shadows the outer streaming df in this decompiled output).
            StreamingQueryException err = (StreamingQueryException)this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> {
                StreamingQuery q = df.writeStream().foreachBatch((Function2 & Serializable)(df, x$7) -> {
                    df.count();
                    return BoxedUnit.UNIT;
                }).start();
                q.awaitTermination();
            }, ClassTag$.MODULE$.apply(StreamingQueryException.class), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 624));
            // assert(err.getMessage.contains("error reading data")).
            String $org_scalatest_assert_macro_left = err.getMessage();
            String $org_scalatest_assert_macro_right = "error reading data";
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains($org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 631));
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 587));
        // Registers the test "Method not implemented in stream reader".
        // The Python ErrorDataStreamReader only overrides read(); the test probes initialOffset(),
        // latestOffset() and planInputPartitions() on the micro-batch stream and expects a
        // NOT_IMPLEMENTED message naming the corresponding Python method for each.
        this.test("Method not implemented in stream reader", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // Precondition (ScalaTest assume) on IntegratedUDFTestUtils.shouldTestPandasUDFs().
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 635));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |from pyspark.sql.datasource import DataSourceStreamReader\n         |class ErrorDataStreamReader(DataSourceStreamReader):\n         |    def read(self, partition):\n         |        yield (0, partition.value)\n         |\n         |class " + this.errorDataSourceName() + "(DataSource):\n         |    def streamReader(self, schema):\n         |        return ErrorDataStreamReader()\n         |"));
            StructType inputSchema = StructType$.MODULE$.fromDDL("input BINARY");
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.errorDataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.errorDataSourceName(), dataSource);
            PythonDataSourceV2 pythonDs = new PythonDataSourceV2();
            // NOTE(review): hard-coded short name; presumably equal to errorDataSourceName() - confirm.
            pythonDs.setShortName("ErrorDataSource");
            this.testMicroBatchStreamError$3("initialOffset", "[NOT_IMPLEMENTED] initialOffset is not implemented", (Function1 & Serializable)stream -> {
                stream.initialOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
            this.testMicroBatchStreamError$3("latestOffset", "[NOT_IMPLEMENTED] latestOffset is not implemented", (Function1 & Serializable)stream -> {
                stream.latestOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
            // The same offset is passed as both start and end; only the call itself matters here,
            // since the expected outcome is the "partitions is not implemented" error.
            PythonStreamingSourceOffset offset = new PythonStreamingSourceOffset("{\"offset\": \"2\"}");
            this.testMicroBatchStreamError$3("planPartitions", "[NOT_IMPLEMENTED] partitions is not implemented", (Function1 & Serializable)stream -> {
                stream.planInputPartitions((Offset)offset, (Offset)offset);
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 634));
        this.test("Error in stream reader", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 691));
            String dataSourceScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |from pyspark.sql.datasource import DataSource\n         |" + this.errorDataStreamReaderScript() + "\n         |\n         |class " + this.errorDataSourceName() + "(DataSource):\n         |    def streamReader(self, schema):\n         |        return ErrorDataStreamReader()\n         |"));
            StructType inputSchema = StructType$.MODULE$.fromDDL("input BINARY");
            UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.errorDataSourceName(), dataSourceScript);
            this.spark().dataSource().registerPython(this.errorDataSourceName(), dataSource);
            PythonDataSourceV2 pythonDs = new PythonDataSourceV2();
            pythonDs.setShortName("ErrorDataSource");
            PythonStreamingSourceOffset offset = new PythonStreamingSourceOffset("{\"offset\": 2}");
            this.testMicroBatchStreamError$4("initialOffset", "error reading initial offset", (Function1 & Serializable)stream -> {
                stream.initialOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
            this.testMicroBatchStreamError$4("latestOffset", "error reading latest offset", (Function1 & Serializable)stream -> {
                stream.latestOffset();
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
            this.testMicroBatchStreamError$4("planPartitions", "error planning partitions", (Function1 & Serializable)stream -> {
                stream.planInputPartitions((Offset)offset, (Offset)offset);
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
            this.testMicroBatchStreamError$4("commitSource", "error committing offset", (Function1 & Serializable)stream -> {
                stream.commit((Offset)offset);
                return BoxedUnit.UNIT;
            }, pythonDs, inputSchema);
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 690));
        // Registers one "data source stream write - <mode> mode" test for each of the two
        // output-mode strings "append" and "complete". (`new .colon.colon(...)` is the decompiler's
        // rendering of a Scala `::` list cell; `$this` is the decompiler's name for the captured
        // suite instance.)
        new .colon.colon((Object)"append", (List)new .colon.colon((Object)"complete", (List)Nil$.MODULE$)).foreach((Function1 & Serializable)mode -> {
            this.test("data source stream write - " + mode + " mode", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
                // ScalaTest `assume`: gate the test body on IntegratedUDFTestUtils.shouldTestPandasUDFs.
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssume($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 746));
                // Build the Python data source from simpleDataStreamWriterScript() and register it
                // under dataSourceName().
                UserDefinedPythonDataSource dataSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource($this.dataSourceName(), $this.simpleDataStreamWriterScript());
                $this.spark().dataSource().registerPython($this.dataSourceName(), dataSource);
                // NOTE(review): this MemoryStream is created without an explicit first argument,
                // whereas the "trigger interval=20 seconds" and "commit and abort" tests in this
                // constructor pass 3 (presumably the partition count). Confirm the difference is
                // intentional and matches what $anonfun$new$58 asserts.
                MemoryStream inputData = MemoryStream$.MODULE$.apply($this.testImplicits().newIntEncoder(), $this.sqlContext());
                // The rest of the test body lives in the synthetic method $anonfun$new$58 (not
                // visible in this chunk); it receives the mode, the input stream and a temp dir.
                $this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)dir -> {
                    PythonStreamingDataSourceSuite.$anonfun$new$58($this, mode, inputData, dir);
                    return BoxedUnit.UNIT;
                });
            }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 745));
            return BoxedUnit.UNIT;
        });
        // Registers the "data source stream write, trigger interval=20 seconds" test. The visible
        // part only prepares the fixtures (registered Python data source, an Int MemoryStream and its
        // DataFrame view); the remainder of the test body is the synthetic method $anonfun$new$66,
        // which is outside this chunk.
        this.test("data source stream write, trigger interval=20 seconds", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // ScalaTest `assume`: gate the test body on IntegratedUDFTestUtils.shouldTestPandasUDFs.
            Bool pandasUdfsAvailable = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume(pandasUdfsAvailable, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 808));

            // Build the Python data source from simpleDataStreamWriterScript() and register it
            // under dataSourceName().
            UserDefinedPythonDataSource writerSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), this.simpleDataStreamWriterScript());
            this.spark().dataSource().registerPython(this.dataSourceName(), writerSource);

            // In-memory Int stream created with a leading argument of 3, plus its DataFrame view.
            MemoryStream intStream = MemoryStream$.MODULE$.apply(3, this.testImplicits().newIntEncoder(), this.sqlContext());
            Dataset streamFrame = intStream.toDF();

            this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)tempDir -> {
                PythonStreamingDataSourceSuite.$anonfun$new$66(this, streamFrame, intStream, tempDir);
                return BoxedUnit.UNIT;
            });
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 807));
        // Registers the "streaming sink write commit and abort" test. The embedded Python script
        // defines a stream writer that:
        //   - write():  counts rows per task and raises Exception("invalid value") for any
        //               row.value > 50, returning a SimpleCommitMessage(partition_id, count);
        //   - commit(): appends a JSON line with num_partitions/rows to "<path>/<batchId>.json";
        //   - abort():  writes "failed in batch <batchId>" to "<path>/<batchId>.txt".
        // The assertions themselves live in the synthetic method $anonfun$new$74 (outside this chunk).
        this.test("streaming sink write commit and abort", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // ScalaTest `assume`: gate the test body on IntegratedUDFTestUtils.shouldTestPandasUDFs.
            Bool pandasUdfsAvailable = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume(pandasUdfsAvailable, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 847));

            // Python source of the sink under test ('|' margins are stripped by stripMargin).
            String writerScript = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n         |import json\n         |import os\n         |from dataclasses import dataclass\n         |from pyspark import TaskContext\n         |from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage\n         |\n         |@dataclass\n         |class SimpleCommitMessage(WriterCommitMessage):\n         |    partition_id: int\n         |    count: int\n         |\n         |class SimpleDataSourceStreamWriter(DataSourceStreamWriter):\n         |    def __init__(self, options):\n         |        self.options = options\n         |        self.path = self.options.get(\"path\")\n         |        assert self.path is not None\n         |\n         |    def write(self, iterator):\n         |        context = TaskContext.get()\n         |        partition_id = context.partitionId()\n         |        cnt = 0\n         |        for row in iterator:\n         |            if row.value > 50:\n         |                raise Exception(\"invalid value\")\n         |            cnt += 1\n         |        return SimpleCommitMessage(partition_id=partition_id, count=cnt)\n         |\n         |    def commit(self, messages, batchId) -> None:\n         |        status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))\n         |\n         |        with open(os.path.join(self.path, f\"{batchId}.json\"), \"a\") as file:\n         |            file.write(json.dumps(status) + \"\\n\")\n         |\n         |    def abort(self, messages, batchId) -> None:\n         |        with open(os.path.join(self.path, f\"{batchId}.txt\"), \"w\") as file:\n         |            file.write(f\"failed in batch {batchId}\")\n         |\n         |class SimpleDataSource(DataSource):\n         |    def streamWriter(self, schema, overwrite):\n         |        return SimpleDataSourceStreamWriter(self.options)\n         |"));

            // Register the sink under dataSourceName().
            UserDefinedPythonDataSource sinkSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), writerScript);
            this.spark().dataSource().registerPython(this.dataSourceName(), sinkSource);

            // In-memory Int stream created with a leading argument of 3.
            MemoryStream intStream = MemoryStream$.MODULE$.apply(3, this.testImplicits().newIntEncoder(), this.sqlContext());

            this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)tempDir -> {
                PythonStreamingDataSourceSuite.$anonfun$new$74(this, intStream, tempDir);
                return BoxedUnit.UNIT;
            });
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 846));
        // Registers the "python streaming sink: invalid write mode" test. Only the data source
        // registration is visible here; the check itself is performed by the synthetic method
        // $anonfun$new$84 (outside this chunk), which receives the suite and a temp directory.
        this.test("python streaming sink: invalid write mode", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            // ScalaTest `assume`: gate the test body on IntegratedUDFTestUtils.shouldTestPandasUDFs.
            Bool pandasUdfsAvailable = Bool$.MODULE$.simpleMacroBool(IntegratedUDFTestUtils$.MODULE$.shouldTestPandasUDFs(), "org.apache.spark.sql.IntegratedUDFTestUtils.shouldTestPandasUDFs", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssume(pandasUdfsAvailable, (Object)"", Prettifier$.MODULE$.default(), new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 945));

            // Build the Python data source from simpleDataStreamWriterScript() and register it
            // under dataSourceName().
            UserDefinedPythonDataSource writerSource = IntegratedUDFTestUtils$.MODULE$.createUserDefinedPythonDataSource(this.dataSourceName(), this.simpleDataStreamWriterScript());
            this.spark().dataSource().registerPython(this.dataSourceName(), writerSource);

            this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)tempDir -> {
                PythonStreamingDataSourceSuite.$anonfun$new$84(this, tempDir);
                return BoxedUnit.UNIT;
            });
        }, new Position("PythonStreamingDataSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 944));
    }
}

