package cloudflow.spark;

import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.Cancellable;
import cloudflow.spark.kafka.SparkStreamletContextImpl;
import cloudflow.streamlets.BootstrapInfo$;
import cloudflow.streamlets.Dun;
import cloudflow.streamlets.Dun$;
import cloudflow.streamlets.ExceptionAcc;
import cloudflow.streamlets.StreamletDefinition$;
import cloudflow.streamlets.StreamletExecution;
import com.typesafe.config.Config;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.streaming.StreamingQuery;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: SparkStreamlet.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015eaB\u000b\u0017!\u0003\r\ta\u0007\u0005\u0006Q\u0001!\t!\u000b\u0005\t[\u0001A)\u0019!C\u0001]!9Q\b\u0001b\u0001\n\u000br\u0004bB\"\u0001\u0001\u0004%I\u0001\u0012\u0005\b\u0019\u0002\u0001\r\u0011\"\u0003N\u0011\u001d\u0001\u0006A1A\u0005\nECqa\u0017\u0001C\u0002\u0013%\u0011\u000bC\u0004]\u0001\t\u0007I\u0011B/\t\u000b\u0005\u0004AQ\u000b2\t\r9\u0004A\u0011\u0001\rp\u0011\u0015\u0019\b\u0001b\u0006E\r\u0011!\bAA;\t\u000f\u0005\u0015A\u0002\"\u0001\u0002\b!9\u0011Q\u0002\u0001\u0007\u0012\u0005=\u0001bBA\f\u0001\u0011\u0015\u0013\u0011\u0004\u0005\b\u0003G\u0001A\u0011IA\u0013\u0011\u001d\tY\u0004\u0001C\u0003\u0003{Aq!!\u0012\u0001\t\u0013\t9\u0005C\u0004\u0002T\u0001!I!!\u0016\t\u0013\u0005M\u0004A1A\u0005\u0002\u0005U$AD*qCJ\\7\u000b\u001e:fC6dW\r\u001e\u0006\u0003/a\tQa\u001d9be.T\u0011!G\u0001\nG2|W\u000f\u001a4m_^\u001c\u0001aE\u0002\u00019\t\u0002\"!\b\u0011\u000e\u0003yQ!a\b\r\u0002\u0015M$(/Z1nY\u0016$8/\u0003\u0002\"=\tI1\u000b\u001e:fC6dW\r\u001e\t\u0003G\u0019j\u0011\u0001\n\u0006\u0002K\u0005)1oY1mC&\u0011q\u0005\n\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.Z\u0001\u0007I%t\u0017\u000e\u001e\u0013\u0015\u0003)\u0002\"aI\u0016\n\u00051\"#\u0001B+oSR\f1\u0001\\8h+\u0005y\u0003C\u0001\u00198\u001b\u0005\t$B\u0001\u001a4\u0003\u0015awn\u001a\u001bk\u0015\t!T'\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002m\u0005\u0019qN]4\n\u0005a\n$A\u0002'pO\u001e,'\u000f\u000b\u0002\u0003uA\u00111eO\u0005\u0003y\u0011\u0012\u0011\u0002\u001e:b]NLWM\u001c;\u0002\u000fI,h\u000e^5nKV\tqH\u0004\u0002A\u00036\ta#\u0003\u0002C-\u0005)2\u000b]1sWN#(/Z1nY\u0016$(+\u001e8uS6,\u0017aA2uqV\tQ\t\u0005\u0002A\r&\u0011qI\u0006\u0002\u0016'B\f'o[*ue\u0016\fW\u000e\\3u\u0007>tG/\u001a=uQ\t!\u0011\n\u0005\u0002$\u0015&\u00111\n\n\u0002\tm>d\u0017\r^5mK\u000691\r\u001e=`I\u0015\fHC\u0001\u0016O\u0011\u001dyU!!AA\u0002\u0015\u000b1\u0001\u001f\u00132\u00031\u0011X-\u00193z!J|W.[:f+\u0005\u0011\u0006cA*W16\tAK\u0003\u0002VI\u0005Q
1m\u001c8dkJ\u0014XM\u001c;\n\u0005]#&a\u0002)s_6L7/\u001a\t\u0003;eK!A\u0017\u0010\u0003\u0007\u0011+h.A\td_6\u0004H.\u001a;j_:\u0004&o\\7jg\u0016\f\u0001cY8na2,G/[8o\rV$XO]3\u0016\u0003y\u00032aU0Y\u0013\t\u0001GK\u0001\u0004GkR,(/Z\u0001\u000eGJ,\u0017\r^3D_:$X\r\u001f;\u0015\u0005\u0015\u001b\u0007\"\u00023\n\u0001\u0004)\u0017AB2p]\u001aLw\r\u0005\u0002gY6\tqM\u0003\u0002eQ*\u0011\u0011N[\u0001\tif\u0004Xm]1gK*\t1.A\u0002d_6L!!\\4\u0003\r\r{gNZ5h\u0003)\u0019X\r^\"p]R,\u0007\u0010\u001e\u000b\u0003aF\u0004\"\u0001\u0011\u0001\t\u000bIT\u0001\u0019A#\u0002!M$(/Z1nY\u0016$8i\u001c8uKb$\u0018aB2p]R,\u0007\u0010\u001e\u0002\u001f'B\f'o[*ue\u0016\fW\u000e\\3u\u0007>tG/\u001a=u\u000bb\u001cW\r\u001d;j_:\u001c\"\u0001\u0004<\u0011\u0005]|hB\u0001=~\u001d\tIH0D\u0001{\u0015\tY($\u0001\u0004=e>|GOP\u0005\u0002K%\u0011a\u0010J\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\t!a\u0001\u0003\u0013\u0015C8-\u001a9uS>t'B\u0001@%\u0003\u0019a\u0014N\\5u}Q\u0011\u0011\u0011\u0002\t\u0004\u0003\u0017aQ\"\u0001\u0001\u0002\u0017\r\u0014X-\u0019;f\u0019><\u0017n\u0019\u000b\u0003\u0003#\u00012\u0001QA\n\u0013\r\t)B\u0006\u0002\u0014'B\f'o[*ue\u0016\fW\u000e\\3u\u0019><\u0017nY\u0001\u0004eVtG\u0003BA\u000e\u0003C\u00012!HA\u000f\u0013\r\tyB\b\u0002\u0013'R\u0014X-Y7mKR,\u00050Z2vi&|g\u000eC\u0003e\u001f\u0001\u0007Q-A\u000bm_\u001e\u001cF/\u0019:u%Vtg.\u001a:NKN\u001c\u0018mZ3\u0015\u0007)\n9\u0003C\u0004\u0002*A\u0001\r!a\u000b\u0002\u0013\t,\u0018\u000e\u001c3J]\u001a|\u0007\u0003BA\u0017\u0003kqA!a\f\u00022A\u0011\u0011\u0010J\u0005\u0004\u0003g!\u0013A\u0002)sK\u0012,g-\u0003\u0003\u00028\u0005e\"AB*ue&twMC\u0002\u00024\u0011\nqbY8oM&<WO]3e-\u0006dW/\u001a\u000b\u0007\u0003W\ty$!\u0011\t\u000bM\f\u0002\u0019A#\t\u000f\u0005\r\u0013\u00031\u0001\u0002,\u0005I1m\u001c8gS\u001e\\U-_\u0001\u0010[\u0006\\Wm\u00159be.\u001cuN\u001c4jOR\u0011\u0011\u0011\n\t\u0005\u0003\u0017\ny%\u0004\u0002\u0002N)\u0011qcM\u0005\u0005\u0003#\niEA\u0005Ta\u0006\u00148nQ8oM\u0006\u0001R.Y6f'B\f'o[*fgNLwN\u0
01c\u000b\u0005\u0003/\ny\u0007\u0005\u0004\u0002Z\u0005}\u00131M\u0007\u0003\u00037R1!!\u0018%\u0003\u0011)H/\u001b7\n\t\u0005\u0005\u00141\f\u0002\u0004)JL\b\u0003BA3\u0003Wj!!a\u001a\u000b\t\u0005%\u0014QJ\u0001\u0004gFd\u0017\u0002BA7\u0003O\u0012Ab\u00159be.\u001cVm]:j_:Dq!!\u001d\u0014\u0001\u0004\tI%A\u0006ta\u0006\u00148nQ8oM&<\u0017aD1qa2L7-\u0019;j_:t\u0015-\\3\u0016\u0005\u0005]\u0004\u0003BA=\u0003\u0007k!!a\u001f\u000b\t\u0005u\u0014qP\u0001\u0005Y\u0006twM\u0003\u0002\u0002\u0002\u0006!!.\u0019<b\u0013\u0011\t9$a\u001f")
/* loaded from: input_file:cloudflow/spark/SparkStreamlet.class */
public interface SparkStreamlet extends Serializable {

