package cloudflow.spark;

import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.Cancellable;
import cloudflow.spark.kafka.SparkStreamletContextImpl;
import cloudflow.streamlets.BootstrapInfo$;
import cloudflow.streamlets.Dun;
import cloudflow.streamlets.Dun$;
import cloudflow.streamlets.ExceptionAcc;
import cloudflow.streamlets.StreamletContextException;
import cloudflow.streamlets.StreamletDefinition$;
import cloudflow.streamlets.StreamletExecution;
import com.typesafe.config.Config;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.slf4j.Logger;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: SparkStreamlet.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=da\u0002\u000b\u0016!\u0003\r\tA\u0007\u0005\u0006K\u0001!\tA\n\u0005\b[\u0001\u0011\r\u0011\"\u0012/\u0011\u001d\u0011\u0004A1A\u0005\nMBq!\u0010\u0001C\u0002\u0013%1\u0007C\u0004?\u0001\t\u0007I\u0011B \t\u000b\r\u0003AQ\u000b#\t\rA\u0003A\u0011A\fR\r\u0011)\u0006A\u0001,\t\u000biCA\u0011A.\t\u000by\u0003a\u0011C0\t\u000b\r\u0004AQ\t3\t\u000b%\u0004A\u0011\t6\t\u000ba\u0004AQA=\t\u000by\u0004A\u0011B@\t\u000f\u0005M\u0001\u0001\"\u0003\u0002\u0016!I\u00111\u0007\u0001C\u0002\u0013\u0005\u0011Q\u0007\u0005\u000f\u0003\u000b\u0002\u0001\u0013aA\u0001\u0002\u0013%\u0011qIA'\u00119\t\t\u0006\u0001I\u0001\u0004\u0003\u0005I\u0011BA*\u0003/Ba\"a\u0017\u0001!\u0003\r\t\u0011!C\u0005\u0003;\nYG\u0001\bTa\u0006\u00148n\u0015;sK\u0006lG.\u001a;\u000b\u0005Y9\u0012!B:qCJ\\'\"\u0001\r\u0002\u0013\rdw.\u001e3gY><8\u0001A\n\u0003\u0001m\u00012\u0001H\u0010\"\u001b\u0005i\"B\u0001\u0010\u0018\u0003)\u0019HO]3b[2,Go]\u0005\u0003Au\u0011\u0011b\u0015;sK\u0006lG.\u001a;\u0011\u0005\t\u001aS\"A\u000b\n\u0005\u0011*\"!F*qCJ\\7\u000b\u001e:fC6dW\r^\"p]R,\u0007\u0010^\u0001\u0007I%t\u0017\u000e\u001e\u0013\u0015\u0003\u001d\u0002\"\u0001K\u0016\u000e\u0003%R\u0011AK\u0001\u0006g\u000e\fG.Y\u0005\u0003Y%\u0012A!\u00168ji\u00069!/\u001e8uS6,W#A\u0018\u000f\u0005\t\u0002\u0014BA\u0019\u0016\u0003U\u0019\u0006/\u0019:l'R\u0014X-Y7mKR\u0014VO\u001c;j[\u0016\fAB]3bIf\u0004&o\\7jg\u0016,\u0012\u0001\u000e\t\u0004kaRT\"\u0001\u001c\u000b\u0005]J\u0013AC2p]\u000e,(O]3oi&\u0011\u0011H\u000e\u0002\b!J|W.[:f!\ta2(\u0003\u0002=;\t\u0019A)\u001e8\u0002#\r|W\u000e\u001d7fi&|g\u000e\u0015:p[&\u001cX-\u0001\td_6\u0004H.\u001a;j_:4U\u000f^;sKV\t\u0001\tE\u00026\u0003jJ!A\u0011\u001c\u0003\r\u0019+H/\u001e:f\u00035\u0019'/Z1uK\u000e{g\u000e^3yiR\u0011\u0011%\u0012\u0005\u0006\r\u001a\u0001\raR\u0001\u0007G>tg-[4\u0011\u0005!sU\"A%\u000b\u0005\u0019S%BA&M\u0003!!\u0018\u0010]3tC\u001a,'\"A'\u0002\u0007\r|W.\u0003\u0002P\u0013\n11i\u001c8gS\u001e\f!b]3u\u0007>tG/
\u001a=u)\t\u00116\u000b\u0005\u0002#\u0001!)Ak\u0002a\u0001C\u0005\u00012\u000f\u001e:fC6dW\r^\"p]R,\u0007\u0010\u001e\u0002\u001f'B\f'o[*ue\u0016\fW\u000e\\3u\u0007>tG/\u001a=u\u000bb\u001cW\r\u001d;j_:\u001c\"\u0001C,\u0011\u0005qA\u0016BA-\u001e\u0005e\u0019FO]3b[2,GoQ8oi\u0016DH/\u0012=dKB$\u0018n\u001c8\u0002\rqJg.\u001b;?)\u0005a\u0006CA/\t\u001b\u0005\u0001\u0011aC2sK\u0006$X\rT8hS\u000e$\u0012\u0001\u0019\t\u0003E\u0005L!AY\u000b\u0003'M\u0003\u0018M]6TiJ,\u0017-\u001c7fi2{w-[2\u0002\u0007I,h\u000e\u0006\u0002fQB\u0011ADZ\u0005\u0003Ov\u0011!c\u0015;sK\u0006lG.\u001a;Fq\u0016\u001cW\u000f^5p]\")ai\u0003a\u0001\u000f\u0006)Bn\\4Ti\u0006\u0014HOU;o]\u0016\u0014X*Z:tC\u001e,GCA\u0014l\u0011\u0015aG\u00021\u0001n\u0003%\u0011W/\u001b7e\u0013:4w\u000e\u0005\u0002ok:\u0011qn\u001d\t\u0003a&j\u0011!\u001d\u0006\u0003ef\ta\u0001\u0010:p_Rt\u0014B\u0001;*\u0003\u0019\u0001&/\u001a3fM&\u0011ao\u001e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005QL\u0013aD2p]\u001aLw-\u001e:fIZ\u000bG.^3\u0015\u00075TH\u0010C\u0003|\u001b\u0001\u0007\u0011%A\u0004d_:$X\r\u001f;\t\u000bul\u0001\u0019A7\u0002\u0013\r|gNZ5h\u0017\u0016L\u0018aD7bW\u0016\u001c\u0006/\u0019:l\u0007>tg-[4\u0015\u0005\u0005\u0005\u0001\u0003BA\u0002\u0003\u001fi!!!\u0002\u000b\u0007Y\t9A\u0003\u0003\u0002\n\u0005-\u0011AB1qC\u000eDWM\u0003\u0002\u0002\u000e\u0005\u0019qN]4\n\t\u0005E\u0011Q\u0001\u0002\n'B\f'o[\"p]\u001a\f\u0001#\\1lKN\u0003\u0018M]6TKN\u001c\u0018n\u001c8\u0015\t\u0005]\u0011q\u0006\t\u0007\u00033\ty\"a\t\u000e\u0005\u0005m!bAA\u000fS\u0005!Q\u000f^5m\u0013\u0011\t\t#a\u0007\u0003\u0007Q\u0013\u0018\u0010\u0005\u0003\u0002&\u0005-RBAA\u0014\u0015\u0011\tI#!\u0002\u0002\u0007M\fH.\u0003\u0003\u0002.\u0005\u001d\"\u0001D*qCJ\\7+Z:tS>t\u0007bBA\u0019\u001f\u0001\u0007\u0011\u0011A\u0001\fgB\f'o[\"p]\u001aLw-A\bbaBd\u0017nY1uS>tg*Y7f+\t\t9\u0004\u0005\u0003\u0002:\u0005\rSBAA\u001e\u0015\u0011\ti$a\u0010\u0002\t1\fgn\u001a\u0006\u0003\u0003\u0003\nAA[1wC&\u0019a/a\u000f\u0002\u001bM,\b/\u001a:%GRDx\
fJ3r)\r9\u0013\u0011\n\u0005\t\u0003\u0017\n\u0012\u0011!a\u0001C\u0005\u0019\u0001\u0010J\u0019\n\u0007\u0005=s$A\u0004dib|F%Z9\u00021M,\b/\u001a:%O\u0016$xJ]\"sK\u0006$XmQ8oi\u0016DH\u000fF\u0002\"\u0003+BQA\u0012\nA\u0002\u001dK1!!\u0017 \u0003I9W\r^(s\u0007J,\u0017\r^3D_:$X\r\u001f;\u0002\u0013M,\b/\u001a:%Y><WCAA0!\u0011\t\t'a\u001a\u000e\u0005\u0005\r$\u0002BA3\u0003\u0017\tQa\u001d7gi)LA!!\u001b\u0002d\t1Aj\\4hKJL1!!\u001c \u0003\rawn\u001a")
/* loaded from: input_file:cloudflow/spark/SparkStreamlet.class */
public interface SparkStreamlet {

