package cloudflow.spark.kafka;

import cloudflow.spark.SparkStreamletContext;
import cloudflow.spark.avro.EncodedKV;
import cloudflow.spark.avro.SparkAvroDecoder;
import cloudflow.spark.avro.SparkAvroEncoder;
import cloudflow.spark.sql.SQLImplicits$;
import cloudflow.streamlets.CodecInlet;
import cloudflow.streamlets.CodecOutlet;
import cloudflow.streamlets.StreamletDefinition;
import cloudflow.streamlets.Topic;
import com.typesafe.config.Config;
import java.io.File;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.TypeTags;

/* compiled from: SparkStreamletContextImpl.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005}e\u0001\u0002\t\u0012\u0001aA\u0011\"\b\u0001\u0003\u0006\u0004%\t%\u0006\u0010\t\u0013\u0015\u0002!\u0011!Q\u0001\n}1\u0003\"C\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u00154\u0011!!\u0004A!b\u0001\n\u0003*\u0004\u0002C \u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001c\t\u000b\u0001\u0003A\u0011A!\t\u000f\u001d\u0003!\u0019!C\u0001\u0011\"1\u0011\u000b\u0001Q\u0001\n%CqA\u0015\u0001C\u0002\u0013\u00051\u000b\u0003\u0004[\u0001\u0001\u0006I\u0001\u0016\u0005\u00067\u0002!\t\u0001\u0018\u0005\b\u0003S\u0001A\u0011AA\u0016\u0011\u001d\t)\u0006\u0001C\u0001\u0003/Bq!a\u0017\u0001\t\u0003\ti\u0006C\u0004\u0002\u0018\u0002!\t!!'\u00033M\u0003\u0018M]6TiJ,\u0017-\u001c7fi\u000e{g\u000e^3yi&k\u0007\u000f\u001c\u0006\u0003%M\tQa[1gW\u0006T!\u0001F\u000b\u0002\u000bM\u0004\u0018M]6\u000b\u0003Y\t\u0011b\u00197pk\u00124Gn\\<\u0004\u0001M\u0011\u0001!\u0007\t\u00035mi\u0011aE\u0005\u00039M\u0011Qc\u00159be.\u001cFO]3b[2,GoQ8oi\u0016DH/A\ntiJ,\u0017-\u001c7fi\u0012+g-\u001b8ji&|g.F\u0001 !\t\u00013%D\u0001\"\u0015\t\u0011S#\u0001\u0006tiJ,\u0017-\u001c7fiNL!\u0001J\u0011\u0003'M#(/Z1nY\u0016$H)\u001a4j]&$\u0018n\u001c8\u0002)M$(/Z1nY\u0016$H)\u001a4j]&$\u0018n\u001c8!\u0013\ti2$A\u0004tKN\u001c\u0018n\u001c8\u0011\u0005%\nT\"\u0001\u0016\u000b\u0005-b\u0013aA:rY*\u0011A#\f\u0006\u0003]=\na!\u00199bG\",'\"\u0001\u0019\u0002\u0007=\u0014x-\u0003\u00023U\ta1\u000b]1sWN+7o]5p]&\u0011qeG\u0001\u0007G>tg-[4\u0016\u0003Y\u0002\"aN\u001f\u000e\u0003aR!\u0001N\u001d\u000b\u0005iZ\u0014\u0001\u0003;za\u0016\u001c\u0018MZ3\u000b\u0003q\n1aY8n\u0013\tq\u0004H\u0001\u0004D_:4\u0017nZ\u0001\bG>tg-[4!\u0003\u0019a\u0014N\\5u}Q!!\tR#G!\t\u0019\u0005!D\u0001\u0012\u0011\u0015ib\u00011\u0001 \u0011\u00159c\u00011\u0001)\u0011\u0015!d\u00011\u00017\u0003)\u0019Ho\u001c:bO\u0016$\u0015N]\u000b\u0002\u0013B\u0011!jT\u0007\u0002\u0017*\u0011A*T\u0001\u0005Y\u0006twMC\u0001O\u0003\u0011Q\u0017M^1\n\u0005A[%AB*ue&tw-A\u0006ti>\u0014\u0018mZ3ESJ\u0004\u0013\u0001F7bq>3gm]3ugB+'\u000f\u0016:jO\u001e,'/F\u0001U!\t)\u0006,D\u0001W\u0015\u00059\u0016!B:dC2\f\u0017BA-W\u0005\u0011auN\\4\u0002+5\f\u0007p\u00144gg\u0016$8\u000fU3s)JLwmZ3sA\u0005Q!/Z1e'R\u0014X-Y7\u0016\u0005u#Gc\u00010\u0002 Q\u0019q,\u001c:\u0011\u0007%\u0002'-\u0003\u0002bU\t9A)\u0019;bg\u0016$\bCA2e\u0019\u0001!Q!Z\u0006C\u0002\u0019\u0014!!\u00138\u0012\u0005\u001dT\u0007CA+i\u0013\tIgKA\u0004O_RD\u0017N\\4\u0011\u0005U[\u0017B\u00017W\u0005\r\te.\u001f\u0005\u0006].\u0001\u001da\\\u0001\bK:\u001cw\u000eZ3s!\rI\u0003OY\u0005\u0003c*\u0012q!\u00128d_\u0012,'\u000fC\u0003t\u0017\u0001\u000fA/A\u0004usB,G+Y4\u0011\tU\f\u0019B\u0019\b\u0004m\u00065abA<\u0002\b9\u0019\u00010!\u0001\u000f\u0005ethB\u0001>~\u001b\u0005Y(B\u0001?\u0018\u0003\u0019a$o\\8u}%\tq+\u0003\u0002��-\u00069!/\u001a4mK\u000e$\u0018\u0002BA\u0002\u0003\u000b\tqA];oi&lWM\u0003\u0002��-&!\u0011\u0011BA\u0006\u0003\u001d\u0001\u0018mY6bO\u0016TA!a\u0001\u0002\u0006%!\u0011qBA\t\u0003!)h.\u001b<feN,'\u0002BA\u0005\u0003\u0017IA!!\u0006\u0002\u0018\t9A+\u001f9f)\u0006<\u0017\u0002BA\r\u00037\u0011\u0001\u0002V=qKR\u000bwm\u001d\u0006\u0005\u0003;\t)!A\u0002ba&Dq!!\t\f\u0001\u0004\t\u0019#\u0001\u0004j]B{'\u000f\u001e\t\u0005A\u0005\u0015\"-C\u0002\u0002(\u0005\u0012!bQ8eK\u000eLe\u000e\\3u\u0003AY\u0017MZ6b\u0007>t7/^7fe6\u000b\u0007\u000f\u0006\u0003\u0002.\u0005-\u0003\u0003CA\u0018\u0003s\ti$!\u0010\u000e\u0005\u0005E\"\u0002BA\u001a\u0003k\t\u0011\"[7nkR\f'\r\\3\u000b\u0007\u0005]b+\u0001\u0006d_2dWm\u0019;j_:LA!a\u000f\u00022\t\u0019Q*\u00199\u0011\t\u0005}\u0012q\t\b\u0005\u0003\u0003\n\u0019\u0005\u0005\u0002{-&\u0019\u0011Q\t,\u0002\rA\u0013X\rZ3g\u0013\r\u0001\u0016\u0011\n\u0006\u0004\u0003\u000b2\u0006bBA'\u0019\u0001\u0007\u0011qJ\u0001\u0006i>\u0004\u0018n\u0019\t\u0004A\u0005E\u0013bAA*C\t)Ak\u001c9jG\u0006\u00012.\u00194lCB\u0013x\u000eZ;dKJl\u0015\r\u001d\u000b\u0005\u0003[\tI\u0006C\u0004\u0002N5\u0001\r!a\u0014\u0002\u0017]\u0014\u0018\u000e^3TiJ,\u0017-\\\u000b\u0005\u0003?\n)\b\u0006\u0005\u0002b\u0005u\u00141QAG)\u0019\t\u0019'a\u001c\u0002zA!\u0011QMA6\u001b\t\t9GC\u0002\u0002j)\n\u0011b\u001d;sK\u0006l\u0017N\\4\n\t\u00055\u0014q\r\u0002\u000f'R\u0014X-Y7j]\u001e\fV/\u001a:z\u0011\u0019qg\u0002q\u0001\u0002rA!\u0011\u0006]A:!\r\u0019\u0017Q\u000f\u0003\u0007\u0003or!\u0019\u00014\u0003\u0007=+H\u000f\u0003\u0004t\u001d\u0001\u000f\u00111\u0010\t\u0006k\u0006M\u00111\u000f\u0005\b\u0003\u007fr\u0001\u0019AAA\u0003\u0019\u0019HO]3b[B!\u0011\u0006YA:\u0011\u001d\t)I\u0004a\u0001\u0003\u000f\u000bqa\\;u!>\u0014H\u000fE\u0003!\u0003\u0013\u000b\u0019(C\u0002\u0002\f\u0006\u00121bQ8eK\u000e|U\u000f\u001e7fi\"9\u0011q\u0012\bA\u0002\u0005E\u0015AC8viB,H/T8eKB!\u0011QMAJ\u0013\u0011\t)*a\u001a\u0003\u0015=+H\u000f];u\u001b>$W-A\u0007dQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d\u000b\u0005\u0003{\tY\nC\u0004\u0002\u001e>\u0001\r!!\u0010\u0002\u000f\u0011L'OT1nK\u0002")
/* loaded from: input_file:cloudflow/spark/kafka/SparkStreamletContextImpl.class */
public class SparkStreamletContextImpl extends SparkStreamletContext {
    private final Config config;
    private final String storageDir;
    private final long maxOffsetsPerTrigger;

