package cloudflow.spark.kafka;

import cloudflow.spark.SparkStreamletContext;
import cloudflow.spark.sql.SQLImplicits$;
import cloudflow.streamlets.CodecInlet;
import cloudflow.streamlets.CodecOutlet;
import cloudflow.streamlets.RoundRobinPartitioner$;
import cloudflow.streamlets.StreamletDefinition;
import cloudflow.streamlets.Topic;
import com.typesafe.config.Config;
import java.io.File;
import java.nio.charset.StandardCharsets;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: SparkStreamletContextImpl.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015h\u0001\u0002\n\u0014\u0001iA\u0011b\b\u0001\u0003\u0006\u0004%\te\u0006\u0011\t\u0013\u001d\u0002!\u0011!Q\u0001\n\u0005B\u0003\"C\u0015\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u00166\u0011!1\u0004A!b\u0001\n\u0003:\u0004\u0002C!\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001d\t\u000b\t\u0003A\u0011A\"\t\u000f%\u0003!\u0019!C\u0001\u0015\"11\u000b\u0001Q\u0001\n-Cq\u0001\u0016\u0001C\u0002\u0013\u0005Q\u000b\u0003\u0004]\u0001\u0001\u0006IA\u0016\u0005\u0006;\u0002!\tA\u0018\u0005\b\u0003[\u0001A\u0011BA\u0018\u0011\u001d\tI\u0006\u0001C\u0005\u00037Bq!a\u0018\u0001\t\u0013\t\t\u0007C\u0004\u0002v\u0001!\t!a\u001e\t\u0013\u0005\u0005\u0007!%A\u0005\u0002\u0005\r\u0007bBAo\u0001\u0011\u0005\u0011q\u001c\u0002\u001a'B\f'o[*ue\u0016\fW\u000e\\3u\u0007>tG/\u001a=u\u00136\u0004HN\u0003\u0002\u0015+\u0005)1.\u00194lC*\u0011acF\u0001\u0006gB\f'o\u001b\u0006\u00021\u0005I1\r\\8vI\u001adwn^\u0002\u0001'\t\u00011\u0004\u0005\u0002\u001d;5\tQ#\u0003\u0002\u001f+\t)2\u000b]1sWN#(/Z1nY\u0016$8i\u001c8uKb$\u0018aE:ue\u0016\fW\u000e\\3u\t\u00164\u0017N\\5uS>tW#A\u0011\u0011\u0005\t*S\"A\u0012\u000b\u0005\u0011:\u0012AC:ue\u0016\fW\u000e\\3ug&\u0011ae\t\u0002\u0014'R\u0014X-Y7mKR$UMZ5oSRLwN\\\u0001\u0015gR\u0014X-Y7mKR$UMZ5oSRLwN\u001c\u0011\n\u0005}i\u0012aB:fgNLwN\u001c\t\u0003WMj\u0011\u0001\f\u0006\u0003[9\n1a]9m\u0015\t1rF\u0003\u00021c\u00051\u0011\r]1dQ\u0016T\u0011AM\u0001\u0004_J<\u0017B\u0001\u001b-\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0013\tIS$\u0001\u0004d_:4\u0017nZ\u000b\u0002qA\u0011\u0011hP\u0007\u0002u)\u0011ag\u000f\u0006\u0003yu\n\u0001\u0002^=qKN\fg-\u001a\u0006\u0002}\u0005\u00191m\\7\n\u0005\u0001S$AB\"p]\u001aLw-A\u0004d_:4\u0017n\u001a\u0011\u0002\rqJg.\u001b;?)\u0011!ei\u0012%\u0011\u0005\u0015\u0003Q\"A\n\t\u000b}1\u0001\u0019A\u0011\t\u000b%2\u0001\u0019\u0001\u0016\t\u000bY2\u0001\u0019\u0001\u001d\u0002\u0015M$xN]1hK\u0012K'/F\u0001L!\ta\u0015+D\u0001N\u0015\tqu*\u0001\u0003mC:<'\"\u0001)\u00
02\t)\fg/Y\u0005\u0003%6\u0013aa\u0015;sS:<\u0017aC:u_J\fw-\u001a#je\u0002\nA#\\1y\u001f\u001a47/\u001a;t!\u0016\u0014HK]5hO\u0016\u0014X#\u0001,\u0011\u0005]SV\"\u0001-\u000b\u0003e\u000bQa]2bY\u0006L!a\u0017-\u0003\t1{gnZ\u0001\u0016[\u0006DxJ\u001a4tKR\u001c\b+\u001a:Ue&<w-\u001a:!\u0003)\u0011X-\u00193TiJ,\u0017-\\\u000b\u0003?\u001a$2\u0001YA\u0012)\r\tw\u000e\u001e\t\u0004W\t$\u0017BA2-\u0005\u001d!\u0015\r^1tKR\u0004\"!\u001a4\r\u0001\u0011)qm\u0003b\u0001Q\n\u0011\u0011J\\\t\u0003S2\u0004\"a\u00166\n\u0005-D&a\u0002(pi\"Lgn\u001a\t\u0003/6L!A\u001c-\u0003\u0007\u0005s\u0017\u0010C\u0003q\u0017\u0001\u000f\u0011/A\u0004f]\u000e|G-\u001a:\u0011\u0007-\u0012H-\u0003\u0002tY\t9QI\\2pI\u0016\u0014\b\"B;\f\u0001\b1\u0018a\u0002;za\u0016$\u0016m\u001a\t\u0005o\u0006]AMD\u0002y\u0003#q1!_A\u0006\u001d\rQ\u0018Q\u0001\b\u0004w\u0006\u0005aB\u0001?��\u001b\u0005i(B\u0001@\u001a\u0003\u0019a$o\\8u}%\t\u0011,C\u0002\u0002\u0004a\u000bqA]3gY\u0016\u001cG/\u0003\u0003\u0002\b\u0005%\u0011a\u0002:v]RLW.\u001a\u0006\u0004\u0003\u0007A\u0016\u0002BA\u0007\u0003\u001f\tq\u0001]1dW\u0006<WM\u0003\u0003\u0002\b\u0005%\u0011\u0002BA\n\u0003+\t\u0001\"\u001e8jm\u0016\u00148/\u001a\u0006\u0005\u0003\u001b\ty!\u0003\u0003\u0002\u001a\u0005m!a\u0002+za\u0016$\u0016mZ\u0005\u0005\u0003;\tyB\u0001\u0005UsB,G+Y4t\u0015\u0011\t\t#!\u0003\u0002\u0007\u0005\u0004\u0018\u000eC\u0004\u0002&-\u0001\r!a\n\u0002\r%t\u0007k\u001c:u!\u0011\u0011\u0013\u0011\u00063\n\u0007\u0005-2E\u0001\u0006D_\u0012,7-\u00138mKR\f\u0001c[1gW\u0006\u001cuN\\:v[\u0016\u0014X*\u00199\u0015\t\u0005E\u0012q\n\t\t\u0003g\ti$!\u0011\u0002B5\u0011\u0011Q\u0007\u0006\u0005\u0003o\tI$A\u0005j[6,H/\u00192mK*\u0019\u00111\b-\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002@\u0005U\"aA'baB!\u00111IA&\u001d\u0011\t)%a\u0012\u0011\u0005qD\u0016bAA%1\u00061\u0001K]3eK\u001aL1AUA'\u0015\r\tI\u0005\u0017\u0005\b\u0003#b\u0001\u0019AA*\u0003\u0015!x\u000e]5d!\r\u0011\u0013QK\u0005\u0004\u0003/\u001a#!\u0002+pa&\u001c\u0017\u0001E
6bM.\f\u0007K]8ek\u000e,'/T1q)\u0011\t\t$!\u0018\t\u000f\u0005ES\u00021\u0001\u0002T\u0005ya/\u00197jI\u0006$XMT8u\u001dVdG.\u0006\u0003\u0002d\u0005ED\u0003BA3\u0003W\u00022aVA4\u0013\r\tI\u0007\u0017\u0002\b\u0005>|G.Z1o\u0011\u001d\tiG\u0004a\u0001\u0003_\nq!\\3tg\u0006<W\rE\u0002f\u0003c\"a!a\u001d\u000f\u0005\u0004A'!\u0001+\u0002\u0017]\u0014\u0018\u000e^3TiJ,\u0017-\\\u000b\u0005\u0003s\ny\t\u0006\u0006\u0002|\u0005]\u0015QTAT\u0003c#b!! \u0002\n\u0006M\u0005\u0003BA@\u0003\u000bk!!!!\u000b\u0007\u0005\rE&A\u0005tiJ,\u0017-\\5oO&!\u0011qQAA\u00059\u0019FO]3b[&tw-U;fefDa\u0001]\bA\u0004\u0005-\u0005\u0003B\u0016s\u0003\u001b\u00032!ZAH\t\u0019\t\tj\u0004b\u0001Q\n\u0019q*\u001e;\t\rU|\u00019AAK!\u00159\u0018qCAG\u0011\u001d\tIj\u0004a\u0001\u00037\u000baa\u001d;sK\u0006l\u0007\u0003B\u0016c\u0003\u001bCq!a(\u0010\u0001\u0004\t\t+A\u0004pkR\u0004vN\u001d;\u0011\u000b\t\n\u0019+!$\n\u0007\u0005\u00156EA\u0006D_\u0012,7mT;uY\u0016$\bbBAU\u001f\u0001\u0007\u00111V\u0001\u000b_V$\b/\u001e;N_\u0012,\u0007\u0003BA@\u0003[KA!a,\u0002\u0002\nQq*\u001e;qkRlu\u000eZ3\t\u0013\u0005Mv\u0002%AA\u0002\u0005U\u0016aD8qi&|g.\u00197Ue&<w-\u001a:\u0011\u000b]\u000b9,a/\n\u0007\u0005e\u0006L\u0001\u0004PaRLwN\u001c\t\u0005\u0003\u007f\ni,\u0003\u0003\u0002@\u0006\u0005%a\u0002+sS\u001e<WM]\u0001\u0016oJLG/Z*ue\u0016\fW\u000e\n3fM\u0006,H\u000e\u001e\u00135+\u0011\t)-a7\u0016\u0005\u0005\u001d'\u0006BA[\u0003\u0013\\#!a3\u0011\t\u00055\u0017q[\u0007\u0003\u0003\u001fTA!!5\u0002T\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003+D\u0016AC1o]>$\u0018\r^5p]&!\u0011\u0011\\Ah\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0003\u0007\u0003#\u0003\"\u0019\u00015\u0002\u001b\rDWmY6q_&tG\u000fR5s)\u0011\t\t%!9\t\u000f\u0005\r\u0018\u00031\u0001\u0002B\u00059A-\u001b:OC6,\u0007")
/* loaded from: input_file:cloudflow/spark/kafka/SparkStreamletContextImpl.class */
/**
 * {@link SparkStreamletContext} implementation that wires streamlet inlets and outlets to Kafka
 * topics through Spark Structured Streaming's {@code "kafka"} source and sink.
 *
 * <p>Two settings are read once from the supplied {@link Config} at construction time:
 * {@code storage.mountPath} (root under which checkpoint directories are created) and
 * {@code cloudflow.spark.read.options.max-offsets-per-trigger} (forwarded to the Kafka source as
 * {@code maxOffsetsPerTrigger}).
 */