    /* compiled from: SparkStreamlet.scala */
    /* loaded from: input_file:cloudflow/spark/SparkStreamlet$SparkStreamletContextException.class */
    /**
     * Raised by {@link SparkStreamlet#context()} when no {@code SparkStreamletContext}
     * has been set on the streamlet yet.
     */
    public final class SparkStreamletContextException extends Exception {
        /** Fixed detail message carried by every instance. */
        private static final String MESSAGE = "Can only access the SparkStreamletContext within the run() scope";

        /**
         * @param sparkStreamlet the streamlet raising the exception; accepted by the
         *                       constructor but not otherwise used
         */
        public SparkStreamletContextException(SparkStreamlet sparkStreamlet) {
            super(MESSAGE);
        }
    }

    // ---------------------------------------------------------------------
    // Synthetic one-shot setters. Each is invoked from $init$(SparkStreamlet)
    // to install the initial value of the corresponding member; implementing
    // classes provide the storage.
    // ---------------------------------------------------------------------

    /** Stores the runtime marker; {@code $init$} passes {@code SparkStreamletRuntime$.MODULE$}. */
    void cloudflow$spark$SparkStreamlet$_setter_$runtime_$eq(SparkStreamletRuntime$ sparkStreamletRuntime$);

    /** Stores the "ready" promise; {@code $init$} passes a freshly created promise. */
    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$readyPromise_$eq(Promise<Dun> promise);

