package cloudflow.spark;

import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.Cancellable;
import cloudflow.spark.kafka.SparkStreamletContextImpl;
import cloudflow.streamlets.BootstrapInfo$;
import cloudflow.streamlets.Dun;
import cloudflow.streamlets.Dun$;
import cloudflow.streamlets.ExceptionAcc;
import cloudflow.streamlets.StreamletDefinition$;
import cloudflow.streamlets.StreamletExecution;
import com.typesafe.config.Config;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.streaming.StreamingQuery;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: SparkStreamlet.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005udaB\u000b\u0017!\u0003\r\ta\u0007\u0005\u0006Q\u0001!\t!\u000b\u0005\t[\u0001A)\u0019!C\u0001]!9Q\b\u0001b\u0001\n\u000br\u0004bB\"\u0001\u0001\u0004%I\u0001\u0012\u0005\b\u0011\u0002\u0001\r\u0011\"\u0003J\u0011\u001da\u0005A1A\u0005\n5Cqa\u0016\u0001C\u0002\u0013%Q\nC\u0004Y\u0001\t\u0007I\u0011B-\t\u000bu\u0003AQ\u000b0\t\r)\u0004A\u0011\u0001\rl\u0011\u0015y\u0007\u0001b\u0006E\r\u0011\u0001\bAA9\t\u000bydA\u0011A@\t\u000f\u0005\u0015\u0001A\"\u0005\u0002\b!9\u0011q\u0002\u0001\u0005F\u0005E\u0001bBA\u000e\u0001\u0011\u0005\u0013Q\u0004\u0005\b\u0003g\u0001AQAA\u001b\u0011\u001d\ti\u0004\u0001C\u0005\u0003\u007fAq!a\u0013\u0001\t\u0013\ti\u0005C\u0005\u0002l\u0001\u0011\r\u0011\"\u0001\u0002n\tq1\u000b]1sWN#(/Z1nY\u0016$(BA\f\u0019\u0003\u0015\u0019\b/\u0019:l\u0015\u0005I\u0012!C2m_V$g\r\\8x\u0007\u0001\u00192\u0001\u0001\u000f#!\ti\u0002%D\u0001\u001f\u0015\ty\u0002$\u0001\u0006tiJ,\u0017-\u001c7fiNL!!\t\u0010\u0003\u0013M#(/Z1nY\u0016$\bCA\u0012'\u001b\u0005!#\"A\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u001d\"#\u0001D*fe&\fG.\u001b>bE2,\u0017A\u0002\u0013j]&$H\u0005F\u0001+!\t\u00193&\u0003\u0002-I\t!QK\\5u\u0003\rawnZ\u000b\u0002_A\u0011\u0001gN\u0007\u0002c)\u0011!gM\u0001\u0006Y><GG\u001b\u0006\u0003iU\na!\u00199bG\",'\"\u0001\u001c\u0002\u0007=\u0014x-\u0003\u00029c\t1Aj\\4hKJD#A\u0001\u001e\u0011\u0005\rZ\u0014B\u0001\u001f%\u0005%!(/\u00198tS\u0016tG/A\u0004sk:$\u0018.\\3\u0016\u0003}r!\u0001Q!\u000e\u0003YI!A\u0011\f\u0002+M\u0003\u0018M]6TiJ,\u0017-\u001c7fiJ+h\u000e^5nK\u0006\u00191\r\u001e=\u0016\u0003\u0015\u0003\"\u0001\u0011$\n\u0005\u001d3\"!F*qCJ\\7\u000b\u001e:fC6dW\r^\"p]R,\u0007\u0010^\u0001\bGRDx\fJ3r)\tQ#\nC\u0004L\u000b\u0005\u0005\t\u0019A#\u0002\u0007a$\u0013'\u0001\u0007sK\u0006$\u0017\u0010\u0015:p[&\u001cX-F\u0001O!\ry%\u000bV\u0007\u0002!*\u0011\u0011\u000bJ\u0001\u000bG>t7-\u001e:sK:$\u0018BA*Q\u0005\u001d\u0001&o\\7jg\u0016\u0004\"!H+\n\u0005Ys\"a\u0001#v]\u0006\t2m\\7qY\u0016$\u0018n\u001c8Qe>l\u0017n]3\u0002!\r|W\u000e\u001d7fi&|gNR;ukJ,W#\u0001.\u0011\u0007=[F+\u0003\u0002]!\n1a)\u001e;ve\u0016\fQb\u0019:fCR,7i\u001c8uKb$HCA#`\u0011\u0015\u0001\u0017\u00021\u0001b\u0003\u0019\u0019wN\u001c4jOB\u0011!\r[\u0007\u0002G*\u0011\u0001\r\u001a\u0006\u0003K\u001a\f\u0001\u0002^=qKN\fg-\u001a\u0006\u0002O\u0006\u00191m\\7\n\u0005%\u001c'AB\"p]\u001aLw-\u0001\u0006tKR\u001cuN\u001c;fqR$\"\u0001\\7\u0011\u0005\u0001\u0003\u0001\"\u00028\u000b\u0001\u0004)\u0015\u0001E:ue\u0016\fW\u000e\\3u\u0007>tG/\u001a=u\u0003\u001d\u0019wN\u001c;fqR\u0014ad\u00159be.\u001cFO]3b[2,GoQ8oi\u0016DH/\u0012=dKB$\u0018n\u001c8\u0014\u00051\u0011\bCA:|\u001d\t!\u0018P\u0004\u0002vq6\taO\u0003\u0002x5\u00051AH]8pizJ\u0011!J\u0005\u0003u\u0012\nq\u0001]1dW\u0006<W-\u0003\u0002}{\nIQ\t_2faRLwN\u001c\u0006\u0003u\u0012\na\u0001P5oSRtDCAA\u0001!\r\t\u0019\u0001D\u0007\u0002\u0001\u0005Y1M]3bi\u0016dunZ5d)\t\tI\u0001E\u0002A\u0003\u0017I1!!\u0004\u0017\u0005M\u0019\u0006/\u0019:l'R\u0014X-Y7mKRdunZ5d\u0003\r\u0011XO\u001c\u000b\u0005\u0003'\tI\u0002E\u0002\u001e\u0003+I1!a\u0006\u001f\u0005I\u0019FO]3b[2,G/\u0012=fGV$\u0018n\u001c8\t\u000b\u0001|\u0001\u0019A1\u0002+1|wm\u0015;beR\u0014VO\u001c8fe6+7o]1hKR\u0019!&a\b\t\u000f\u0005\u0005\u0002\u00031\u0001\u0002$\u0005I!-^5mI&sgm\u001c\t\u0005\u0003K\tiC\u0004\u0003\u0002(\u0005%\u0002CA;%\u0013\r\tY\u0003J\u0001\u0007!J,G-\u001a4\n\t\u0005=\u0012\u0011\u0007\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005-B%A\bd_:4\u0017nZ;sK\u00124\u0016\r\\;f)\u0019\t\u0019#a\u000e\u0002:!)q.\u0005a\u0001\u000b\"9\u00111H\tA\u0002\u0005\r\u0012!C2p]\u001aLwmS3z\u0003=i\u0017m[3Ta\u0006\u00148nQ8oM&<GCAA!!\u0011\t\u0019%a\u0012\u000e\u0005\u0005\u0015#BA\f4\u0013\u0011\tI%!\u0012\u0003\u0013M\u0003\u0018M]6D_:4\u0017\u0001E7bW\u0016\u001c\u0006/\u0019:l'\u0016\u001c8/[8o)\u0011\ty%a\u001a\u0011\r\u0005E\u0013qKA.\u001b\t\t\u0019FC\u0002\u0002V\u0011\nA!\u001e;jY&!\u0011\u0011LA*\u0005\r!&/\u001f\t\u0005\u0003;\n\u0019'\u0004\u0002\u0002`)!\u0011\u0011MA#\u0003\r\u0019\u0018\u000f\\\u0005\u0005\u0003K\nyF\u0001\u0007Ta\u0006\u00148nU3tg&|g\u000eC\u0004\u0002jM\u0001\r!!\u0011\u0002\u0017M\u0004\u0018M]6D_:4\u0017nZ\u0001\u0010CB\u0004H.[2bi&|gNT1nKV\u0011\u0011q\u000e\t\u0005\u0003c\nY(\u0004\u0002\u0002t)!\u0011QOA<\u0003\u0011a\u0017M\\4\u000b\u0005\u0005e\u0014\u0001\u00026bm\u0006LA!a\f\u0002t\u0001")
/* loaded from: input_file:cloudflow/spark/SparkStreamlet.class */
public interface SparkStreamlet extends Serializable {

