package cloudflow.spark;

import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.Scheduler;
import cloudflow.streamlets.Dun;
import cloudflow.streamlets.Dun$;
import cloudflow.streamlets.ExceptionAcc;
import cloudflow.streamlets.StreamletExecution;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.streaming.StreamingQuery;
import scala.Function0;
import scala.Option$;
import scala.Tuple2;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkStreamlet.scala */
/* loaded from: input_file:cloudflow/spark/SparkStreamlet$$anon$1.class */
/**
 * Decompiled anonymous {@link StreamletExecution} implementation.
 *
 * <p>On construction it schedules a recurring check of the queries held by the captured
 * {@code StreamletQueryExecution}. When that check sees any query that is no longer active it
 * calls {@link #stop()} and completes the captured completion promise with the result.
 *
 * <p>NOTE(review): this is decompiler output of Scala-generated bytecode. Members whose names
 * contain {@code $} are compiler-generated; the raw {@code Promise}/{@code Future} types are
 * presumably the result of erasure rather than a choice made in the original source.
 */
public final class SparkStreamlet$$anon$1 implements StreamletExecution {
    /** Handle of the recurring query check scheduled in the constructor; cancelled by {@link #stop()}. */
    private final Cancellable scheduledQueryCheck;
    /** Already-successful future (holding {@code Dun}) returned by {@link #ready()}. */
    private final Future<Dun> ready;
    /** Enclosing streamlet instance; {@link #stop()} reads its {@code StopTimeout()}. */
    private final /* synthetic */ SparkStreamlet $outer;
    /** Captured actor system; its scheduler drives the polling in {@link #stop()}. */
    private final ActorSystem system$1;
    /**
     * Captured query execution whose {@code stop()} and {@code queries()} are used here.
     * Public because it is read from the anonymous Runnable created in the constructor
     * (and presumably from other separately compiled synthetic classes — not visible here).
     */
    public final StreamletQueryExecution streamletQueryExecution$1;
    /** Captured promise that the recurring check completes with the result of {@link #stop()}. */
    public final Promise completionPromise$1;
    /**
     * Captured future returned by {@link #completed()}.
     * NOTE(review): presumably the future of {@code completionPromise$1} — confirm at the call site.
     */
    private final Future completionFuture$1;

    /** Accessor for the handle of the recurring query check. */
    private Cancellable scheduledQueryCheck() {
        return this.scheduledQueryCheck;
    }

    /**
     * Returns the captured completion future.
     *
     * @return the future passed to the constructor as its last argument
     */
    public Future<Dun> completed() {
        return this.completionFuture$1;
    }

    /**
     * Returns the readiness future.
     *
     * @return a future that was created already successful in the constructor
     */
    public Future<Dun> ready() {
        return this.ready;
    }