    /** Stores the "completion" promise; {@code $init$} passes a freshly created promise. */
    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionPromise_$eq(Promise<Dun> promise);

    /** Stores the completion future; {@code $init$} passes the future of the completion promise. */
    void cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionFuture_$eq(Future<Dun> future);

    /** Stores the application name; {@code $init$} passes {@code "cloudflow-runner-spark"}. */
    void cloudflow$spark$SparkStreamlet$_setter_$applicationName_$eq(String str);

    /**
     * Looks up the log4j logger named after the runtime class of this streamlet.
     *
     * @return the logger obtained from {@code Logger.getLogger} for this instance's class name
     */
    default Logger log() {
        final String loggerName = getClass().getName();
        return Logger.getLogger(loggerName);
    }

    /** Returns the runtime marker; {@code $init$} installs {@code SparkStreamletRuntime$.MODULE$}. */
    SparkStreamletRuntime$ runtime();

    /**
     * Raw accessor for the current streamlet context. {@code $init$} resets it to
     * {@code null}, and both {@link #context()} and {@link #run(Config)} treat
     * {@code null} as "not set".
     */
    SparkStreamletContext cloudflow$spark$SparkStreamlet$$ctx();

    /** Raw mutator for the streamlet context; used by {@link #setContext}, {@link #run(Config)} and {@code $init$}. */
    void cloudflow$spark$SparkStreamlet$$ctx_$eq(SparkStreamletContext sparkStreamletContext);

    /** Promise that {@link #run(Config)} completes with {@code Dun} before building the streaming queries. */
    Promise<Dun> cloudflow$spark$SparkStreamlet$$readyPromise();

    /** Promise completed by the execution returned from {@link #run(Config)} (on stop, or once all queries are inactive). */
    Promise<Dun> cloudflow$spark$SparkStreamlet$$completionPromise();

    /** Future view of the completion promise; {@code $init$} derives it from {@code completionPromise().future()}. */
    Future<Dun> cloudflow$spark$SparkStreamlet$$completionFuture();

