/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.streaming;

import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.QueryTest$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.streaming.MemorySink;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation;
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamTest;
import org.apache.spark.sql.streaming.StreamTest$;
import org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1$;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryManager;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.util.Clock;
import org.apache.spark.util.SystemClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.TripleEquals;
import org.scalatest.Assertions;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.exceptions.TestFailedDueToTimeoutException;
import org.scalatest.time.Span$;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ManifestFactory$;
import scala.reflect.api.TypeTags;
import scala.reflect.runtime.package$;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.VolatileObjectRef;
import scala.util.Random$;
import scala.util.control.NonFatal$;

public abstract class StreamTest$class {
    public static void testStream(StreamTest $this, Dataset _stream, OutputMode outputMode, Seq actions) {
        StreamTest streamTest = $this;
        synchronized (streamTest) {
            Seq seq;
            Dataset stream = _stream.toDF();
            SparkSession sparkSession = stream.sparkSession();
            IntRef pos = IntRef.create((int)0);
            ObjectRef currentStream = ObjectRef.create(null);
            ObjectRef lastStream = ObjectRef.create(null);
            HashMap awaiting = new HashMap();
            MemorySink sink = new MemorySink(stream.schema(), outputMode);
            Map resetConfValues = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
            VolatileObjectRef streamThreadDeathCause = VolatileObjectRef.create(null);
            StreamingQueryListener listener = new StreamingQueryListener($this, streamThreadDeathCause){
                public final VolatileObjectRef streamThreadDeathCause$1;

                public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) {
                    Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(this){
                        private final /* synthetic */ StreamTest$.anon.1 $outer;

                        public void uncaughtException(Thread t, Throwable e) {
                            this.$outer.streamThreadDeathCause$1.elem = e;
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    });
                }

                public void onQueryProgress(StreamingQueryListener.QueryProgressEvent event) {
                }

                public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) {
                }
                {
                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                }
            };
            sparkSession.streams().addListener(listener);
            boolean startedManually = ((IterableLike)actions.takeWhile((Function1)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final boolean apply(StreamTest.StreamAction x$1) {
                    return !(x$1 instanceof StreamTest.StreamMustBeRunning);
                }
            })).exists((Function1)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final boolean apply(StreamTest.StreamAction x$2) {
                    return x$2 instanceof StreamTest.StartStream;
                }
            });
            if (startedManually) {
                seq = actions;
            } else {
                StreamTest.StartStream startStream = new StreamTest.StartStream($this, $this.StartStream().apply$default$1(), $this.StartStream().apply$default$2(), $this.StartStream().apply$default$3());
                seq = (Seq)actions.$plus$colon((Object)startStream, Seq$.MODULE$.canBuildFrom());
            }
            Seq startedTest = seq;
            String x$14 = "streaming.metadata";
            String x$15 = Utils$.MODULE$.createTempDir$default$1();
            String metadataRoot = Utils$.MODULE$.createTempDir(x$15, x$14).getCanonicalPath();
            LongRef manualClockExpectedTime = LongRef.create((long)-1L);
            StreamTest$class.liftedTree1$1($this, stream, sparkSession, pos, currentStream, lastStream, awaiting, sink, resetConfValues, streamThreadDeathCause, listener, startedManually, startedTest, metadataRoot, manualClockExpectedTime, outputMode, actions);
            return;
        }
    }

    public static OutputMode testStream$default$2(StreamTest $this) {
        return OutputMode.Append();
    }

    public static void runStressTest(StreamTest $this, Dataset ds, Function1 addData2, int iterations) {
        $this.runStressTest((Dataset<Object>)ds, (Seq<StreamTest.StreamAction>)((Seq)Seq$.MODULE$.empty()), (Function2<Seq<Object>, Object, StreamTest.StreamAction>)new Serializable($this, addData2){
            public static final long serialVersionUID = 0L;
            private final Function1 addData$1;

            public final StreamTest.StreamAction apply(Seq<Object> data, boolean running) {
                return (StreamTest.StreamAction)this.addData$1.apply(data);
            }
            {
                this.addData$1 = addData$1;
            }
        }, iterations);
    }

    public static void runStressTest(StreamTest $this, Dataset ds, Seq prepareActions, Function2 addData2, int iterations) {
        ExpressionEncoder intEncoder = ExpressionEncoder$.MODULE$.apply(((TypeTags)package$.MODULE$.universe()).TypeTag().Int());
        IntRef dataPos = IntRef.create((int)0);
        BooleanRef running = BooleanRef.create((boolean)true);
        ArrayBuffer actions = new ArrayBuffer();
        actions.$plus$plus$eq((TraversableOnce)prepareActions);
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), iterations).foreach((Function1)new Serializable($this, intEncoder, dataPos, running, actions, addData2){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamTest $outer;
            private final ExpressionEncoder intEncoder$1;
            private final IntRef dataPos$1;
            private final BooleanRef running$1;
            private final ArrayBuffer actions$1;
            private final Function2 addData$2;

            public final Object apply(int i) {
                ArrayBuffer arrayBuffer;
                double rand = Random$.MODULE$.nextDouble();
                if (this.running$1.elem) {
                    ArrayBuffer arrayBuffer2;
                    double d = rand;
                    if (d < 0.1) {
                        arrayBuffer2 = StreamTest$class.addCheck$1(this.$outer, this.intEncoder$1, this.dataPos$1, this.actions$1);
                    } else if (d < 0.7) {
                        arrayBuffer2 = StreamTest$class.addRandomData$1(this.$outer, this.dataPos$1, this.running$1, this.actions$1, this.addData$2);
                    } else {
                        StreamTest$class.addCheck$1(this.$outer, this.intEncoder$1, this.dataPos$1, this.actions$1);
                        this.actions$1.$plus$eq((Object)this.$outer.StopStream());
                        this.running$1.elem = false;
                        arrayBuffer2 = BoxedUnit.UNIT;
                    }
                    arrayBuffer = arrayBuffer2;
                } else {
                    ArrayBuffer arrayBuffer3;
                    double d = rand;
                    if (d < 0.7) {
                        arrayBuffer3 = StreamTest$class.addRandomData$1(this.$outer, this.dataPos$1, this.running$1, this.actions$1, this.addData$2);
                    } else {
                        this.actions$1.$plus$eq((Object)new StreamTest.StartStream(this.$outer, this.$outer.StartStream().apply$default$1(), this.$outer.StartStream().apply$default$2(), this.$outer.StartStream().apply$default$3()));
                        this.running$1.elem = true;
                        arrayBuffer3 = BoxedUnit.UNIT;
                    }
                    arrayBuffer = arrayBuffer3;
                }
                return arrayBuffer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.intEncoder$1 = intEncoder$1;
                this.dataPos$1 = dataPos$1;
                this.running$1 = running$1;
                this.actions$1 = actions$1;
                this.addData$2 = addData$2;
            }
        });
        Object object = running.elem ? BoxedUnit.UNIT : actions.$plus$eq((Object)new StreamTest.StartStream($this, $this.StartStream().apply$default$1(), $this.StartStream().apply$default$2(), $this.StartStream().apply$default$3()));
        StreamTest$class.addCheck$1($this, intEncoder, dataPos, actions);
        $this.testStream(ds, $this.testStream$default$2(), (Seq<StreamTest.StreamAction>)actions);
    }

    public static int runStressTest$default$3(StreamTest $this) {
        return 100;
    }

    private static final String testActions$1(StreamTest $this, IntRef pos$1, boolean startedManually$1, Seq actions$2) {
        return ((TraversableOnce)((TraversableLike)actions$2.zipWithIndex(Seq$.MODULE$.canBuildFrom())).map((Function1)new Serializable($this, pos$1, startedManually$1){
            public static final long serialVersionUID = 0L;
            private final IntRef pos$1;
            private final boolean startedManually$1;

            public final String apply(Tuple2<StreamTest.StreamAction, Object> x0$1) {
                Tuple2<StreamTest.StreamAction, Object> tuple2 = x0$1;
                if (tuple2 != null) {
                    StreamTest.StreamAction a = (StreamTest.StreamAction)tuple2._1();
                    int i = tuple2._2$mcI$sp();
                    String string = this.pos$1.elem == i && this.startedManually$1 || this.pos$1.elem == i + 1 && !this.startedManually$1 ? new StringBuilder().append((Object)"=> ").append((Object)a.toString()).toString() : new StringBuilder().append((Object)"   ").append((Object)a.toString()).toString();
                    return string;
                }
                throw new MatchError(tuple2);
            }
            {
                this.pos$1 = pos$1;
                this.startedManually$1 = startedManually$1;
            }
        }, Seq$.MODULE$.canBuildFrom())).mkString("\n");
    }

    private static final String currentOffsets$1(StreamTest $this, ObjectRef currentStream$1) {
        return (StreamExecution)currentStream$1.elem == null ? "not started" : ((StreamExecution)currentStream$1.elem).committedOffsets().toString();
    }

    private static final String threadState$1(StreamTest $this, ObjectRef currentStream$1) {
        return (StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).microBatchThread().isAlive() ? "alive" : "dead";
    }

    private static final String testState$1(StreamTest $this, IntRef pos$1, ObjectRef currentStream$1, MemorySink sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        return new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |== Progress ==\n         |", "\n         |\n         |== Stream ==\n         |Output Mode: ", "\n         |Stream state: ", "\n         |Thread state: ", "\n         |", "\n         |\n         |== Sink ==\n         |", "\n         |\n         |\n         |== Plan ==\n         |", "\n         "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{StreamTest$class.testActions$1($this, pos$1, startedManually$1, actions$2), outputMode$1, StreamTest$class.currentOffsets$1($this, currentStream$1), StreamTest$class.threadState$1($this, currentStream$1), (Throwable)streamThreadDeathCause$1.elem == null ? "" : org.apache.spark.sql.catalyst.util.package$.MODULE$.stackTraceToString((Throwable)streamThreadDeathCause$1.elem), sink$1.toDebugString(), (StreamExecution)currentStream$1.elem == null ? "" : ((StreamExecution)currentStream$1.elem).lastExecution()})))).stripMargin();
    }

    public static final void verify$1(StreamTest $this, Function0 condition, String message, IntRef pos$1, ObjectRef currentStream$1, MemorySink sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        if (condition.apply$mcZ$sp()) {
            return;
        }
        throw StreamTest$class.failTest$1($this, message, StreamTest$class.failTest$default$2$1($this), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
    }

    public static final Object eventually$1(StreamTest $this, String message, Function0 func, IntRef pos$1, ObjectRef currentStream$1, MemorySink sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        try {
            return Eventually$.MODULE$.eventually(new PatienceConfiguration.Timeout($this.streamingTimeout()), func, Eventually$.MODULE$.patienceConfig());
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            throw StreamTest$class.failTest$1($this, message, e, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
        }
    }

    public static final String exceptionToString$1(StreamTest $this, Throwable e, String prefix) {
        String base = new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix, e.getMessage()}))).append((Object)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])e.getStackTrace()).take(10)).mkString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\n", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix})), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\n", "\\t"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix})), "\n")).toString();
        return e.getCause() == null ? base : new StringBuilder().append((Object)base).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\n", "\\tCaused by: "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix}))).append((Object)StreamTest$class.exceptionToString$1($this, e.getCause(), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "\\t"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix})))).toString();
    }

    public static final String exceptionToString$default$2$1(StreamTest $this) {
        return "";
    }

    public static final Nothing$ failTest$1(StreamTest $this, String message, Throwable cause, IntRef pos$1, ObjectRef currentStream$1, MemorySink sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        Option c = Option$.MODULE$.apply((Object)cause).map((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamTest $outer;

            public final String apply(Throwable x$4) {
                return StreamTest$class.exceptionToString$1(this.$outer, x$4, StreamTest$class.exceptionToString$default$2$1(this.$outer));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        None$ m = message != null && new StringOps(Predef$.MODULE$.augmentString(message)).size() > 0 ? new Some((Object)message) : None$.MODULE$;
        return ((Assertions)$this).fail(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n           |", "\n           |", "\n         "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((TraversableOnce)Option$.MODULE$.option2Iterable((Option)m).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(c), Iterable$.MODULE$.canBuildFrom())).mkString(": "), StreamTest$class.testState$1($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2)})))).stripMargin());
    }

    public static final Throwable failTest$default$2$1(StreamTest $this) {
        return null;
    }

    /*
     * Loose catch block
     */
    private static final void liftedTree1$1(StreamTest $this, Dataset stream$1, SparkSession sparkSession$1, IntRef pos$1, ObjectRef currentStream$1, ObjectRef lastStream$1, HashMap awaiting$1, MemorySink sink$1, Map resetConfValues$1, VolatileObjectRef streamThreadDeathCause$1, StreamingQueryListener listener$1, boolean startedManually$1, Seq startedTest$1, String metadataRoot$1, LongRef manualClockExpectedTime$1, OutputMode outputMode$1, Seq actions$2) {
        block9: {
            startedTest$1.foreach((Function1)new Serializable($this, stream$1, sparkSession$1, pos$1, currentStream$1, lastStream$1, awaiting$1, sink$1, resetConfValues$1, streamThreadDeathCause$1, startedManually$1, metadataRoot$1, manualClockExpectedTime$1, outputMode$1, actions$2){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ StreamTest $outer;
                public final Dataset stream$1;
                public final SparkSession sparkSession$1;
                public final IntRef pos$1;
                public final ObjectRef currentStream$1;
                public final ObjectRef lastStream$1;
                private final HashMap awaiting$1;
                public final MemorySink sink$1;
                public final Map resetConfValues$1;
                public final VolatileObjectRef streamThreadDeathCause$1;
                public final boolean startedManually$1;
                private final String metadataRoot$1;
                public final LongRef manualClockExpectedTime$1;
                public final OutputMode outputMode$1;
                public final Seq actions$2;

                /*
                 * Loose catch block
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                public final void apply(StreamTest.StreamAction action) {
                    block31: {
                        Object object;
                        BoxedUnit boxedUnit;
                        block29: {
                            Object object2;
                            block28: {
                                BoxedUnit boxedUnit2;
                                block36: {
                                    boolean isSorted2;
                                    boolean lastOnly;
                                    Seq<Row> expectedAnswer;
                                    block35: {
                                        StreamTest.StreamAction streamAction;
                                        block34: {
                                            block33: {
                                                block32: {
                                                    block30: {
                                                        ((Logging)this.$outer).logInfo((Function0)new Serializable(this, action){
                                                            public static final long serialVersionUID = 0L;
                                                            private final StreamTest.StreamAction action$1;

                                                            public final String apply() {
                                                                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Processing test stream action: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.action$1}));
                                                            }
                                                            {
                                                                this.action$1 = action$1;
                                                            }
                                                        });
                                                        streamAction = action;
                                                        if (streamAction instanceof StreamTest.StartStream) {
                                                            StreamTest.StartStream startStream = (StreamTest.StartStream)streamAction;
                                                            Trigger trigger = startStream.trigger();
                                                            Clock triggerClock = startStream.triggerClock();
                                                            scala.collection.immutable.Map<String, String> additionalConfs = startStream.additionalConfs();
                                                            StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                                                public static final long serialVersionUID = 0L;
                                                                private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                                public final boolean apply() {
                                                                    return this.apply$mcZ$sp();
                                                                }

                                                                public boolean apply$mcZ$sp() {
                                                                    return (StreamExecution)this.$outer.currentStream$1.elem == null;
                                                                }
                                                                {
                                                                    if ($outer == null) {
                                                                        throw null;
                                                                    }
                                                                    this.$outer = $outer;
                                                                }
                                                            }, "stream already running", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                            StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this, triggerClock){
                                                                public static final long serialVersionUID = 0L;
                                                                private final Clock triggerClock$1;

                                                                public final boolean apply() {
                                                                    return this.apply$mcZ$sp();
                                                                }

                                                                public boolean apply$mcZ$sp() {
                                                                    return this.triggerClock$1 instanceof SystemClock || this.triggerClock$1 instanceof StreamTest.StreamManualClock;
                                                                }
                                                                {
                                                                    this.triggerClock$1 = triggerClock$1;
                                                                }
                                                            }, "Use either SystemClock or StreamManualClock to start the stream", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                            if (triggerClock instanceof StreamTest.StreamManualClock) {
                                                                this.manualClockExpectedTime$1.elem = ((StreamTest.StreamManualClock)triggerClock).getTimeMillis();
                                                            }
                                                            additionalConfs.foreach((Function1)new Serializable(this){
                                                                public static final long serialVersionUID = 0L;
                                                                private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                                public final void apply(Tuple2<String, String> pair) {
                                                                    None$ value = this.$outer.sparkSession$1.conf().contains((String)pair._1()) ? new Some((Object)this.$outer.sparkSession$1.conf().get((String)pair._1())) : None$.MODULE$;
                                                                    this.$outer.resetConfValues$1.update(pair._1(), (Object)value);
                                                                    this.$outer.sparkSession$1.conf().set((String)pair._1(), (String)pair._2());
                                                                }
                                                                {
                                                                    if ($outer == null) {
                                                                        throw null;
                                                                    }
                                                                    this.$outer = $outer;
                                                                }
                                                            });
                                                            this.lastStream$1.elem = (StreamExecution)this.currentStream$1.elem;
                                                            StreamingQueryManager qual$3 = this.sparkSession$1.streams();
                                                            None$ x$16 = None$.MODULE$;
                                                            Some x$17 = new Some((Object)this.metadataRoot$1);
                                                            Dataset x$18 = this.stream$1;
                                                            MemorySink x$19 = this.sink$1;
                                                            OutputMode x$20 = this.outputMode$1;
                                                            Trigger x$21 = trigger;
                                                            Clock x$22 = triggerClock;
                                                            boolean x$23 = qual$3.startQuery$default$6();
                                                            boolean x$24 = qual$3.startQuery$default$7();
                                                            this.currentStream$1.elem = ((StreamingQueryWrapper)qual$3.startQuery((Option)x$16, (Option)x$17, x$18, (Sink)x$19, x$20, x$23, x$24, x$21, x$22)).streamingQuery();
                                                            ((StreamExecution)this.currentStream$1.elem).awaitInitialization(Span$.MODULE$.convertSpanToDuration(this.$outer.streamingTimeout()).toMillis());
                                                            boxedUnit2 = BoxedUnit.UNIT;
                                                        }
                                                        if (!(streamAction instanceof StreamTest.AdvanceManualClock)) break block30;
                                                        StreamTest.AdvanceManualClock advanceManualClock = (StreamTest.AdvanceManualClock)streamAction;
                                                        long timeToAdd = advanceManualClock.timeToAdd();
                                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                            public final boolean apply() {
                                                                return this.apply$mcZ$sp();
                                                            }

                                                            public boolean apply$mcZ$sp() {
                                                                return (StreamExecution)this.$outer.currentStream$1.elem != null;
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                            }
                                                        }, "can not advance manual clock when a stream is not running", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                            public final boolean apply() {
                                                                return this.apply$mcZ$sp();
                                                            }

                                                            public boolean apply$mcZ$sp() {
                                                                return ((StreamExecution)this.$outer.currentStream$1.elem).triggerClock() instanceof StreamTest.StreamManualClock;
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                            }
                                                        }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"can not advance clock of type ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((StreamExecution)this.currentStream$1.elem).triggerClock().getClass()})), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                        StreamTest.StreamManualClock clock = (StreamTest.StreamManualClock)((StreamExecution)this.currentStream$1.elem).triggerClock();
                                                        long $org_scalatest_assert_macro_left = this.manualClockExpectedTime$1.elem;
                                                        int $org_scalatest_assert_macro_right = 0;
                                                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= (long)$org_scalatest_assert_macro_right);
                                                        ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                                                        StreamTest$class.eventually$1(this.$outer, "StreamManualClock has not yet entered the waiting state", (Function0)new Serializable(this, clock){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                                            private final StreamTest.StreamManualClock clock$1;

                                                            public final void apply() {
                                                                this.apply$mcV$sp();
                                                            }

                                                            public void apply$mcV$sp() {
                                                                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(this.clock$1.isStreamWaitingAt(this.$outer.manualClockExpectedTime$1.elem), "clock.isStreamWaitingAt(manualClockExpectedTime)");
                                                                ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                                this.clock$1 = clock$1;
                                                            }
                                                        }, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                        clock.advance(timeToAdd);
                                                        this.manualClockExpectedTime$1.elem += timeToAdd;
                                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this, clock){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                                            private final StreamTest.StreamManualClock clock$1;

                                                            public final boolean apply() {
                                                                return this.apply$mcZ$sp();
                                                            }

                                                            public boolean apply$mcZ$sp() {
                                                                return ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer((Object)BoxesRunTime.boxToLong((long)this.clock$1.getTimeMillis())).$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)this.$outer.manualClockExpectedTime$1.elem), Equality$.MODULE$.default());
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                                this.clock$1 = clock$1;
                                                            }
                                                        }, new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unexpected clock time after updating: "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"expecting ", ", current ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.manualClockExpectedTime$1.elem), BoxesRunTime.boxToLong((long)clock.getTimeMillis())}))).toString(), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                        boxedUnit = BoxedUnit.UNIT;
                                                        break block31;
                                                    }
                                                    if (this.$outer.StopStream().equals(streamAction)) {
                                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                            public final boolean apply() {
                                                                return this.apply$mcZ$sp();
                                                            }

                                                            public boolean apply$mcZ$sp() {
                                                                return (StreamExecution)this.$outer.currentStream$1.elem != null;
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                            }
                                                        }, "can not stop a stream that is not running", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                        object2 = this.$outer.failAfter(this.$outer.streamingTimeout(), (Function0)new Serializable(this){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                            public final void apply() {
                                                                this.apply$mcV$sp();
                                                            }

                                                            public void apply$mcV$sp() {
                                                                ((StreamExecution)this.$outer.currentStream$1.elem).stop();
                                                                StreamTest$class.verify$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), (Function0)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.10 $outer;

                                                                    public final boolean apply() {
                                                                        return this.apply$mcZ$sp();
                                                                    }

                                                                    public boolean apply$mcZ$sp() {
                                                                        return !((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).microBatchThread().isAlive();
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                    }
                                                                }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"microbatch thread not stopped"})).s((Seq)Nil$.MODULE$), this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                                StreamTest$class.verify$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), (Function0)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.10 $outer;

                                                                    public final boolean apply() {
                                                                        return this.apply$mcZ$sp();
                                                                    }

                                                                    public boolean apply$mcZ$sp() {
                                                                        return !((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).isActive();
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                    }
                                                                }, "query.isActive() is false even after stopping", this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                                StreamTest$class.verify$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), (Function0)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.10 $outer;

                                                                    public final boolean apply() {
                                                                        return this.apply$mcZ$sp();
                                                                    }

                                                                    public boolean apply$mcZ$sp() {
                                                                        return ((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).exception().isEmpty();
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                    }
                                                                }, new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"query.exception() is not empty after clean stop: "})).s((Seq)Nil$.MODULE$)).append(((StreamExecution)this.$outer.currentStream$1.elem).exception().map((Function1)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;

                                                                    public final String apply(StreamingQueryException x$5) {
                                                                        return x$5.toString();
                                                                    }
                                                                }).getOrElse((Function0)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;

                                                                    public final String apply() {
                                                                        return "";
                                                                    }
                                                                })).toString(), this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                            }

                                                            public /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer() {
                                                                return this.$outer;
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                            }
                                                        }, this.$outer.defaultInterruptor());
                                                        break block28;
                                                    }
                                                    if (streamAction instanceof StreamTest.ExpectFailure) {
                                                        StreamTest.ExpectFailure expectFailure = (StreamTest.ExpectFailure)streamAction;
                                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                            public final boolean apply() {
                                                                return this.apply$mcZ$sp();
                                                            }

                                                            public boolean apply$mcZ$sp() {
                                                                return (StreamExecution)this.$outer.currentStream$1.elem != null;
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                            }
                                                        }, "can not expect failure when stream is not running", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                        object = this.$outer.failAfter(this.$outer.streamingTimeout(), (Function0)new Serializable(this, expectFailure){
                                                            public static final long serialVersionUID = 0L;
                                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                                            public final StreamTest.ExpectFailure x6$1;

                                                            public final void apply() {
                                                                this.apply$mcV$sp();
                                                            }

                                                            public void apply$mcV$sp() {
                                                                StreamingQueryException thrownException = (StreamingQueryException)((Object)((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).intercept((Function0)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.12 $outer;

                                                                    public final void apply() {
                                                                        this.apply$mcV$sp();
                                                                    }

                                                                    public void apply$mcV$sp() {
                                                                        ((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).awaitTermination();
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                    }
                                                                }, ManifestFactory$.MODULE$.classType(StreamingQueryException.class)));
                                                                StreamTest$class.eventually$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), "microbatch thread not stopped after termination with failure", (Function0)new Serializable(this){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.12 $outer;

                                                                    public final void apply() {
                                                                        this.apply$mcV$sp();
                                                                    }

                                                                    public void apply$mcV$sp() {
                                                                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).microBatchThread().isAlive(), "currentStream.microBatchThread.isAlive()"));
                                                                        ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                    }
                                                                }, this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                                StreamTest$class.verify$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), (Function0)new Serializable(this, thrownException){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.12 $outer;
                                                                    private final StreamingQueryException thrownException$1;

                                                                    public final boolean apply() {
                                                                        return this.apply$mcZ$sp();
                                                                    }

                                                                    public boolean apply$mcZ$sp() {
                                                                        return ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer((Object)((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).exception()).$eq$eq$eq((Object)new Some((Object)((Object)this.thrownException$1)), Equality$.MODULE$.default());
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                        this.thrownException$1 = thrownException$1;
                                                                    }
                                                                }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"incorrect exception returned by query.exception()"})).s((Seq)Nil$.MODULE$), this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                                StreamingQueryException exception = (StreamingQueryException)((Object)((StreamExecution)this.$outer.currentStream$1.elem).exception().get());
                                                                StreamTest$class.verify$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), (Function0)new Serializable(this, exception){
                                                                    public static final long serialVersionUID = 0L;
                                                                    private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.12 $outer;
                                                                    private final StreamingQueryException exception$1;

                                                                    public final boolean apply() {
                                                                        return this.apply$mcZ$sp();
                                                                    }

                                                                    public boolean apply$mcZ$sp() {
                                                                        return ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer(this.exception$1.cause().getClass()).$eq$eq$eq(this.$outer.x6$1.causeClass(), Equality$.MODULE$.default());
                                                                    }
                                                                    {
                                                                        if ($outer == null) {
                                                                            throw null;
                                                                        }
                                                                        this.$outer = $outer;
                                                                        this.exception$1 = exception$1;
                                                                    }
                                                                }, new StringBuilder().append((Object)"incorrect cause in exception returned by query.exception()\n").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\tExpected: ", "\\n\\tReturned: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.x6$1.causeClass(), exception.cause().getClass()}))).toString(), this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                                if (this.x6$1.isFatalError()) {
                                                                    StreamTest$class.verify$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), (Function0)new Serializable(this){
                                                                        public static final long serialVersionUID = 0L;
                                                                        private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.12 $outer;

                                                                        public final boolean apply() {
                                                                            return this.apply$mcZ$sp();
                                                                        }

                                                                        public boolean apply$mcZ$sp() {
                                                                            return (Throwable)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().streamThreadDeathCause$1.elem != null && ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer(((Throwable)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().streamThreadDeathCause$1.elem).getClass()).$eq$eq$eq(this.$outer.x6$1.causeClass(), Equality$.MODULE$.default());
                                                                        }
                                                                        {
                                                                            if ($outer == null) {
                                                                                throw null;
                                                                            }
                                                                            this.$outer = $outer;
                                                                        }
                                                                    }, new StringBuilder().append((Object)"UncaughtExceptionHandler didn't receive the correct error\n").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\tExpected: ", "\\n\\tReturned: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.x6$1.causeClass(), (Throwable)this.$outer.streamThreadDeathCause$1.elem}))).toString(), this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                                                    this.$outer.streamThreadDeathCause$1.elem = null;
                                                                }
                                                            }

                                                            public /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer() {
                                                                return this.$outer;
                                                            }
                                                            {
                                                                if ($outer == null) {
                                                                    throw null;
                                                                }
                                                                this.$outer = $outer;
                                                                this.x6$1 = x6$1;
                                                            }
                                                        }, this.$outer.defaultInterruptor());
                                                        break block29;
                                                    }
                                                    if (!(streamAction instanceof StreamTest.AssertOnQuery)) break block32;
                                                    StreamTest.AssertOnQuery assertOnQuery = (StreamTest.AssertOnQuery)streamAction;
                                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                                        public static final long serialVersionUID = 0L;
                                                        private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                        public final boolean apply() {
                                                            return this.apply$mcZ$sp();
                                                        }

                                                        public boolean apply$mcZ$sp() {
                                                            return (StreamExecution)this.$outer.currentStream$1.elem != null || (StreamExecution)this.$outer.lastStream$1.elem != null;
                                                        }
                                                        {
                                                            if ($outer == null) {
                                                                throw null;
                                                            }
                                                            this.$outer = $outer;
                                                        }
                                                    }, "cannot assert when not stream has been started", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                    StreamExecution streamToAssert = (StreamExecution)Option$.MODULE$.apply((Object)((StreamExecution)this.currentStream$1.elem)).getOrElse((Function0)new Serializable(this){
                                                        public static final long serialVersionUID = 0L;
                                                        private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                        public final StreamExecution apply() {
                                                            return (StreamExecution)this.$outer.lastStream$1.elem;
                                                        }
                                                        {
                                                            if ($outer == null) {
                                                                throw null;
                                                            }
                                                            this.$outer = $outer;
                                                        }
                                                    });
                                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this, streamToAssert, assertOnQuery){
                                                        public static final long serialVersionUID = 0L;
                                                        private final StreamExecution streamToAssert$1;
                                                        private final StreamTest.AssertOnQuery x7$1;

                                                        public final boolean apply() {
                                                            return this.apply$mcZ$sp();
                                                        }

                                                        public boolean apply$mcZ$sp() {
                                                            return BoxesRunTime.unboxToBoolean((Object)this.x7$1.condition().apply((Object)this.streamToAssert$1));
                                                        }
                                                        {
                                                            this.streamToAssert$1 = streamToAssert$1;
                                                            this.x7$1 = x7$1;
                                                        }
                                                    }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Assert on query failed: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{assertOnQuery.message()})), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                    boxedUnit = BoxedUnit.UNIT;
                                                    break block31;
                                                }
                                                if (!(streamAction instanceof StreamTest.Assert)) break block33;
                                                StreamTest.Assert assert_ = (StreamTest.Assert)streamAction;
                                                StreamExecution streamToAssert = (StreamExecution)Option$.MODULE$.apply((Object)((StreamExecution)this.currentStream$1.elem)).getOrElse((Function0)new Serializable(this){
                                                    public static final long serialVersionUID = 0L;
                                                    private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                    public final StreamExecution apply() {
                                                        return (StreamExecution)this.$outer.lastStream$1.elem;
                                                    }
                                                    {
                                                        if ($outer == null) {
                                                            throw null;
                                                        }
                                                        this.$outer = $outer;
                                                    }
                                                });
                                                StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this, assert_){
                                                    public static final long serialVersionUID = 0L;
                                                    private final StreamTest.Assert x8$1;

                                                    public final boolean apply() {
                                                        return this.apply$mcZ$sp();
                                                    }

                                                    public boolean apply$mcZ$sp() {
                                                        this.x8$1.run();
                                                        return true;
                                                    }
                                                    {
                                                        this.x8$1 = x8$1;
                                                    }
                                                }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Assert failed: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{assert_.message()})), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                boxedUnit = BoxedUnit.UNIT;
                                                break block31;
                                            }
                                            if (streamAction instanceof StreamTest.AddData) {
                                                Tuple2 tuple2;
                                                StreamTest.AddData addData2 = (StreamTest.AddData)streamAction;
                                                Option queryToUse = Option$.MODULE$.apply((Object)((StreamExecution)this.currentStream$1.elem)).orElse((Function0)new Serializable(this){
                                                    public static final long serialVersionUID = 0L;
                                                    private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                                    public final Option<StreamExecution> apply() {
                                                        return Option$.MODULE$.apply((Object)((StreamExecution)this.$outer.lastStream$1.elem));
                                                    }
                                                    {
                                                        if ($outer == null) {
                                                            throw null;
                                                        }
                                                        this.$outer = $outer;
                                                    }
                                                });
                                                Tuple2<Source, Offset> tuple22 = addData2.addData((Option<StreamExecution>)queryToUse);
                                                if (tuple22 == null) {
                                                    throw new MatchError(tuple22);
                                                }
                                                Source source = (Source)tuple22._1();
                                                Offset offset = (Offset)tuple22._2();
                                                Tuple2 tuple23 = tuple2 = new Tuple2((Object)source, (Object)offset);
                                                Source source2 = (Source)tuple23._1();
                                                Offset offset2 = (Offset)tuple23._2();
                                                int sourceIndex = BoxesRunTime.unboxToInt((Object)queryToUse.flatMap((Function1)new Serializable(this, source2){
                                                    public static final long serialVersionUID = 0L;
                                                    private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                                    private final Source source$1;

                                                    public final Option<Object> apply(StreamExecution query) {
                                                        return this.$outer.org$apache$spark$sql$streaming$StreamTest$class$$anonfun$$findSourceIndex$1(query.logicalPlan(), this.source$1);
                                                    }
                                                    {
                                                        if ($outer == null) {
                                                            throw null;
                                                        }
                                                        this.$outer = $outer;
                                                        this.source$1 = source$1;
                                                    }
                                                }).orElse((Function0)new Serializable(this, source2){
                                                    public static final long serialVersionUID = 0L;
                                                    private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                                    private final Source source$1;

                                                    public final Option<Object> apply() {
                                                        return this.$outer.org$apache$spark$sql$streaming$StreamTest$class$$anonfun$$findSourceIndex$1(this.$outer.stream$1.logicalPlan(), this.source$1);
                                                    }
                                                    {
                                                        if ($outer == null) {
                                                            throw null;
                                                        }
                                                        this.$outer = $outer;
                                                        this.source$1 = source$1;
                                                    }
                                                }).getOrElse((Function0)new Serializable(this){
                                                    public static final long serialVersionUID = 0L;

                                                    public final Nothing$ apply() {
                                                        throw new IllegalArgumentException("Could find index of the source to which data was added");
                                                    }
                                                }));
                                                boxedUnit = this.awaiting$1.put((Object)BoxesRunTime.boxToInteger((int)sourceIndex), (Object)offset2);
                                            }
                                            if (!(streamAction instanceof StreamTest.ExternalAction)) break block34;
                                            StreamTest.ExternalAction externalAction = (StreamTest.ExternalAction)streamAction;
                                            externalAction.runAction();
                                            boxedUnit = BoxedUnit.UNIT;
                                            break block31;
                                        }
                                        if (!(streamAction instanceof StreamTest.CheckAnswerRows)) {
                                            throw new MatchError((Object)streamAction);
                                        }
                                        StreamTest.CheckAnswerRows checkAnswerRows = (StreamTest.CheckAnswerRows)streamAction;
                                        expectedAnswer = checkAnswerRows.expectedAnswer();
                                        lastOnly = checkAnswerRows.lastOnly();
                                        isSorted2 = checkAnswerRows.isSorted();
                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                            public static final long serialVersionUID = 0L;
                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                            public final boolean apply() {
                                                return this.apply$mcZ$sp();
                                            }

                                            public boolean apply$mcZ$sp() {
                                                return (StreamExecution)this.$outer.currentStream$1.elem != null;
                                            }
                                            {
                                                if ($outer == null) {
                                                    throw null;
                                                }
                                                this.$outer = $outer;
                                            }
                                        }, "stream not running", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                        scala.collection.immutable.Map indexToSource = ((TraversableOnce)((TraversableLike)((StreamExecution)this.currentStream$1.elem).logicalPlan().collect((PartialFunction)new Serializable(this){
                                            public static final long serialVersionUID = 0L;

                                            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                                                Object object;
                                                A1 A1 = x2;
                                                if (A1 instanceof StreamingExecutionRelation) {
                                                    StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                                                    Source s = streamingExecutionRelation.source();
                                                    object = s;
                                                } else {
                                                    object = function1.apply(x2);
                                                }
                                                return (B1)object;
                                            }

                                            public final boolean isDefinedAt(LogicalPlan x2) {
                                                LogicalPlan logicalPlan = x2;
                                                boolean bl = logicalPlan instanceof StreamingExecutionRelation;
                                                return bl;
                                            }
                                        }).zipWithIndex(Seq$.MODULE$.canBuildFrom())).map((Function1)new Serializable(this){
                                            public static final long serialVersionUID = 0L;

                                            public final Tuple2<Object, Source> apply(Tuple2<Source, Object> x$9) {
                                                return x$9.swap();
                                            }
                                        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
                                        this.awaiting$1.foreach((Function1)new Serializable(this, indexToSource){
                                            public static final long serialVersionUID = 0L;
                                            private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                            public final scala.collection.immutable.Map indexToSource$1;

                                            public final void apply(Tuple2<Object, Offset> x0$2) {
                                                Tuple2<Object, Offset> tuple2 = x0$2;
                                                if (tuple2 != null) {
                                                    int sourceIndex = tuple2._1$mcI$sp();
                                                    Offset offset = (Offset)tuple2._2();
                                                    BoxedUnit boxedUnit = (BoxedUnit)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer().failAfter(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer().streamingTimeout(), (Function0)new Serializable(this, sourceIndex, offset){
                                                        public static final long serialVersionUID = 0L;
                                                        private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.24 $outer;
                                                        private final int sourceIndex$1;
                                                        private final Offset offset$1;

                                                        public final void apply() {
                                                            this.apply$mcV$sp();
                                                        }

                                                        public void apply$mcV$sp() {
                                                            ((StreamExecution)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1.elem).awaitOffset((Source)this.$outer.indexToSource$1.apply((Object)BoxesRunTime.boxToInteger((int)this.sourceIndex$1)), this.offset$1);
                                                        }
                                                        {
                                                            if ($outer == null) {
                                                                throw null;
                                                            }
                                                            this.$outer = $outer;
                                                            this.sourceIndex$1 = sourceIndex$1;
                                                            this.offset$1 = offset$1;
                                                        }
                                                    }, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer().defaultInterruptor());
                                                    return;
                                                }
                                                throw new MatchError(tuple2);
                                            }

                                            public /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer() {
                                                return this.$outer;
                                            }
                                            {
                                                if ($outer == null) {
                                                    throw null;
                                                }
                                                this.$outer = $outer;
                                                this.indexToSource$1 = indexToSource$1;
                                            }
                                        });
                                        try {}
                                        catch (Exception exception) {
                                            throw StreamTest$class.failTest$1(this.$outer, "Exception while getting data from sink", exception, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                        }
                                        break block35;
                                        catch (Throwable throwable) {
                                            Throwable throwable2 = throwable;
                                            Option option = NonFatal$.MODULE$.unapply(throwable2);
                                            if (option.isEmpty()) {
                                                throw throwable;
                                            }
                                            Throwable e = (Throwable)option.get();
                                            throw StreamTest$class.failTest$1(this.$outer, "Error adding data", e, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                        }
                                        catch (Throwable throwable) {
                                            try {
                                                throw StreamTest$class.failTest$1(this.$outer, "Error while checking stream failure", throwable, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                catch (TestFailedDueToTimeoutException testFailedDueToTimeoutException) {
                                                    throw StreamTest$class.failTest$1(this.$outer, "Timed out while waiting for failure", testFailedDueToTimeoutException, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                }
                                                catch (InterruptedException interruptedException) {
                                                    object = BoxedUnit.UNIT;
                                                    break block29;
                                                }
                                            }
                                            finally {
                                                this.lastStream$1.elem = (StreamExecution)this.currentStream$1.elem;
                                                this.currentStream$1.elem = null;
                                            }
                                        }
                                        catch (Throwable throwable) {
                                            try {
                                                throw StreamTest$class.failTest$1(this.$outer, "Error while stopping stream", throwable, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                catch (TestFailedDueToTimeoutException testFailedDueToTimeoutException) {
                                                    throw StreamTest$class.failTest$1(this.$outer, "Timed out while stopping and waiting for microbatchthread to terminate.", testFailedDueToTimeoutException, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                                }
                                                catch (InterruptedException interruptedException) {
                                                    object2 = BoxedUnit.UNIT;
                                                    break block28;
                                                }
                                            }
                                            finally {
                                                this.lastStream$1.elem = (StreamExecution)this.currentStream$1.elem;
                                                this.currentStream$1.elem = null;
                                            }
                                        }
                                        catch (StreamingQueryException streamingQueryException) {
                                            boxedUnit2 = BoxedUnit.UNIT;
                                        }
                                        break block36;
                                    }
                                    Seq sparkAnswer = lastOnly ? this.sink$1.latestBatchData() : this.sink$1.allData();
                                    QueryTest$.MODULE$.sameRows(expectedAnswer, (Seq<Row>)sparkAnswer, isSorted2).foreach((Function1)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;

                                        public final Nothing$ apply(String error) {
                                            return StreamTest$class.failTest$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), error, StreamTest$class.failTest$default$2$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()), this.$outer.pos$1, this.$outer.currentStream$1, this.$outer.sink$1, this.$outer.streamThreadDeathCause$1, this.$outer.startedManually$1, this.$outer.outputMode$1, this.$outer.actions$2);
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    });
                                    boxedUnit = BoxedUnit.UNIT;
                                    break block31;
                                }
                                boxedUnit = boxedUnit2;
                                break block31;
                            }
                            boxedUnit = object2;
                            break block31;
                        }
                        boxedUnit = object;
                    }
                    ++this.pos$1.elem;
                }

                public /* synthetic */ StreamTest org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer() {
                    return this.$outer;
                }

                public final Option org$apache$spark$sql$streaming$StreamTest$class$$anonfun$$findSourceIndex$1(LogicalPlan plan, Source source$1) {
                    return ((IterableLike)plan.collect((PartialFunction)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                            Object object;
                            A1 A1 = x1;
                            if (A1 instanceof StreamingExecutionRelation) {
                                StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                                Source s = streamingExecutionRelation.source();
                                object = s;
                            } else {
                                object = function1.apply(x1);
                            }
                            return (B1)object;
                        }

                        public final boolean isDefinedAt(LogicalPlan x1) {
                            LogicalPlan logicalPlan = x1;
                            boolean bl = logicalPlan instanceof StreamingExecutionRelation;
                            return bl;
                        }
                    }).zipWithIndex(Seq$.MODULE$.canBuildFrom())).find((Function1)new Serializable(this, source$1){
                        public static final long serialVersionUID = 0L;
                        private final Source source$1;

                        public final boolean apply(Tuple2<Source, Object> x$7) {
                            Object object = x$7._1();
                            Source source = this.source$1;
                            return !(object != null ? !object.equals(source) : source != null);
                        }
                        {
                            this.source$1 = source$1;
                        }
                    }).map((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final int apply(Tuple2<Source, Object> x$8) {
                            return x$8._2$mcI$sp();
                        }
                    });
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.stream$1 = stream$1;
                    this.sparkSession$1 = sparkSession$1;
                    this.pos$1 = pos$1;
                    this.currentStream$1 = currentStream$1;
                    this.lastStream$1 = lastStream$1;
                    this.awaiting$1 = awaiting$1;
                    this.sink$1 = sink$1;
                    this.resetConfValues$1 = resetConfValues$1;
                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                    this.startedManually$1 = startedManually$1;
                    this.metadataRoot$1 = metadataRoot$1;
                    this.manualClockExpectedTime$1 = manualClockExpectedTime$1;
                    this.outputMode$1 = outputMode$1;
                    this.actions$2 = actions$2;
                }
            });
            if ((Throwable)streamThreadDeathCause$1.elem != null) break block9;
            {
                catch (Throwable throwable) {
                    Throwable throwable2 = throwable;
                    if (throwable2 instanceof InterruptedException && (Throwable)streamThreadDeathCause$1.elem != null) {
                        throw StreamTest$class.failTest$1($this, "Stream Thread Died", (Throwable)streamThreadDeathCause$1.elem, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                    }
                    if (throwable2 instanceof TestFailedDueToTimeoutException) {
                        TestFailedDueToTimeoutException testFailedDueToTimeoutException = (TestFailedDueToTimeoutException)throwable2;
                        throw StreamTest$class.failTest$1($this, "Timed out waiting for stream", (Throwable)testFailedDueToTimeoutException, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                    }
                    throw throwable;
                }
            }
            if ((StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).microBatchThread().isAlive()) {
                ((StreamExecution)currentStream$1.elem).stop();
            }
            resetConfValues$1.foreach((Function1)new Serializable($this, sparkSession$1){
                public static final long serialVersionUID = 0L;
                private final SparkSession sparkSession$1;

                public final void apply(Tuple2<String, Option<String>> x0$3) {
                    Tuple2<String, Option<String>> tuple2;
                    block4: {
                        block3: {
                            block2: {
                                tuple2 = x0$3;
                                if (tuple2 == null) break block2;
                                String key = (String)tuple2._1();
                                Option option = (Option)tuple2._2();
                                if (!(option instanceof Some)) break block2;
                                Some some = (Some)option;
                                String value = (String)some.x();
                                this.sparkSession$1.conf().set(key, value);
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                break block3;
                            }
                            if (tuple2 == null) break block4;
                            String key = (String)tuple2._1();
                            Option option = (Option)tuple2._2();
                            if (!None$.MODULE$.equals(option)) break block4;
                            this.sparkSession$1.conf().unset(key);
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        }
                        return;
                    }
                    throw new MatchError(tuple2);
                }
                {
                    this.sparkSession$1 = sparkSession$1;
                }
            });
            sparkSession$1.streams().removeListener(listener$1);
            return;
        }
        try {
            throw StreamTest$class.failTest$1($this, "Stream Thread Died", (Throwable)streamThreadDeathCause$1.elem, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
        }
        catch (Throwable throwable) {
            if ((StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).microBatchThread().isAlive()) {
                ((StreamExecution)currentStream$1.elem).stop();
            }
            resetConfValues$1.foreach((Function1)new /* invalid duplicate definition of identical inner class */);
            sparkSession$1.streams().removeListener(listener$1);
            throw throwable;
        }
    }

    public static final ArrayBuffer addCheck$1(StreamTest $this, ExpressionEncoder intEncoder$1, IntRef dataPos$1, ArrayBuffer actions$1) {
        return actions$1.$plus$eq((Object)$this.CheckAnswer().apply(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), dataPos$1.elem), intEncoder$1));
    }

    public static final ArrayBuffer addRandomData$1(StreamTest $this, IntRef dataPos$1, BooleanRef running$1, ArrayBuffer actions$1, Function2 addData$2) {
        int numItems = Random$.MODULE$.nextInt(10);
        Range data = RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(dataPos$1.elem), dataPos$1.elem + numItems);
        dataPos$1.elem += numItems;
        return actions$1.$plus$eq(addData$2.apply((Object)data, (Object)BoxesRunTime.boxToBoolean((boolean)running$1.elem)));
    }

    public static void $init$(StreamTest $this) {
        $this.org$apache$spark$sql$streaming$StreamTest$_setter_$streamingTimeout_$eq(SpanSugar$.MODULE$.convertIntToGrainOfTime(10).seconds());
    }
}

