/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaTopicSettings;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import scala.Function1;
import scala.Predef$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\ra\u0001B\u0004\t\u0001eA\u0001B\f\u0001\u0003\u0002\u0003\u0006Ia\f\u0005\tq\u0001\u0011\t\u0011)A\u0005s!AA\t\u0001B\u0001B\u0003%Q\tC\u0003Q\u0001\u0011\u0005\u0011\u000bC\u0003X\u0001\u0011\u0005\u0003\fC\u0003t\u0001\u0011%AOA\u0012LC\u001a\\\u0017m\u00159be.\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h/JLG/\u001a:\u000b\u0005%Q\u0011!B6bM.\f'BA\u0006\r\u0003\u001d\u0001H.^4j]NT!!\u0004\b\u0002\u000bM\u0004\u0018M]6\u000b\u0005=\u0001\u0012!C2p]N,X.\u001a:t\u0015\t\t\"#\u0001\u0003xCN\u0004(BA\n\u0015\u0003\u001d\u0011\u0017n\u001a3bi\u0006T!!\u0006\f\u0002\u0011\u0005<\u0017\u000e\\3mC\nT\u0011aF\u0001\u0003SR\u001c\u0001a\u0005\u0003\u00015\u00012\u0003CA\u000e\u001f\u001b\u0005a\"\"A\u000f\u0002\u000bM\u001c\u0017\r\\1\n\u0005}a\"AB!osJ+g\r\u0005\u0002\"I5\t!E\u0003\u0002$\u0019\u00059qO]5uKJ\u001c\u0018BA\u0013#\u0005y\u0019\u0006/\u0019:l'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,sSR,'\u000f\u0005\u0002(Y5\t\u0001F\u0003\u0002*U\u00059An\\4hS:<'BA\u0016\u0011\u0003\u0011\u0019wN]3\n\u00055B#a\u0002'pO\u001eLgnZ\u0001\bi>\u0004\u0018n\u0019\"M!\t\u0001d'D\u00012\u0015\t\u00114'\u0001\u0002cY*\u00111\u0006\u000e\u0006\u0003kA\t!B]3q_NLGo\u001c:z\u0013\t9\u0014GA\u0004U_BL7M\u0011'\u0002/Q|\u0007/[2ECR\f7\u000f^8sK6{G-\u001a7OC6,\u0007C\u0001\u001eB\u001d\tYt\b\u0005\u0002=95\tQH\u0003\u0002?1\u00051AH]8pizJ!\u0001\u0011\u000f\u0002\rA\u0013X\rZ3g\u0013\t\u00115I\u0001\u0004TiJLgn\u001a\u0006\u0003\u0001r\t!a]:\u0011\u0005\u0019sU\"A$\u000b\u0005!K\u0015aA:rY*\u0011QB\u0013\u0006\u0003\u00172\u000ba!\u00199bG\",'\"A'\u0002\u0007=\u0014x-\u0003\u0002P\u000f\na1\u000b]1sWN+7o]5p]\u00061A(\u001b8jiz\"BA\u0015+V-B\u00111\u000bA\u0007\u0002\u0011!)a\u0006\u0002a\u0001_!)\u0001\b\u0002a\u0001s!)A\t\u0002a\u0001\u000b\u0006)qO]5uKR\u0011\u0011L\u0019\t\u00045v{V\"A.\u000b\u0005q;\u0015!C:ue\u0016\fW.\u001b8h\u0013\tq6L\u0001\tECR\f7\u000b\u001e:fC6<&/\u001b;feB\u0011a\tY\u0005\u0003C\u001e\u00131AU8x\u0011\u0015\u0019W\u
00011\u0001e\u0003\t!g\r\u0005\u0002fa:\u0011aM\u001c\b\u0003O6t!\u0001\u001b7\u000f\u0005%\\gB\u0001\u001fk\u0013\u0005i\u0015BA&M\u0013\ti!*\u0003\u0002I\u0013&\u0011qnR\u0001\ba\u0006\u001c7.Y4f\u0013\t\t(OA\u0005ECR\fgI]1nK*\u0011qnR\u0001\rC\u0012$7*\u00194lC\u000e{gN\u001a\u000b\u00043V<\b\"\u0002<\u0007\u0001\u0004I\u0016a\u00013to\")\u0001P\u0002a\u0001s\u0006\u0019Ao[2\u0011\u0005i|X\"A>\u000b\u0005ql\u0018!D2p]\u001aLw-\u001e:bi&|gN\u0003\u0002\u007f!\u00051Qn\u001c3fYNL1!!\u0001|\u0005=!\u0016N\\=LC\u001a\\\u0017mQ8oM&<\u0007")
/**
 * Structured-streaming writer that prepares a Spark {@link DataStreamWriter}
 * using the {@code "kafka"} format.
 *
 * <p>The Kafka settings are looked up through {@code TopicModelUtils$} from the
 * {@link TopicBL} and the datastore model name supplied at construction time.
 */