    /**
     * Stops the query execution and waits, by polling, until no query is active.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>calls {@code stop()} on the captured query execution;</li>
     *   <li>cancels the recurring query check;</li>
     *   <li>polls every 1 second, bounded by the enclosing streamlet's {@code StopTimeout()},
     *       until every query reports {@code !isActive()};</li>
     *   <li>passes a failed poll through the {@code $anonfun$stop$3} handler via
     *       {@code recoverWith} (handler is defined elsewhere; its behavior is not visible here);</li>
     *   <li>collects the {@code cause()} of every query that has an exception and, if any exist,
     *       throws {@link ExceptionAcc} with all of them; otherwise yields {@code Dun}.</li>
     * </ol>
     *
     * <p>NOTE(review): the recurring check invokes this method each time it fires and sees an
     * inactive query, so it may run more than once — confirm the callees tolerate repeated calls.
     *
     * @return a future of {@code Dun}; the {@link ExceptionAcc} is thrown inside {@code map}, so it
     *         presumably surfaces as a failure of the returned future rather than to the caller
     */
    public Future<Dun> stop() {
        this.streamletQueryExecution$1.stop();
        scheduledQueryCheck().cancel();
        return poll(() -> {
            // Poll predicate: true once every query is inactive.
            return this.streamletQueryExecution$1.queries().forall(streamingQuery -> {
                return BoxesRunTime.boxToBoolean($anonfun$stop$2(streamingQuery));
            });
        }, new package.DurationInt(package$.MODULE$.DurationInt(1)).second(), this.$outer.StopTimeout(), this.system$1.scheduler()).recoverWith(new SparkStreamlet$$anon$1$$anonfun$stop$3(this), ExecutionContext$Implicits$.MODULE$.global()).map(boxedUnit -> {
            // Gather the underlying cause of every query that ended with an exception.
            Vector vector = (Vector) this.streamletQueryExecution$1.queries().flatMap(streamingQuery -> {
                return Option$.MODULE$.option2Iterable(streamingQuery.exception().map(streamingQueryException -> {
                    return streamingQueryException.cause();
                }));
            }, Vector$.MODULE$.canBuildFrom());
            if (vector.nonEmpty()) {
                throw new ExceptionAcc(vector);
            }
            return Dun$.MODULE$;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    /**
     * Starts polling a predicate at a fixed interval, bounded by a maximum wait.
     *
     * <p>The number of retries is the ceiling of {@code finiteDuration2 / finiteDuration}.
     * Note the argument order changes when delegating: {@code _poll$1} takes the maximum wait
     * before the scheduler and the interval after it.
     *
     * @param function0       predicate; polling ends successfully once it yields true
     * @param finiteDuration  interval between two evaluations of the predicate
     * @param finiteDuration2 maximum wait, used for the retry count and the timeout message
     * @param scheduler       scheduler used to delay the next evaluation
     * @return the future produced by {@code _poll$1}
     */
    private Future<BoxedUnit> poll(Function0<Object> function0, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, Scheduler scheduler) {
        return _poll$1((int) Math.ceil(finiteDuration2.$div(finiteDuration)), function0, finiteDuration2, scheduler, finiteDuration);
    }

    /** Compiler-generated predicate used by {@link #stop()}: true when the query is not active. */
    public static final /* synthetic */ boolean $anonfun$stop$2(StreamingQuery streamingQuery) {
        return !streamingQuery.isActive();
    }

    /**
     * Recursive polling step.
     *
     * <p>The predicate is evaluated first and takes precedence: if it is true the step succeeds
     * even when no retries are left. Otherwise, while {@code i > 0}, the next step is scheduled
     * after {@code finiteDuration2} with {@code i - 1}; when {@code i <= 0} the step fails with a
     * {@link TimeoutException}. Starting from {@code i = N} the predicate is therefore evaluated
     * at most {@code N + 1} times, with {@code N} delays in between.
     *
     * @param i               remaining retries
     * @param function0       predicate, invoked through its boolean-specialized {@code apply}
     * @param finiteDuration  maximum wait; only used for the timeout message (in milliseconds)
     * @param scheduler       scheduler used to delay the next step
     * @param finiteDuration2 delay before the next step
     * @return a successful future when the predicate holds, a failed future on timeout, or the
     *         future of a promise that is completed with the outcome of the next step
     */
    private static final Future _poll$1(int i, Function0 function0, FiniteDuration finiteDuration, Scheduler scheduler, FiniteDuration finiteDuration2) {
        Future future;
        // Pair of (predicate result, retries exhausted); the null checks below are
        // presumably residue of a decompiled pattern match — the pair is never null here.
        Tuple2.mcZZ.sp spVar = new Tuple2.mcZZ.sp(function0.apply$mcZ$sp(), i <= 0);
        if (spVar != null && true == spVar._1$mcZ$sp()) {
            // Predicate satisfied.
            future = Future$.MODULE$.successful(BoxedUnit.UNIT);
        } else if (spVar == null || true != spVar._2$mcZ$sp()) {
            // Retries remain: bridge the delayed next step to the caller through a promise.
            Promise apply = Promise$.MODULE$.apply();
            scheduler.scheduleOnce(finiteDuration2, () -> {
                apply.completeWith(_poll$1(i - 1, function0, finiteDuration, scheduler, finiteDuration2));
            }, ExecutionContext$Implicits$.MODULE$.global());
            future = apply.future();
        } else {
            // Predicate still false and no retries left.
            future = Future$.MODULE$.failed(new TimeoutException(new StringBuilder(28).append("Poll timed out after ").append(finiteDuration.toMillis()).append(" millis").toString()));
        }
        return future;
    }

    /**
     * Stores the captured values, schedules the recurring query check and creates the
     * already-successful readiness future.
     *
     * <p>NOTE(review): the Runnable calls {@link #stop()}, which reads {@code scheduledQueryCheck};
     * that field is only assigned once {@code scheduleWithFixedDelay} returns. If the first
     * duration can be zero the first run might observe it unset — confirm the delays used by the
     * caller.
     *
     * @param sparkStreamlet          enclosing streamlet; a null value makes {@code throw null}
     *                                raise a {@link NullPointerException}
     * @param actorSystem             actor system providing the scheduler
     * @param finiteDuration          first duration passed to {@code scheduleWithFixedDelay}
     *                                (the initial delay in Akka's API)
     * @param finiteDuration2         second duration passed to {@code scheduleWithFixedDelay}
     *                                (the delay between runs in Akka's API)
     * @param streamletQueryExecution query execution to monitor and stop
     * @param promise                 promise completed by the recurring check
     * @param future                  future returned by {@link #completed()}
     */
    public SparkStreamlet$$anon$1(SparkStreamlet sparkStreamlet, ActorSystem actorSystem, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, StreamletQueryExecution streamletQueryExecution, Promise promise, Future future) {
        if (sparkStreamlet == null) {
            throw null;
        }
        this.$outer = sparkStreamlet;
        this.system$1 = actorSystem;
        this.streamletQueryExecution$1 = streamletQueryExecution;
        this.completionPromise$1 = promise;
        this.completionFuture$1 = future;
        // The captured fields above are assigned before scheduling because the Runnable reads them.
        this.scheduledQueryCheck = actorSystem.scheduler().scheduleWithFixedDelay(finiteDuration, finiteDuration2, new Runnable(this) { // from class: cloudflow.spark.SparkStreamlet$$anon$1$$anon$2
            // NOTE(review): "new Runnable(this)" and "this.$outer = this" below look like
            // decompiler artifacts; the bytecode presumably stores the enclosing
            // SparkStreamlet$$anon$1 instance in $outer.
            private final /* synthetic */ SparkStreamlet$$anon$1 $outer;

            /**
             * Recurring check: when at least one query is no longer active, stops the
             * execution and completes the completion promise with the outcome of that stop.
             */
            @Override // java.lang.Runnable
            public void run() {
                if (this.$outer.streamletQueryExecution$1.queries().exists(streamingQuery -> {
                    return BoxesRunTime.boxToBoolean($anonfun$run$1(streamingQuery));
                })) {
                    this.$outer.completionPromise$1.completeWith(this.$outer.stop());
                }
            }

            /** Compiler-generated predicate used by {@code run()}: true when the query is not active. */
            public static final /* synthetic */ boolean $anonfun$run$1(StreamingQuery streamingQuery) {
                return !streamingQuery.isActive();
            }

            // Instance initializer emitted by the decompiler for the outer-reference capture.
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }, ExecutionContext$Implicits$.MODULE$.global());
        this.ready = Future$.MODULE$.successful(Dun$.MODULE$);
    }
}
