/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaTopicSettings;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import java.io.Serializable;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Function1;
import scala.Predef$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\ra\u0001B\u0004\t\u0001eA\u0001B\f\u0001\u0003\u0002\u0003\u0006Ia\f\u0005\tq\u0001\u0011\t\u0011)A\u0005s!AA\t\u0001B\u0001B\u0003%Q\tC\u0003Q\u0001\u0011\u0005\u0011\u000bC\u0003X\u0001\u0011\u0005\u0003\fC\u0003n\u0001\u0011%aN\u0001\tLC\u001a\\\u0017MQ1uG\"<&/\u001b;fe*\u0011\u0011BC\u0001\u0006W\u000647.\u0019\u0006\u0003\u00171\tq\u0001\u001d7vO&t7O\u0003\u0002\u000e\u001d\u0005)1\u000f]1sW*\u0011q\u0002E\u0001\nG>t7/^7feNT!!\u0005\n\u0002\t]\f7\u000f\u001d\u0006\u0003'Q\tqAY5hI\u0006$\u0018M\u0003\u0002\u0016-\u0005A\u0011mZ5mK2\f'MC\u0001\u0018\u0003\tIGo\u0001\u0001\u0014\t\u0001Q\u0002E\n\t\u00037yi\u0011\u0001\b\u0006\u0002;\u0005)1oY1mC&\u0011q\u0004\b\u0002\u0007\u0003:L(+\u001a4\u0011\u0005\u0005\"S\"\u0001\u0012\u000b\u0005\rb\u0011aB<sSR,'o]\u0005\u0003K\t\u0012\u0001c\u00159be.\u0014\u0015\r^2i/JLG/\u001a:\u0011\u0005\u001dbS\"\u0001\u0015\u000b\u0005%R\u0013a\u00027pO\u001eLgn\u001a\u0006\u0003WA\tAaY8sK&\u0011Q\u0006\u000b\u0002\b\u0019><w-\u001b8h\u0003\u001d!x\u000e]5d\u00052\u0003\"\u0001\r\u001c\u000e\u0003ER!AM\u001a\u0002\u0005\td'BA\u00165\u0015\t)\u0004#\u0001\u0006sKB|7/\u001b;pefL!aN\u0019\u0003\u000fQ{\u0007/[2C\u0019\u00069Bo\u001c9jG\u0012\u000bG/Y:u_J,Wj\u001c3fY:\u000bW.\u001a\t\u0003u\u0005s!aO \u0011\u0005qbR\"A\u001f\u000b\u0005yB\u0012A\u0002\u001fs_>$h(\u0003\u0002A9\u00051\u0001K]3eK\u001aL!AQ\"\u0003\rM#(/\u001b8h\u0015\t\u0001E$\u0001\u0002tgB\u0011aIT\u0007\u0002\u000f*\u0011\u0001*S\u0001\u0004gFd'BA\u0007K\u0015\tYE*\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001b\u0006\u0019qN]4\n\u0005=;%\u0001D*qCJ\\7+Z:tS>t\u0017A\u0002\u001fj]&$h\b\u0006\u0003S)V3\u0006CA*\u0001\u001b\u0005A\u0001\"\u0002\u0018\u0005\u0001\u0004y\u0003\"\u0002\u001d\u0005\u0001\u0004I\u0004\"\u0002#\u0005\u0001\u0004)\u0015!B<sSR,GCA-]!\tY\",\u0003\u0002\\9\t!QK\\5u\u0011\u0015iV\u00011\u0001_\u0003\t!g\r\u0005\u0002`U:\u0011\u0001\r\u001b\b\u0003C\u001et!A\u00194\u000f\u0005\r,gB\u0001\u001fe\u0013\u0005i\u0015BA&M\u0013\ti!*\u0003\u0002I\u0013&\u0011\u0011nR\u0001\ba\u0006\u001c7.Y4f\u0013\tYGNA\u0005ECR\fgI]1nK*\u0011\u0011nR\u0001\rC\u0012$7*\u00194lC\u000e{gN\u001a\u000b\u0004_V<\bc\u0001$qe&\u0011\u0011o\u0012\u0002\u0010\t\u0006$\u0018M\u0012:b[\u0016<&/\u001b;feB\u0011ai]\u0005\u0003i\u001e\u00131AU8x\u0011\u00151h\u00011\u0001p\u0003\r!7o\u001e\u0005\u0006q\u001a\u0001\r!_\u0001\u0004i.\u001c\u0007C\u0001>\u0000\u001b\u0005Y(B\u0001?~\u00035\u0019wN\u001c4jOV\u0014\u0018\r^5p]*\u0011a\u0010E\u0001\u0007[>$W\r\\:\n\u0007\u0005\u00051PA\bUS:L8*\u00194lC\u000e{gNZ5h\u0001")
public class KafkaBatchWriter
implements SparkBatchWriter,
Logging {
    private final TopicBL topicBL;
    private final String topicDatastoreModelName;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    public void write(Dataset<Row> df) {
        KafkaTopicSettings settings = TopicModelUtils$.MODULE$.retrieveKafkaTopicSettings(this.topicBL, this.topicDatastoreModelName);
        Dataset<Row> finalDf = KafkaWriters$.MODULE$.convertDataframe(df, settings.topicFieldName(), settings.topics(), settings.mainTopicModel(), settings.darwinConf());
        DataFrameWriter partialDfWriter = finalDf.write().format("kafka");
        DataFrameWriter partialDfWriterAfterTopicConf = settings.isMultiTopic() ? partialDfWriter : partialDfWriter.option("topic", ((TopicModel)settings.topicsToWrite().head()).name());
        DataFrameWriter<Row> dataframeWriterAfterKafkaConfig = this.addKafkaConf((DataFrameWriter<Row>)partialDfWriterAfterTopicConf, settings.tinyKafkaConfig());
        String compressionForKafka = ((TopicModel)settings.topicsToWrite().head()).topicCompression().kafkaProp();
        DataFrameWriter finalDataframeWriterAfterCompression = dataframeWriterAfterKafkaConfig.option("kafka.compression.type", compressionForKafka);
        finalDataframeWriterAfterCompression.save();
    }

    private DataFrameWriter<Row> addKafkaConf(DataFrameWriter<Row> dsw, TinyKafkaConfig tkc) {
        String connectionString = ((TraversableOnce)tkc.connections().map((Function1 & Serializable & scala.Serializable)conn -> new StringBuilder(1).append(conn.host()).append(":").append(conn.port()).toString(), Seq$.MODULE$.canBuildFrom())).mkString(",");
        Seq kafkaConfigMap = tkc.others();
        return dsw.option("kafka.bootstrap.servers", connectionString).option("kafka.partitioner.class", tkc.partitioner_fqcn()).option("kafka.batch.size", ((Object)BoxesRunTime.boxToInteger((int)tkc.batch_send_size())).toString()).option("kafka.acks", tkc.acks()).options((Map)((TraversableOnce)kafkaConfigMap.map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.toTupla(), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    public KafkaBatchWriter(TopicBL topicBL, String topicDatastoreModelName, SparkSession ss) {
        this.topicBL = topicBL;
        this.topicDatastoreModelName = topicDatastoreModelName;
        Logging.$init$((Logging)this);
    }
}