    @Override // cloudflow.spark.SparkStreamletContext
    public StreamletDefinition streamletDefinition() {
        return super.streamletDefinition();
    }

    @Override // cloudflow.spark.SparkStreamletContext
    public Config config() {
        return this.config;
    }

    public String storageDir() {
        return this.storageDir;
    }

    public long maxOffsetsPerTrigger() {
        return this.maxOffsetsPerTrigger;
    }

    @Override // cloudflow.spark.SparkStreamletContext
    public <In> Dataset<In> readStream(CodecInlet<In> codecInlet, Encoder<In> encoder, TypeTags.TypeTag<In> typeTag) {
        ExpressionEncoder apply = RowEncoder$.MODULE$.apply(encoder.schema());
        String schemaAsString = codecInlet.schemaAsString();
        Topic findTopicForPort = findTopicForPort(codecInlet);
        return super.session().readStream().format("kafka").option("kafka.bootstrap.servers", (String) findTopicForPort.bootstrapServers().getOrElse(() -> {
            return this.internalKafkaBootstrapServers();
        })).options(kafkaConsumerMap(findTopicForPort)).option("maxOffsetsPerTrigger", maxOffsetsPerTrigger()).option("subscribe", findTopicForPort.name()).option("failOnDataLoss", false).option("startingOffsets", "earliest").load().select(Predef$.MODULE$.wrapRefArray(new Column[]{SQLImplicits$.MODULE$.StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value"}))).$(Nil$.MODULE$)})).as(SQLImplicits$.MODULE$.newByteArrayEncoder()).mapPartitions(iterator -> {
            SparkAvroDecoder sparkAvroDecoder = new SparkAvroDecoder(schemaAsString, encoder, typeTag);
            return iterator.map(bArr -> {
                return sparkAvroDecoder.decode(bArr);
            });
        }, apply).as(encoder);
    }

    public Map<String, String> kafkaConsumerMap(Topic topic) {
        return (Map) topic.kafkaConsumerProperties().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str = (String) tuple2._1();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new StringBuilder(6).append("kafka.").append(str).toString()), (String) tuple2._2());
        }, Map$.MODULE$.canBuildFrom());
    }

    public Map<String, String> kafkaProducerMap(Topic topic) {
        return (Map) topic.kafkaProducerProperties().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str = (String) tuple2._1();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new StringBuilder(6).append("kafka.").append(str).toString()), (String) tuple2._2());
        }, Map$.MODULE$.canBuildFrom());
    }

    @Override // cloudflow.spark.SparkStreamletContext
    public <Out> StreamingQuery writeStream(Dataset<Out> dataset, CodecOutlet<Out> codecOutlet, OutputMode outputMode, Encoder<Out> encoder, TypeTags.TypeTag<Out> typeTag) {
        Dataset<EncodedKV> encodeWithKey = new SparkAvroEncoder(codecOutlet.schemaAsString(), encoder, typeTag).encodeWithKey(dataset, codecOutlet.partitioner());
        Topic findTopicForPort = findTopicForPort(codecOutlet);
        String name = findTopicForPort.name();
        String str = (String) findTopicForPort.bootstrapServers().getOrElse(() -> {
            return this.internalKafkaBootstrapServers();
        });
        return encodeWithKey.writeStream().outputMode(outputMode).format("kafka").queryName(new StringBuilder(1).append(streamletRef()).append(".").append(codecOutlet).toString()).option("kafka.bootstrap.servers", str).options(kafkaProducerMap(findTopicForPort)).option("topic", name).option("checkpointLocation", checkpointDir(codecOutlet.name())).start();
    }

    @Override // cloudflow.spark.SparkStreamletContext
    public String checkpointDir(String str) {
        File file = new File(new File(storageDir(), streamletRef()), str);
        if (!file.exists()) {
            Predef$.MODULE$.require(file.mkdirs(), () -> {
                return new StringBuilder(39).append("Could not create checkpoint directory: ").append(file).toString();
            });
        }
        return file.getAbsolutePath();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SparkStreamletContextImpl(StreamletDefinition streamletDefinition, SparkSession sparkSession, Config config) {
        super(streamletDefinition, sparkSession);
        this.config = config;
        this.storageDir = config.getString("storage.mountPath");
        this.maxOffsetsPerTrigger = config.getLong("cloudflow.spark.read.options.max-offsets-per-trigger");
    }
}