    /* compiled from: SparkStreamlet.scala */
    /* loaded from: input_file:cloudflow/spark/SparkStreamlet$SparkStreamletContextException.class */
    public final class SparkStreamletContextException extends Exception {
        public SparkStreamletContextException(SparkStreamlet sparkStreamlet) {
            super("Can only access the SparkStreamletContext within the run() scope");
        }
    }

    void cloudflow$spark$SparkStreamlet$_setter_$runtime_$eq(SparkStreamletRuntime$ sparkStreamletRuntime$);

    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$readyPromise_$eq(Promise<Dun> promise);

    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionPromise_$eq(Promise<Dun> promise);

    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionFuture_$eq(Future<Dun> future);

    void cloudflow$spark$SparkStreamlet$_setter_$applicationName_$eq(String str);

    default Logger log() {
        return Logger.getLogger(getClass().getName());
    }

    SparkStreamletRuntime$ runtime();

    SparkStreamletContext cloudflow$spark$SparkStreamlet$$ctx();

    void cloudflow$spark$SparkStreamlet$$ctx_$eq(SparkStreamletContext sparkStreamletContext);

    Promise<Dun> cloudflow$spark$SparkStreamlet$$readyPromise();

    Promise<Dun> cloudflow$spark$SparkStreamlet$$completionPromise();