    /* compiled from: SparkStreamlet.scala */
    /* loaded from: input_file:cloudflow/spark/SparkStreamlet$SparkStreamletContextException.class */
    /**
     * Exception signalling that the {@link SparkStreamletContext} was accessed
     * outside of the streamlet logic.
     *
     * <p>The constructor takes the owning {@link SparkStreamlet} but does not use
     * it; the exception always carries the same fixed message.
     */
    public final class SparkStreamletContextException extends StreamletContextException {
        /** Fixed message passed to the superclass constructor. */
        private static final String MESSAGE = "The SparkStreamletContext can only be accessed from within the streamlet logic.";

        /**
         * @param sparkStreamlet the enclosing streamlet; not read by this constructor
         */
        public SparkStreamletContextException(SparkStreamlet sparkStreamlet) {
            super(MESSAGE);
        }
    }

    /**
     * Generated setter for the value returned by {@link #runtime()}.
     * Within this file it is only called from {@code $init$}, which passes
     * {@code SparkStreamletRuntime$.MODULE$}.
     */
    void cloudflow$spark$SparkStreamlet$_setter_$runtime_$eq(SparkStreamletRuntime$ sparkStreamletRuntime$);

    /**
     * Generated setter for the "ready" promise. {@code $init$} passes a freshly
     * created {@code Promise}.
     */
    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$readyPromise_$eq(Promise<Dun> promise);