    /**
     * Builds a {@code SparkStreamletContext} from the given configuration.
     *
     * <p>The streamlet definition is read from {@code config}, a Spark session is created
     * from {@link #makeSparkConfig()}, and both are wrapped in a
     * {@code SparkStreamletContextImpl} whose configuration is the definition's own config
     * with {@code config} as fallback. Failures are routed through the
     * {@code recoverWith} handler before the final {@code get()}.
     *
     * @param config configuration the streamlet definition is read from
     * @return the newly created context
     */
    default SparkStreamletContext createContext(Config config) {
        // NOTE(review): the chain looks like a scala.util.Try, so get() presumably rethrows
        // whatever failure the recoverWith handler leaves in place — confirm.
        return (SparkStreamletContext) StreamletDefinition$.MODULE$
                .read(config, StreamletDefinition$.MODULE$.read$default$2())
                .flatMap(definition ->
                        makeSparkSession(makeSparkConfig()).map(session ->
                                new SparkStreamletContextImpl(definition, session, definition.config().withFallback(config))))
                .recoverWith(new SparkStreamlet$$anonfun$createContext$3(null, config))
                .get();
    }

    /**
     * Installs the given context on this streamlet. When a context is already present,
     * {@link #run(Config)} uses it instead of creating one.
     *
     * @param sparkStreamletContext the context to store (stored as-is, without validation)
     * @return this streamlet, to allow call chaining
     */
    default SparkStreamlet setContext(SparkStreamletContext sparkStreamletContext) {
        this.cloudflow$spark$SparkStreamlet$$ctx_$eq(sparkStreamletContext);
        return this;
    }

    /**
     * Returns the context currently installed on this streamlet.
     *
     * @return the current, non-null {@code SparkStreamletContext}
     * @throws SparkStreamletContextException if no context has been set
     */
    default SparkStreamletContext context() {
        // Read the accessor exactly once: the backing storage lives in the implementing
        // class and can be reassigned through setContext(), so checking one read and
        // returning a second one could hand back null despite the guard.
        final SparkStreamletContext current = cloudflow$spark$SparkStreamlet$$ctx();
        if (current == null) {
            throw new SparkStreamletContextException(this);
        }
        return current;
    }

    /**
     * Supplies the streamlet's logic. {@link #run(Config)} calls this once and invokes
     * {@code buildStreamingQueries()} on the result.
     */
    SparkStreamletLogic createLogic();

