/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaTopicSettings;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.KafkaEntryConfig;
import it.agilelab.bigdata.wasp.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005a\u0001B\u0001\u0003\u0001M\u0011\u0001cS1gW\u0006\u0014\u0015\r^2i/JLG/\u001a:\u000b\u0005\r!\u0011!B6bM.\f'BA\u0003\u0007\u0003\u001d\u0001H.^4j]NT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011!C2p]N,X.\u001a:t\u0015\tYA\"\u0001\u0003xCN\u0004(BA\u0007\u000f\u0003\u001d\u0011\u0017n\u001a3bi\u0006T!a\u0004\t\u0002\u0011\u0005<\u0017\u000e\\3mC\nT\u0011!E\u0001\u0003SR\u001c\u0001a\u0005\u0003\u0001)i\u0001\u0003CA\u000b\u0019\u001b\u00051\"\"A\f\u0002\u000bM\u001c\u0017\r\\1\n\u0005e1\"AB!osJ+g\r\u0005\u0002\u001c=5\tAD\u0003\u0002\u001e\r\u00059qO]5uKJ\u001c\u0018BA\u0010\u001d\u0005A\u0019\u0006/\u0019:l\u0005\u0006$8\r[,sSR,'\u000f\u0005\u0002\"M5\t!E\u0003\u0002$I\u00059An\\4hS:<'BA\u0013\u000b\u0003\u0011\u0019wN]3\n\u0005\u001d\u0012#a\u0002'pO\u001eLgn\u001a\u0005\tS\u0001\u0011\t\u0011)A\u0005U\u00059Ao\u001c9jG\nc\u0005CA\u00162\u001b\u0005a#BA\u0017/\u0003\t\u0011GN\u0003\u0002&_)\u0011\u0001GC\u0001\u000be\u0016\u0004xn]5u_JL\u0018B\u0001\u001a-\u0005\u001d!v\u000e]5d\u00052C\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!N\u0001\u0018i>\u0004\u0018n\u0019#bi\u0006\u001cHo\u001c:f\u001b>$W\r\u001c(b[\u0016\u0004\"AN\u001d\u000f\u0005U9\u0014B\u0001\u001d\u0017\u0003\u0019\u0001&/\u001a3fM&\u0011!h\u000f\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005a2\u0002\u0002C\u001f\u0001\u0005\u0003\u0005\u000b\u0011\u0002 \u0002\u0005M\u001c\bCA H\u001b\u0005\u0001%BA!C\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000f\rS!\u0001R#\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00051\u0015aA8sO&\u0011\u0001\n\u0011\u0002\r'B\f'o[*fgNLwN\u001c\u0005\u0006\u0015\u0002!\taS\u0001\u0007y%t\u0017\u000e\u001e \u0015\t1su\n\u0015\t\u0003\u001b\u0002i\u0011A\u0001\u0005\u0006S%\u0003\rA\u000b\u0005\u0006i%\u0003\r!\u000e\u0005\u0006{%\u0003\rA\u0010\u0005\u0006%\u0002!\teU\u0001\u0006oJLG/\u001a\u000b\u0003)^\u0003\"!F+\n\u0005Y3\"\u0001B+oSRDQ\u0001W)A\u0002e\u000b!\u0001\u001a4\u0011\u0005iCgBA.g\u001d\taVM\u0004\u0002^I:\u0011al\u0019\b\u0003?\nl\u0011\u0001\u0019\u0006\u0003CJ\ta\u0001\u0010:p_Rt\u0014\"\u0001$\n\u0005\u0011+\u0015BA\u0004D\u0013\t\t%)\u0003\u0002h\u0001\u00069\u0001/Y2lC\u001e,\u0017BA5k\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0002h\u0001\")A\u000e\u0001C\u0005[\u0006a\u0011\r\u001a3LC\u001a\\\u0017mQ8oMR\u0019a\u000e\u001e<\u0011\u0007}z\u0017/\u0003\u0002q\u0001\nyA)\u0019;b\rJ\fW.Z,sSR,'\u000f\u0005\u0002@e&\u00111\u000f\u0011\u0002\u0004%><\b\"B;l\u0001\u0004q\u0017a\u00013to\")qo\u001ba\u0001q\u0006\u0019Ao[2\u0011\u0005etX\"\u0001>\u000b\u0005md\u0018!D2p]\u001aLw-\u001e:bi&|gN\u0003\u0002~\u0015\u00051Qn\u001c3fYNL!a >\u0003\u001fQKg._&bM.\f7i\u001c8gS\u001e\u0004")
public class KafkaBatchWriter
implements SparkBatchWriter,
Logging {
    private final TopicBL topicBL;
    private final String topicDatastoreModelName;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    public void write(Dataset<Row> df) {
        KafkaTopicSettings settings = TopicModelUtils$.MODULE$.retrieveKafkaTopicSettings(this.topicBL, this.topicDatastoreModelName);
        Dataset<Row> finalDf = KafkaWriters$.MODULE$.convertDataframe(df, settings.topicFieldName(), settings.topics(), settings.mainTopicModel(), settings.darwinConf());
        DataFrameWriter partialDfWriter = finalDf.write().format("kafka");
        DataFrameWriter partialDfWriterAfterTopicConf = settings.isMultiTopic() ? partialDfWriter : partialDfWriter.option("topic", ((TopicModel)settings.topicsToWrite().head()).name());
        DataFrameWriter<Row> dataframeWriterAfterKafkaConfig = this.addKafkaConf((DataFrameWriter<Row>)partialDfWriterAfterTopicConf, settings.tinyKafkaConfig());
        String compressionForKafka = ((TopicModel)settings.topicsToWrite().head()).topicCompression().kafkaProp();
        DataFrameWriter finalDataframeWriterAfterCompression = dataframeWriterAfterKafkaConfig.option("kafka.compression.type", compressionForKafka);
        finalDataframeWriterAfterCompression.save();
    }

    private DataFrameWriter<Row> addKafkaConf(DataFrameWriter<Row> dsw, TinyKafkaConfig tkc) {
        String connectionString = ((TraversableOnce)tkc.connections().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(ConnectionConfig conn) {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ":", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{conn.host(), BoxesRunTime.boxToInteger((int)conn.port())}));
            }
        }, Seq$.MODULE$.canBuildFrom())).mkString(",");
        Seq kafkaConfigMap = tkc.others();
        return dsw.option("kafka.bootstrap.servers", connectionString).option("kafka.partitioner.class", tkc.partitioner_fqcn()).option("kafka.batch.size", ((Object)BoxesRunTime.boxToInteger((int)tkc.batch_send_size())).toString()).option("kafka.acks", tkc.acks()).options((Map)((TraversableOnce)kafkaConfigMap.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Tuple2<String, String> apply(KafkaEntryConfig x$1) {
                return x$1.toTupla();
            }
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    public KafkaBatchWriter(TopicBL topicBL, String topicDatastoreModelName, SparkSession ss) {
        this.topicBL = topicBL;
        this.topicDatastoreModelName = topicDatastoreModelName;
        Logging.class.$init$((Logging)this);
    }
}