    Future<Dun> cloudflow$spark$SparkStreamlet$$completionFuture();

    default SparkStreamletContext createContext(Config config) {
        return (SparkStreamletContext) StreamletDefinition$.MODULE$.read(config, StreamletDefinition$.MODULE$.read$default$2()).flatMap(streamletDefinition -> {
            return this.makeSparkSession(this.makeSparkConfig()).map(sparkSession -> {
                return new SparkStreamletContextImpl(streamletDefinition, sparkSession, streamletDefinition.config().withFallback(config));
            });
        }).recoverWith(new SparkStreamlet$$anonfun$createContext$3(null, config)).get();
    }

    default SparkStreamlet setContext(SparkStreamletContext sparkStreamletContext) {
        cloudflow$spark$SparkStreamlet$$ctx_$eq(sparkStreamletContext);
        return this;
    }

    default SparkStreamletContext context() {
        if (cloudflow$spark$SparkStreamlet$$ctx() == null) {
            throw new SparkStreamletContextException(this);
        }
        return cloudflow$spark$SparkStreamlet$$ctx();
    }

    SparkStreamletLogic createLogic();

    default StreamletExecution run(Config config) {
        if (cloudflow$spark$SparkStreamlet$$ctx() == null) {
            cloudflow$spark$SparkStreamlet$$ctx_$eq(createContext(config));
        }
        final FiniteDuration seconds = new package.DurationInt(package$.MODULE$.DurationInt(2)).seconds();
        final FiniteDuration seconds2 = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        final ActorSystem apply = ActorSystem$.MODULE$.apply("spark_streamlet", cloudflow$spark$SparkStreamlet$$ctx().config());
        cloudflow$spark$SparkStreamlet$$readyPromise().trySuccess(Dun$.MODULE$);
        final StreamletQueryExecution buildStreamingQueries = createLogic().buildStreamingQueries();
        return new StreamletExecution(this, apply, seconds, seconds2, buildStreamingQueries) { // from class: cloudflow.spark.SparkStreamlet$$anon$1
            private final Future<Dun> readyFuture;
            private final Cancellable done;
            private final /* synthetic */ SparkStreamlet $outer;
            private final ActorSystem system$1;
            private final StreamletQueryExecution streamletQueryExecution$1;

            private Future<Dun> readyFuture() {
                return this.readyFuture;
            }

            private Cancellable done() {
                return this.done;
            }

            public Future<Dun> completed() {
                if (this.streamletQueryExecution$1.queries().forall(streamingQuery -> {
                    return BoxesRunTime.boxToBoolean($anonfun$completed$1(streamingQuery));
                })) {
                    done().cancel();
                    Vector vector = (Vector) this.streamletQueryExecution$1.queries().flatMap(streamingQuery2 -> {
                        return streamingQuery2.exception().map(streamingQueryException -> {
                            return streamingQueryException.cause();
                        }).toList();
                    }, Vector$.MODULE$.canBuildFrom());
                    if (vector.nonEmpty()) {
                        this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().tryFailure(new ExceptionAcc(vector));
                    } else {
                        this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().trySuccess(Dun$.MODULE$);
                    }
                }
                return this.$outer.cloudflow$spark$SparkStreamlet$$completionFuture();
            }

            public Future<Dun> ready() {
                return readyFuture();
            }

            public Future<Dun> stop() {
                this.streamletQueryExecution$1.stop();
                this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().trySuccess(Dun$.MODULE$);
                return this.$outer.cloudflow$spark$SparkStreamlet$$completionFuture();
            }

            public static final /* synthetic */ boolean $anonfun$done$2(StreamingQuery streamingQuery) {
                return streamingQuery.exception().nonEmpty();
            }

            public static final /* synthetic */ boolean $anonfun$completed$1(StreamingQuery streamingQuery) {
                return !streamingQuery.isActive();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.system$1 = apply;
                this.streamletQueryExecution$1 = buildStreamingQueries;
                this.readyFuture = this.cloudflow$spark$SparkStreamlet$$readyPromise().future();
                this.done = apply.scheduler().schedule(seconds, seconds2, () -> {
                    Vector vector = (Vector) this.streamletQueryExecution$1.queries().filter(streamingQuery -> {
                        return BoxesRunTime.boxToBoolean($anonfun$done$2(streamingQuery));
                    });
                    if (vector.nonEmpty()) {
                        this.system$1.log().error(new StringBuilder(42).append("Queries failed. Stopping Process. Reason: ").append(((TraversableOnce) vector.map(streamingQuery2 -> {
                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(streamingQuery2.name()), streamingQuery2.status().message());
                        }, Vector$.MODULE$.canBuildFrom())).mkString(", ")).toString());
                        this.streamletQueryExecution$1.stop();
                    }
                }, ExecutionContext$Implicits$.MODULE$.global());
            }
        };
    }

    default void logStartRunnerMessage(String str) {
        log().info(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(60).append("\n      |Initializing Spark Runner ..\n      |\n").append(BootstrapInfo$.MODULE$.box("Build Info")).append("\n      |").append(str).append("\n      ").toString())).stripMargin());
    }

    default String configuredValue(SparkStreamletContext sparkStreamletContext, String str) {
        return sparkStreamletContext.streamletConfig().getString(str);
    }

    private default SparkConf makeSparkConfig() {
        SparkConf sparkConf = new SparkConf();
        return sparkConf.setMaster((String) sparkConf.getOption("spark.master").getOrElse(() -> {
            return "local[2]";
        })).set("spark.sql.shuffle.partitions", "20").set("spark.sql.codegen.wholeStage", "false").set("spark.shuffle.compress", "false");
    }

    private default Try<SparkSession> makeSparkSession(SparkConf sparkConf) {
        return Try$.MODULE$.apply(() -> {
            SparkSession orCreate = SparkSession$.MODULE$.builder().appName(this.applicationName()).config(sparkConf).getOrCreate();
            orCreate.sparkContext().setLogLevel("WARN");
            return orCreate;
        });
    }

    String applicationName();

    static void $init$(SparkStreamlet sparkStreamlet) {
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$runtime_$eq(SparkStreamletRuntime$.MODULE$);
        sparkStreamlet.cloudflow$spark$SparkStreamlet$$ctx_$eq(null);
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$readyPromise_$eq(Promise$.MODULE$.apply());
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionPromise_$eq(Promise$.MODULE$.apply());
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionFuture_$eq(sparkStreamlet.cloudflow$spark$SparkStreamlet$$completionPromise().future());
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$applicationName_$eq("cloudflow-runner-spark");
    }
}