    /**
     * Starts the streamlet and returns a handle to its execution.
     *
     * <p>Steps, in order: create a context from {@code config} if none is set; start an
     * actor system named {@code "spark_streamlet"}; complete the ready promise; build the
     * streaming queries via {@link #createLogic()}; return an anonymous
     * {@code StreamletExecution} that monitors those queries on a schedule.
     *
     * @param config configuration used to create the context when none has been set
     * @return execution handle exposing {@code ready()}, {@code completed()} and {@code stop()}
     */
    default StreamletExecution run(Config config) {
        // Only create a context when none was injected beforehand (e.g. via setContext).
        if (cloudflow$spark$SparkStreamlet$$ctx() == null) {
            cloudflow$spark$SparkStreamlet$$ctx_$eq(createContext(config));
        }
        // Passed below as the first and second duration arguments of scheduler().schedule(...):
        // 2 seconds and 5 seconds (presumably initial delay and repeat interval — confirm against the Akka API).
        final FiniteDuration seconds = new package.DurationInt(package$.MODULE$.DurationInt(2)).seconds();
        final FiniteDuration seconds2 = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        // NOTE(review): nothing in this method or the returned execution terminates this
        // ActorSystem — possible resource leak; confirm whether it is shut down elsewhere.
        final ActorSystem apply = ActorSystem$.MODULE$.apply("spark_streamlet", cloudflow$spark$SparkStreamlet$$ctx().config());
        // Readiness is signalled before the queries are built.
        cloudflow$spark$SparkStreamlet$$readyPromise().trySuccess(Dun$.MODULE$);
        final StreamletQueryExecution buildStreamingQueries = createLogic().buildStreamingQueries();
        // Decompiler rendering of the anonymous StreamletExecution; the constructor
        // arguments shown are the captured variables, not real super-constructor arguments.
        return new StreamletExecution(this, apply, seconds, seconds2, buildStreamingQueries) { // from class: cloudflow.spark.SparkStreamlet$$anon$1
            private final Future<Dun> readyFuture;
            private final Cancellable done;
            private final /* synthetic */ SparkStreamlet $outer;
            private final ActorSystem system$1;
            private final StreamletQueryExecution streamletQueryExecution$1;

            private Future<Dun> readyFuture() {
                return this.readyFuture;
            }

            private Cancellable done() {
                return this.done;
            }

            /**
             * Returns the completion future. As a side effect, when every query is
             * inactive it cancels the monitoring task and completes the completion
             * promise: with an ExceptionAcc of the query exception causes if any query
             * failed, otherwise successfully.
             */
            public Future<Dun> completed() {
                if (this.streamletQueryExecution$1.queries().forall(streamingQuery -> {
                    return BoxesRunTime.boxToBoolean($anonfun$completed$1(streamingQuery));
                })) {
                    done().cancel();
                    // Collect the cause of every query that ended with an exception.
                    Vector vector = (Vector) this.streamletQueryExecution$1.queries().flatMap(streamingQuery2 -> {
                        return streamingQuery2.exception().map(streamingQueryException -> {
                            return streamingQueryException.cause();
                        }).toList();
                    }, Vector$.MODULE$.canBuildFrom());
                    if (vector.nonEmpty()) {
                        this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().tryFailure(new ExceptionAcc(vector));
                    } else {
                        this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().trySuccess(Dun$.MODULE$);
                    }
                }
                return this.$outer.cloudflow$spark$SparkStreamlet$$completionFuture();
            }

            /** Returns the future of the streamlet's ready promise. */
            public Future<Dun> ready() {
                return readyFuture();
            }

            /**
             * Stops the streaming queries and marks completion as successful.
             * NOTE(review): the scheduled monitoring task {@code done} is not cancelled
             * here (only in completed()) — confirm whether that is intended.
             */
            public Future<Dun> stop() {
                this.streamletQueryExecution$1.stop();
                this.$outer.cloudflow$spark$SparkStreamlet$$completionPromise().trySuccess(Dun$.MODULE$);
                return this.$outer.cloudflow$spark$SparkStreamlet$$completionFuture();
            }

            // Predicate used by the monitoring task: the query has recorded an exception.
            public static final /* synthetic */ boolean $anonfun$done$2(StreamingQuery streamingQuery) {
                return streamingQuery.exception().nonEmpty();
            }

            // Predicate used by completed(): the query is no longer active.
            public static final /* synthetic */ boolean $anonfun$completed$1(StreamingQuery streamingQuery) {
                return !streamingQuery.isActive();
            }

            {
                // NOTE(review): decompiler artifact — the null check and the $outer
                // assignment presumably refer to the enclosing SparkStreamlet rather than
                // this anonymous instance; confirm against the original Scala source.
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.system$1 = apply;
                this.streamletQueryExecution$1 = buildStreamingQueries;
                this.readyFuture = this.cloudflow$spark$SparkStreamlet$$readyPromise().future();
                // Recurring monitor: if any query has an exception, log the failed queries
                // (name -> status message) and stop the whole query execution.
                this.done = apply.scheduler().schedule(seconds, seconds2, () -> {
                    Vector vector = (Vector) this.streamletQueryExecution$1.queries().filter(streamingQuery -> {
                        return BoxesRunTime.boxToBoolean($anonfun$done$2(streamingQuery));
                    });
                    if (vector.nonEmpty()) {
                        this.system$1.log().error(new StringBuilder(42).append("Queries failed. Stopping Process. Reason: ").append(((TraversableOnce) vector.map(streamingQuery2 -> {
                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(streamingQuery2.name()), streamingQuery2.status().message());
                        }, Vector$.MODULE$.canBuildFrom())).mkString(", ")).toString());
                        this.streamletQueryExecution$1.stop();
                    }
                }, ExecutionContext$Implicits$.MODULE$.global());
            }
        };
    }

    /**
     * Logs, at INFO level, a start-up banner consisting of an "Initializing Spark Runner"
     * line, a "Build Info" box and the supplied text, with the {@code |} margins stripped.
     *
     * @param str text appended below the "Build Info" box
     */
    default void logStartRunnerMessage(String str) {
        final Logger logger = log();

        // Assemble the margin-prefixed template piece by piece.
        final StringBuilder template = new StringBuilder(60);
        template.append("\n      |Initializing Spark Runner ..\n      |\n");
        template.append(BootstrapInfo$.MODULE$.box("Build Info"));
        template.append("\n      |");
        template.append(str);
        template.append("\n      ");

        // Apply Scala's stripMargin to drop the leading whitespace and '|' on each line.
        final String message = new StringOps(Predef$.MODULE$.augmentString(template.toString())).stripMargin();
        logger.info(message);
    }

    /**
     * Reads a string setting from the streamlet configuration of the given context.
     *
     * @param sparkStreamletContext context whose {@code streamletConfig()} is queried
     * @param str                   configuration key to look up
     * @return the value returned by {@code getString} for that key
     */
    default String configuredValue(SparkStreamletContext sparkStreamletContext, String str) {
        final Config streamletConfig = sparkStreamletContext.streamletConfig();
        return streamletConfig.getString(str);
    }

    /**
     * Creates the Spark configuration used for the streamlet's session.
     *
     * <p>The master is taken from {@code spark.master} when that option is present and
     * falls back to {@code local[2]} otherwise; three fixed settings are then applied.
     *
     * @return the configured {@code SparkConf}
     */
    private default SparkConf makeSparkConfig() {
        final SparkConf conf = new SparkConf();
        final String master = (String) conf.getOption("spark.master").getOrElse(() -> "local[2]");
        return conf.setMaster(master)
                .set("spark.sql.shuffle.partitions", "20")
                .set("spark.sql.codegen.wholeStage", "false")
                .set("spark.shuffle.compress", "false");
    }

    /**
     * Obtains (or creates) a Spark session named after {@link #applicationName()} and sets
     * its context log level to {@code WARN}, capturing the outcome in a {@code Try}.
     *
     * @param sparkConf configuration handed to the session builder
     * @return the result of running the session set-up inside {@code Try.apply}
     */
    private default Try<SparkSession> makeSparkSession(SparkConf sparkConf) {
        return Try$.MODULE$.apply(() -> {
            final SparkSession session = SparkSession$.MODULE$
                    .builder()
                    .appName(applicationName())
                    .config(sparkConf)
                    .getOrCreate();
            session.sparkContext().setLogLevel("WARN");
            return session;
        });
    }

    /**
     * Name given to the Spark session in {@code makeSparkSession}; {@code $init$}
     * installs {@code "cloudflow-runner-spark"}.
     */
    String applicationName();

    /**
     * Trait initializer: installs the initial member values on the given instance.
     *
     * <p>Sets the runtime marker, clears the context, creates the ready and completion
     * promises, derives the completion future from the stored completion promise, and
     * sets the application name. The completion future is read back through the accessor,
     * so the completion promise must be stored first.
     *
     * @param sparkStreamlet the instance being initialized
     */
    static void $init$(SparkStreamlet sparkStreamlet) {
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$runtime_$eq(SparkStreamletRuntime$.MODULE$);
        sparkStreamlet.cloudflow$spark$SparkStreamlet$$ctx_$eq(null);

        final Promise<Dun> ready = Promise$.MODULE$.apply();
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$readyPromise_$eq(ready);

        final Promise<Dun> completion = Promise$.MODULE$.apply();
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionPromise_$eq(completion);

        final Future<Dun> completionFuture = sparkStreamlet.cloudflow$spark$SparkStreamlet$$completionPromise().future();
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$cloudflow$spark$SparkStreamlet$$completionFuture_$eq(completionFuture);

        final String defaultApplicationName = "cloudflow-runner-spark";
        sparkStreamlet.cloudflow$spark$SparkStreamlet$_setter_$applicationName_$eq(defaultApplicationName);
    }
}