    /**
     * Generated setter for the "completion" promise. {@code $init$} passes a
     * freshly created {@code Promise}.
     */
    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionPromise_$eq(Promise<Dun> promise);

    /**
     * Generated setter for the "completion" future. {@code $init$} passes the
     * {@code future()} of the completion promise.
     */
    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionFuture_$eq(Future<Dun> future);

    /**
     * Generated setter for the value returned by {@link #applicationName()}.
     * {@code $init$} passes {@code "cloudflow-runner-spark"}.
     */
    void cloudflow$spark$SparkStreamlet$_setter_$applicationName_$eq(String str);

    /**
     * Synthetic bridge used by {@code setContext} to store the given context.
     * NOTE(review): presumably forwards to the parent type's {@code ctx} setter;
     * the implementation is not visible here.
     */
    /* synthetic */ void cloudflow$spark$SparkStreamlet$$super$ctx_$eq(SparkStreamletContext sparkStreamletContext);

    /**
     * Synthetic bridge used by {@code run} to obtain the streamlet context for
     * the given configuration.
     * NOTE(review): presumably forwards to the parent type's
     * {@code getOrCreateContext}; the implementation is not visible here.
     */
    /* synthetic */ SparkStreamletContext cloudflow$spark$SparkStreamlet$$super$getOrCreateContext(Config config);

    /**
     * Synthetic bridge used by {@code logStartRunnerMessage} to obtain the
     * SLF4J logger it writes to.
     */
    /* synthetic */ Logger cloudflow$spark$SparkStreamlet$$super$log();

    /** Returns the runtime marker object; {@code $init$} sets it to {@code SparkStreamletRuntime$.MODULE$}. */
    SparkStreamletRuntime$ runtime();

    /**
     * Accessor for the "ready" promise. {@code run} completes it with
     * {@code Dun$.MODULE$}, and its future is what the returned execution's
     * {@code ready()} exposes.
     */
    Promise<Dun> cloudflow$spark$SparkStreamlet$$readyPromise();

    /**
     * Accessor for the "completion" promise. The execution returned by
     * {@code run} completes it (success or failure) in {@code completed()} and
     * {@code stop()}.
     */
    Promise<Dun> cloudflow$spark$SparkStreamlet$$completionPromise();

    /**
     * Accessor for the "completion" future, returned by the execution's
     * {@code completed()} and {@code stop()} methods.
     */
    Future<Dun> cloudflow$spark$SparkStreamlet$$completionFuture();