public class KafkaSparkStructuredStreamingWriter
implements SparkStructuredStreamingWriter,
Logging {
    private final TopicBL topicBL;
    private final String topicDatastoreModelName;
    // Deliberately not final: the value is assigned through the Logging trait
    // setter below, which is a regular method and therefore may not write to a
    // final field.
    private WaspLogger logger;

    /**
     * Returns the logger held by this instance.
     *
     * @return the logger stored by the {@code Logging} trait setter
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Trait setter for the {@code Logging.logger} member.
     *
     * <p>NOTE(review): presumably invoked by {@code Logging.$init$} during
     * construction - confirm against the {@code Logging} trait.
     *
     * @param x$1 the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    /**
     * Configures a Kafka stream writer for the given dataset.
     *
     * <p>The dataset is first converted via {@code KafkaWriters$.convertDataframe};
     * the resulting writer gets the {@code "topic"} option only when the settings
     * are not multi-topic, then the connection options from
     * {@link #addKafkaConf}, and finally {@code "kafka.compression.type"} taken
     * from the first topic to write.
     *
     * @param df the dataset to be written
     * @return the configured (not yet started) stream writer
     */
    public DataStreamWriter<Row> write(Dataset<Row> df) {
        KafkaTopicSettings settings = TopicModelUtils$.MODULE$.retrieveKafkaTopicSettings(this.topicBL, this.topicDatastoreModelName);
        Dataset<Row> finalDf = KafkaWriters$.MODULE$.convertDataframe(df, settings.topicFieldName(), settings.topics(), settings.mainTopicModel(), settings.darwinConf());
        DataStreamWriter<Row> kafkaWriter = finalDf.writeStream().format("kafka");

        // The first topic is needed in every mode (its compression is always
        // applied), so resolve it once instead of once per use.
        TopicModel headTopic = (TopicModel)settings.topicsToWrite().head();

        // In multi-topic mode no fixed "topic" option is set on the writer.
        DataStreamWriter<Row> withTopic = settings.isMultiTopic()
            ? kafkaWriter
            : kafkaWriter.option("topic", headTopic.name());

        DataStreamWriter<Row> withKafkaConf = this.addKafkaConf(withTopic, settings.tinyKafkaConfig());
        return withKafkaConf.option("kafka.compression.type", headTopic.topicCompression().kafkaProp());
    }

    /**
     * Applies the connection-level Kafka options to a stream writer.
     *
     * <p>Sets {@code kafka.bootstrap.servers} (comma-separated {@code host:port}
     * pairs built from {@code tkc.connections()}), {@code kafka.partitioner.class},
     * {@code kafka.batch.size} and {@code kafka.acks}, then adds every entry of
     * {@code tkc.others()} converted to a key/value tuple.
     *
     * @param dsw the writer to configure
     * @param tkc the Kafka configuration to read the values from
     * @return the writer returned by the last option call
     */
    private DataStreamWriter<Row> addKafkaConf(DataStreamWriter<Row> dsw, TinyKafkaConfig tkc) {
        String connectionString = ((TraversableOnce)tkc.connections().map((Function1 & Serializable & scala.Serializable)conn -> new StringBuilder(1).append(conn.host()).append(":").append(conn.port()).toString(), Seq$.MODULE$.canBuildFrom())).mkString(",");
        Seq kafkaConfigMap = tkc.others();
        Map extraOptions = (Map)((TraversableOnce)kafkaConfigMap.map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.toTupla(), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        return dsw
            .option("kafka.bootstrap.servers", connectionString)
            .option("kafka.partitioner.class", tkc.partitioner_fqcn())
            .option("kafka.batch.size", Integer.toString(tkc.batch_send_size()))
            .option("kafka.acks", tkc.acks())
            .options(extraOptions);
    }

    /**
     * Creates the writer.
     *
     * @param topicBL repository used to resolve the topic settings
     * @param topicDatastoreModelName name of the topic datastore model to write to
     * @param ss Spark session; not used by this class but kept in the signature
     */
    public KafkaSparkStructuredStreamingWriter(TopicBL topicBL, String topicDatastoreModelName, SparkSession ss) {
        this.topicBL = topicBL;
        this.topicDatastoreModelName = topicDatastoreModelName;
        Logging.$init$((Logging)this);
    }
}