public class SparkStreamletContextImpl extends SparkStreamletContext {
    /** Prefix put in front of every topic-level Kafka client property before it becomes a Spark option. */
    private static final String KAFKA_OPTION_PREFIX = "kafka.";

    private final Config config;
    private final String storageDir;
    private final long maxOffsetsPerTrigger;

    /**
     * Creates the context and eagerly resolves the settings it needs from {@code config}.
     *
     * @param streamletDefinition definition handed to the base context
     * @param sparkSession        session handed to the base context
     * @param config              configuration providing {@code storage.mountPath} and
     *                            {@code cloudflow.spark.read.options.max-offsets-per-trigger}
     */
    public SparkStreamletContextImpl(StreamletDefinition streamletDefinition, SparkSession sparkSession, Config config) {
        super(streamletDefinition, sparkSession);
        this.config = config;
        this.storageDir = config.getString("storage.mountPath");
        this.maxOffsetsPerTrigger = config.getLong("cloudflow.spark.read.options.max-offsets-per-trigger");
    }

    /** Returns the streamlet definition held by the base context. */
    @Override // cloudflow.spark.SparkStreamletContext
    public StreamletDefinition streamletDefinition() {
        return super.streamletDefinition();
    }

    /** Returns the configuration this context was constructed with. */
    @Override // cloudflow.spark.SparkStreamletContext
    public Config config() {
        return this.config;
    }