    /**
     * Builds the Spark streamlet context for the given configuration.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>read the streamlet definition from {@code config};</li>
     *   <li>create a Spark session from {@code makeSparkConfig()} via
     *       {@code makeSparkSession(...)};</li>
     *   <li>wrap both in a {@code SparkStreamletContextImpl} whose configuration is
     *       the definition's config with {@code config} as fallback.</li>
     * </ol>
     * A failure in any step is passed to the {@code recoverWith} handler
     * ({@code SparkStreamlet$$anonfun$createContext$3}, defined outside this view).
     * The final {@code get()} unwraps the result; for a {@code scala.util.Try}
     * that is still a failure this rethrows the underlying exception.
     *
     * @param config configuration to read the streamlet definition from and to
     *               use as fallback for the context configuration
     * @return the newly created context
     */
    default SparkStreamletContext createContext(Config config) {
        return (SparkStreamletContext) StreamletDefinition$.MODULE$
                .read(config, StreamletDefinition$.MODULE$.read$default$2())
                .flatMap(definition -> this.makeSparkSession(this.makeSparkConfig())
                        .map(session -> new SparkStreamletContextImpl(definition, session, definition.config().withFallback(config))))
                .recoverWith(new SparkStreamlet$$anonfun$createContext$3(null, config))
                .get();
    }

    /**
     * Stores the given context on this streamlet through the synthetic
     * {@code super$ctx_$eq} bridge and returns this streamlet.
     *
     * @param sparkStreamletContext the context to install
     * @return this streamlet
     */
    default SparkStreamlet setContext(SparkStreamletContext sparkStreamletContext) {
        final SparkStreamlet self = this;
        self.cloudflow$spark$SparkStreamlet$$super$ctx_$eq(sparkStreamletContext);
        return self;
    }

    /**
     * Supplies the streamlet logic. {@code run} calls this once and then invokes
     * {@code buildStreamingQueries()} on the returned object.
     *
     * @return the logic whose streaming queries {@code run} will supervise
     */
    SparkStreamletLogic createLogic();

    /**
     * Starts the streamlet and returns a handle for observing and stopping it.
     *
     * <p>Sequence visible in this method:
     * <ol>
     *   <li>obtain the context through the synthetic {@code getOrCreateContext} bridge;</li>
     *   <li>create an {@code ActorSystem} named {@code "spark_streamlet"} using the
     *       context's config;</li>
     *   <li>complete the "ready" promise (this happens before the queries are built);</li>
     *   <li>build the streaming queries via {@code createLogic().buildStreamingQueries()};</li>
     *   <li>return an anonymous {@code StreamletExecution} that schedules a periodic
     *       check for failed queries.</li>
     * </ol>
     *
     * <p>NOTE(review): in the code shown here, {@code stop()} neither cancels the
     * scheduled check ({@code done}) nor terminates the {@code ActorSystem};
     * confirm whether that is handled elsewhere.
     *
     * @param config configuration handed to the {@code getOrCreateContext} bridge
     * @return the execution handle exposing {@code ready()}, {@code completed()} and {@code stop()}
     */
    default StreamletExecution run(Config config) {
        SparkStreamletContext cloudflow$spark$SparkStreamlet$$super$getOrCreateContext = cloudflow$spark$SparkStreamlet$$super$getOrCreateContext(config);
        // 2 s and 5 s: passed below as the first and second arguments of
        // scheduler().schedule(...) (initial delay and repeat interval per the Akka Scheduler API).
        final FiniteDuration seconds = new package.DurationInt(package$.MODULE$.DurationInt(2)).seconds();
        final FiniteDuration seconds2 = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        final ActorSystem apply = ActorSystem$.MODULE$.apply("spark_streamlet", cloudflow$spark$SparkStreamlet$$super$getOrCreateContext.config());
        // The ready promise is completed before the streaming queries are built.
        cloudflow$spark$SparkStreamlet$$readyPromise().trySuccess(Dun$.MODULE$);
        final StreamletQueryExecution buildStreamingQueries = createLogic().buildStreamingQueries();
        return new StreamletExecution(this, apply, seconds, seconds2, buildStreamingQueries) { // from class: cloudflow.spark.SparkStreamlet$$anon$1
            // Future of the streamlet's ready promise; returned by ready().
            private final Future<Dun> readyFuture;
            // Handle of the periodic failed-query check; cancelled in completed().
            private final Cancellable done;
            private final /* synthetic */ SparkStreamlet $outer;
            private final ActorSystem system$1;
            private final StreamletQueryExecution streamletQueryExecution$1;

            private Future<Dun> readyFuture() {
                return this.readyFuture;
            }

            private Cancellable done() {
                return this.done;
            }

            /**
             * Returns the streamlet's completion future. If every query is inactive
             * at the time of the call, it first cancels the periodic check and
             * completes the completion promise: with an {@code ExceptionAcc} of the
             * queries' exception causes if there are any, otherwise with {@code Dun}.
             * If any query is still active the promise is left untouched.
             */
            public Future<Dun> completed() {
                if (this.streamletQueryExecution$1.queries().forall(streamingQuery -> {
                    return BoxesRunTime.boxToBoolean($anonfun$completed$1(streamingQuery));
                })) {
                    done().cancel();
                    // Collect the cause of each query's exception, where one is present.
                    Vector vector = (Vector) this.streamletQueryExecution$1.queries().flatMap(streamingQuery2 -> {
                        return streamingQuery2.exception().map(streamingQueryException -> {
                            return streamingQueryException.cause();
                        }).toList();
                    }, Vector$.MODULE$.canBuildFrom());
                    if (vector.nonEmpty()) {
                        this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().tryFailure(new ExceptionAcc(vector));
                    } else {
                        this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().trySuccess(Dun$.MODULE$);
                    }
                }
                return this.$outer.cloudflow$spark$SparkStreamlet$$completionFuture();
            }

            /** Returns the future of the streamlet's ready promise. */
            public Future<Dun> ready() {
                return readyFuture();
            }

            /**
             * Stops the query execution, tries to complete the completion promise
             * successfully, and returns the completion future.
             */
            public Future<Dun> stop() {
                this.streamletQueryExecution$1.stop();
                this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().trySuccess(Dun$.MODULE$);
                return this.$outer.cloudflow$spark$SparkStreamlet$$completionFuture();
            }

            // Predicate used by the periodic check: true when the query reports an exception.
            public static final /* synthetic */ boolean $anonfun$done$2(StreamingQuery streamingQuery) {
                return streamingQuery.exception().nonEmpty();
            }

            // Predicate used by completed(): true when the query is no longer active.
            public static final /* synthetic */ boolean $anonfun$completed$1(StreamingQuery streamingQuery) {
                return !streamingQuery.isActive();
            }

            {
                // NOTE(review): the `this == null` check and `this.$outer = this` below look like
                // decompiler renderings of the captured outer-instance check/assignment; the same
                // applies to `this.cloudflow$spark$SparkStreamlet$$readyPromise()`. Confirm against bytecode.
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.system$1 = apply;
                this.streamletQueryExecution$1 = buildStreamingQueries;
                this.readyFuture = this.cloudflow$spark$SparkStreamlet$$readyPromise().future();
                // Periodic check on the global execution context: when any query reports an
                // exception, log the (name -> status message) pairs and stop the query execution.
                this.done = apply.scheduler().schedule(seconds, seconds2, () -> {
                    Vector vector = (Vector) this.streamletQueryExecution$1.queries().filter(streamingQuery -> {
                        return BoxesRunTime.boxToBoolean($anonfun$done$2(streamingQuery));
                    });
                    if (vector.nonEmpty()) {
                        this.system$1.log().error(new StringBuilder(42).append("Queries failed. Stopping Process. Reason: ").append(((TraversableOnce) vector.map(streamingQuery2 -> {
                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(streamingQuery2.name()), streamingQuery2.status().message());
                        }, Vector$.MODULE$.canBuildFrom())).mkString(", ")).toString());
                        this.streamletQueryExecution$1.stop();
                    }
                }, ExecutionContext$Implicits$.MODULE$.global());
            }
        };
    }

    /**
     * Logs the runner start-up banner at INFO level.
     *
     * <p>The banner consists of an "Initializing Spark Runner" line, the boxed
     * "Build Info" heading from {@code BootstrapInfo}, and the supplied build
     * information. It is written with {@code |} margin markers and passed through
     * Scala's {@code stripMargin} before logging.
     *
     * @param str build information text to include in the banner
     */
    default void logStartRunnerMessage(String str) {
        // Fetch the logger first, as the original expression does, so evaluation order is unchanged.
        final Logger logger = cloudflow$spark$SparkStreamlet$$super$log();

        final StringBuilder banner = new StringBuilder(60);
        banner.append("\n      |Initializing Spark Runner ..\n      |\n");
        banner.append(BootstrapInfo$.MODULE$.box("Build Info"));
        banner.append("\n      |");
        banner.append(str);
        banner.append("\n      ");

        final String withMargins = banner.toString();
        final String message = new StringOps(Predef$.MODULE$.augmentString(withMargins)).stripMargin();
        logger.info(message);
    }

    /**
     * Looks up a string setting in the streamlet configuration of the given context.
     *
     * @param sparkStreamletContext context whose {@code streamletConfig()} is queried
     * @param str                   configuration key to read
     * @return the string returned by {@code getString} for that key
     */
    default String configuredValue(SparkStreamletContext sparkStreamletContext, String str) {
        final Config streamletConfig = sparkStreamletContext.streamletConfig();
        final String value = streamletConfig.getString(str);
        return value;
    }

    /**
     * Creates the {@code SparkConf} used for the streamlet's Spark session.
     *
     * <p>The master is taken from an existing {@code spark.master} setting, or
     * {@code "local[2]"} when none is set. Three settings are then applied:
     * {@code spark.sql.shuffle.partitions=20},
     * {@code spark.sql.codegen.wholeStage=false} and
     * {@code spark.shuffle.compress=false}.
     *
     * @return the configured {@code SparkConf}
     */
    private default SparkConf makeSparkConfig() {
        final SparkConf base = new SparkConf();
        final String master = (String) base.getOption("spark.master").getOrElse(() -> "local[2]");

        final SparkConf withMaster = base.setMaster(master);
        final SparkConf withPartitions = withMaster.set("spark.sql.shuffle.partitions", "20");
        final SparkConf withoutCodegen = withPartitions.set("spark.sql.codegen.wholeStage", "false");
        return withoutCodegen.set("spark.shuffle.compress", "false");
    }

    /**
     * Attempts to get or create a {@code SparkSession} named after
     * {@link #applicationName()} with the given configuration, and sets the
     * Spark context's log level to {@code "WARN"}.
     *
     * <p>The work runs inside {@code Try.apply}, so the outcome is returned as a
     * {@code Try} instead of being executed directly in this method.
     *
     * @param sparkConf configuration to build the session with
     * @return a {@code Try} wrapping the session
     */
    private default Try<SparkSession> makeSparkSession(SparkConf sparkConf) {
        return Try$.MODULE$.apply(() -> {
            final SparkSession session = SparkSession$.MODULE$
                    .builder()
                    .appName(this.applicationName())
                    .config(sparkConf)
                    .getOrCreate();
            session.sparkContext().setLogLevel("WARN");
            return session;
        });
    }

    /**
     * Returns the application name used as the Spark session's {@code appName}
     * in {@code makeSparkSession}. {@code $init$} sets it to
     * {@code "cloudflow-runner-spark"} through the generated setter.
     */
    String applicationName();

    /**
     * Initialises the generated fields of a {@code SparkStreamlet} instance.
     *
     * <p>In order: sets the runtime marker, installs a new "ready" promise,
     * installs a new "completion" promise, derives the completion future from
     * the promise read back from the instance, and sets the application name to
     * {@code "cloudflow-runner-spark"}.
     *
     * @param sparkStreamlet the instance being initialised
     */
    static void $init$(SparkStreamlet sparkStreamlet) {
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$runtime_$eq(SparkStreamletRuntime$.MODULE$);

        final Promise<Dun> ready = Promise$.MODULE$.apply();
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$readyPromise_$eq(ready);

        final Promise<Dun> completion = Promise$.MODULE$.apply();
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionPromise_$eq(completion);

        // The future is taken from the promise as read back through the accessor, not from the local.
        final Future<Dun> completionFuture = sparkStreamlet.cloudflow$spark$SparkStreamlet$$completionPromise().future();
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionFuture_$eq(completionFuture);

        final String defaultApplicationName = "cloudflow-runner-spark";
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$applicationName_$eq(defaultApplicationName);
    }
}