    /** Returns the value of {@code storage.mountPath} read at construction time. */
    public String storageDir() {
        return this.storageDir;
    }

    /** Returns the value of {@code cloudflow.spark.read.options.max-offsets-per-trigger} read at construction time. */
    public long maxOffsetsPerTrigger() {
        return this.maxOffsetsPerTrigger;
    }

    /**
     * Builds a streaming {@link Dataset} that subscribes to the Kafka topic bound to the given inlet
     * and decodes each record's {@code value} column with the inlet's codec.
     *
     * <p>The source is configured with {@code startingOffsets=earliest} and
     * {@code failOnDataLoss=false}. A record that fails to decode is passed, together with the
     * failure's exception, to the inlet's error handler; if the handler yields a value that value is
     * emitted in its place, otherwise the record is mapped to {@code null} and removed by the
     * trailing filter.
     *
     * @param codecInlet inlet whose topic, codec and error handler are used
     * @param encoder    encoder for the decoded element type
     * @param typeTag    not used by this implementation
     * @param <In>       decoded element type
     * @return the decoded stream with undecodable, unrecovered records dropped
     */
    @Override // cloudflow.spark.SparkStreamletContext
    public <In> Dataset<In> readStream(CodecInlet<In> codecInlet, Encoder<In> encoder, TypeTags.TypeTag<In> typeTag) {
        Topic topic = findTopicForPort(codecInlet);
        return super.session().readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", runtimeBootstrapServers(topic))
            .options(kafkaConsumerMap(topic))
            .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger())
            .option("subscribe", topic.name())
            .option("failOnDataLoss", false)
            .option("startingOffsets", "earliest")
            .load()
            .select(Predef$.MODULE$.wrapRefArray(new Column[]{
                SQLImplicits$.MODULE$.StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value"}))).$(Nil$.MODULE$)
            }))
            .as(SQLImplicits$.MODULE$.newByteArrayEncoder())
            .map(bArr -> {
                // Held as Object: the result is either a Success or a Failure, and declaring it as
                // one of them makes the instanceof test against the other an inconvertible-type error.
                Object decoded = codecInlet.codec().decode(bArr);
                if (decoded instanceof Success) {
                    return ((Success<?>) decoded).value();
                }
                if (!(decoded instanceof Failure)) {
                    throw new MatchError(decoded);
                }
                Option<?> recovered = (Option<?>) codecInlet.errorHandler().apply(bArr, ((Failure<?>) decoded).exception());
                // null marks "drop this record"; the filter below removes it.
                return recovered instanceof Some ? ((Some<?>) recovered).value() : null;
            }, encoder)
            .filter(obj -> {
                return BoxesRunTime.boxToBoolean(this.validateNotNull(obj));
            });
    }

    /** Returns the topic's Kafka consumer properties with every key prefixed by {@code "kafka."}. */
    private Map<String, String> kafkaConsumerMap(Topic topic) {
        return withKafkaPrefix(topic.kafkaConsumerProperties());
    }

    /** Returns the topic's Kafka producer properties with every key prefixed by {@code "kafka."}. */
    private Map<String, String> kafkaProducerMap(Topic topic) {
        return withKafkaPrefix(topic.kafkaProducerProperties());
    }

    /**
     * Re-keys a property map so that each key {@code k} becomes {@code "kafka." + k}; values are
     * carried over unchanged.
     */
    private static Map<String, String> withKafkaPrefix(Map<String, String> properties) {
        return (Map<String, String>) properties.map(entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            String key = (String) entry._1();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KAFKA_OPTION_PREFIX + key), (String) entry._2());
        }, Map$.MODULE$.canBuildFrom());
    }

    /**
     * Predicate used by {@link #readStream} to discard records that were mapped to {@code null}.
     *
     * @return {@code true} when {@code t} is not {@code null}
     */
    public <T> boolean validateNotNull(T t) {
        return t != null;
    }

    /**
     * Encodes every element of {@code dataset} with the outlet's codec and starts a streaming query
     * that writes the result to the Kafka topic bound to the outlet.
     *
     * <p>The record key is {@code null} when the outlet's partitioner equals
     * {@code RoundRobinPartitioner}; otherwise it is the UTF-8 encoding of the string the
     * partitioner returns for the element. The query is named
     * {@code streamletRef() + "." + codecOutlet} and checkpoints to
     * {@code checkpointDir(codecOutlet.name())}.
     *
     * @param dataset     stream to write
     * @param codecOutlet outlet whose topic, codec and partitioner are used
     * @param outputMode  Spark output mode for the sink
     * @param option      trigger to apply; when empty the query is started without calling
     *                    {@code trigger(...)}
     * @param encoder     not used by this implementation
     * @param typeTag     not used by this implementation
     * @param <Out>       element type of {@code dataset}
     * @return the started query
     */
    @Override // cloudflow.spark.SparkStreamletContext
    public <Out> StreamingQuery writeStream(Dataset<Out> dataset, CodecOutlet<Out> codecOutlet, OutputMode outputMode, Option<Trigger> option, Encoder<Out> encoder, TypeTags.TypeTag<Out> typeTag) {
        Function1 toEncodedKV = obj -> {
            // Explicit UTF-8: the no-arg getBytes() uses the platform default charset, so the same
            // partition key could serialize to different bytes on differently configured JVMs.
            byte[] key = RoundRobinPartitioner$.MODULE$.equals(codecOutlet.partitioner())
                ? null
                : ((String) codecOutlet.partitioner().apply(obj)).getBytes(StandardCharsets.UTF_8);
            return new EncodedKV(key, codecOutlet.codec().encode(obj));
        };
        SQLImplicits$ implicits = SQLImplicits$.MODULE$;
        TypeTags universe = package$.MODULE$.universe();
        Dataset encoded = dataset.map(toEncodedKV, implicits.newProductEncoder(universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(SparkStreamletContextImpl.class.getClassLoader()), new TypeCreator() {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                // Result intentionally discarded; the call is kept exactly as in the original.
                mirror.universe();
                return mirror.staticClass("cloudflow.spark.kafka.EncodedKV").asType().toTypeConstructor();
            }
        })));
        Topic topic = findTopicForPort(codecOutlet);
        String topicName = topic.name();
        String bootstrapServers = runtimeBootstrapServers(topic);
        DataStreamWriter writer = encoded.writeStream()
            .outputMode(outputMode)
            .format("kafka")
            .queryName(streamletRef() + "." + codecOutlet)
            .option("kafka.bootstrap.servers", bootstrapServers)
            .options(kafkaProducerMap(topic))
            .option("topic", topicName)
            .option("checkpointLocation", checkpointDir(codecOutlet.name()));
        if (option.isDefined()) {
            return writer.trigger(option.get()).start();
        }
        return writer.start();
    }

    /** Default for the fourth parameter of {@link #writeStream}: no trigger. */
    public <Out> Option<Trigger> writeStream$default$4() {
        return None$.MODULE$;
    }

    /**
     * Resolves, creating it if necessary, the directory
     * {@code <storageDir>/<streamletRef>/<str>} and returns its absolute path.
     *
     * @param str name of the leaf directory
     * @return absolute path of the directory
     * @throws IllegalArgumentException presumably, via {@code Predef.require}, when the path does
     *         not exist and cannot be created — confirm against the Scala library version in use
     */
    @Override // cloudflow.spark.SparkStreamletContext
    public String checkpointDir(String str) {
        File file = new File(new File(storageDir(), streamletRef()), str);
        if (!file.exists()) {
            // mkdirs() returns false when the directory already exists, so a directory created
            // concurrently between the exists() check and this call must not count as a failure.
            boolean available = file.mkdirs() || file.isDirectory();
            Predef$.MODULE$.require(available, () -> {
                return "Could not create checkpoint directory: " + file;
            });
        }
        return file.getAbsolutePath();
    }
}
