/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaTopicSettings;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.KafkaEntryConfig;
import it.agilelab.bigdata.wasp.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005a\u0001B\u0001\u0003\u0001M\u00111eS1gW\u0006\u001c\u0006/\u0019:l'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,sSR,'O\u0003\u0002\u0004\t\u0005)1.\u00194lC*\u0011QAB\u0001\ba2,x-\u001b8t\u0015\t9\u0001\"A\u0003ta\u0006\u00148N\u0003\u0002\n\u0015\u0005I1m\u001c8tk6,'o\u001d\u0006\u0003\u00171\tAa^1ta*\u0011QBD\u0001\bE&<G-\u0019;b\u0015\ty\u0001#\u0001\u0005bO&dW\r\\1c\u0015\u0005\t\u0012AA5u\u0007\u0001\u0019B\u0001\u0001\u000b\u001bAA\u0011Q\u0003G\u0007\u0002-)\tq#A\u0003tG\u0006d\u0017-\u0003\u0002\u001a-\t1\u0011I\\=SK\u001a\u0004\"a\u0007\u0010\u000e\u0003qQ!!\b\u0004\u0002\u000f]\u0014\u0018\u000e^3sg&\u0011q\u0004\b\u0002\u001f'B\f'o[*ueV\u001cG/\u001e:fIN#(/Z1nS:<wK]5uKJ\u0004\"!\t\u0014\u000e\u0003\tR!a\t\u0013\u0002\u000f1|wmZ5oO*\u0011QEC\u0001\u0005G>\u0014X-\u0003\u0002(E\t9Aj\\4hS:<\u0007\u0002C\u0015\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0016\u0002\u000fQ|\u0007/[2C\u0019B\u00111&M\u0007\u0002Y)\u0011QFL\u0001\u0003E2T!!J\u0018\u000b\u0005AR\u0011A\u0003:fa>\u001c\u0018\u000e^8ss&\u0011!\u0007\f\u0002\b)>\u0004\u0018n\u0019\"M\u0011!!\u0004A!A!\u0002\u0013)\u0014a\u0006;pa&\u001cG)\u0019;bgR|'/Z'pI\u0016dg*Y7f!\t1\u0014H\u0004\u0002\u0016o%\u0011\u0001HF\u0001\u0007!J,G-\u001a4\n\u0005iZ$AB*ue&twM\u0003\u00029-!AQ\b\u0001B\u0001B\u0003%a(\u0001\u0002tgB\u0011qhR\u0007\u0002\u0001*\u0011\u0011IQ\u0001\u0004gFd'BA\u0004D\u0015\t!U)\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\r\u0006\u0019qN]4\n\u0005!\u0003%\u0001D*qCJ\\7+Z:tS>t\u0007\"\u0002&\u0001\t\u0003Y\u0015A\u0002\u001fj]&$h\b\u0006\u0003M\u001d>\u0003\u0006CA'\u0001\u001b\u0005\u0011\u0001\"B\u0015J\u0001\u0004Q\u0003\"\u0002\u001bJ\u0001\u0004)\u0004\"B\u001fJ\u0001\u0004q\u0004\"\u0002*\u0001\t\u0003\u001a\u0016!B<sSR,GC\u0001+^!\r)\u0006LW\u0007\u0002-*\u0011q\u000bQ\u0001\ngR\u0014X-Y7j]\u001eL!!\u0017,\u0003!\u0011\u000bG/Y*ue\u0016\fWn\u0016:ji\u0016\u0014\bCA \\\u0013\ta\u0006IA\u0002S_^DQAX)A\u0002}\u000b!\u0001\u001a4\u0011\u0005\u0001tgBA1m\u001d\t\u00117N\u0004\u0002dU:\u0011A-\u001b\b\u0003K\"l\u0011A\u001a\u0006\u0003OJ\ta\u0001\u0010:p_Rt\u0014\"\u0001$\n\u0005\u0011+\u0015BA\u0004D\u0013\t\t%)\u0003\u0002n\u0001\u00069\u0001/Y2lC\u001e,\u0017BA8q\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0002n\u0001\")!\u000f\u0001C\u0005g\u0006a\u0011\r\u001a3LC\u001a\\\u0017mQ8oMR\u0019A\u000b\u001e<\t\u000bU\f\b\u0019\u0001+\u0002\u0007\u0011\u001cx\u000fC\u0003xc\u0002\u0007\u00010A\u0002uW\u000e\u0004\"!\u001f@\u000e\u0003iT!a\u001f?\u0002\u001b\r|gNZ5hkJ\fG/[8o\u0015\ti(\"\u0001\u0004n_\u0012,Gn]\u0005\u0003\u007fj\u0014q\u0002V5os.\u000bgm[1D_:4\u0017n\u001a")
public class KafkaSparkStructuredStreamingWriter
implements SparkStructuredStreamingWriter,
Logging {
    private final TopicBL topicBL;
    private final String topicDatastoreModelName;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    public DataStreamWriter<Row> write(Dataset<Row> df) {
        KafkaTopicSettings settings = TopicModelUtils$.MODULE$.retrieveKafkaTopicSettings(this.topicBL, this.topicDatastoreModelName);
        Dataset<Row> finalDf = KafkaWriters$.MODULE$.convertDataframe(df, settings.topicFieldName(), settings.topics(), settings.mainTopicModel(), settings.darwinConf());
        DataStreamWriter partialDataStreamWriter = finalDf.writeStream().format("kafka");
        DataStreamWriter partialDataStreamWriterAfterTopicConf = settings.isMultiTopic() ? partialDataStreamWriter : partialDataStreamWriter.option("topic", ((TopicModel)settings.topicsToWrite().head()).name());
        DataStreamWriter<Row> dataStreamWriterAfterKafkaConfig = this.addKafkaConf((DataStreamWriter<Row>)partialDataStreamWriterAfterTopicConf, settings.tinyKafkaConfig());
        String compressionForKafka = ((TopicModel)settings.topicsToWrite().head()).topicCompression().kafkaProp();
        DataStreamWriter finalDataStreamWriterAfterCompression = dataStreamWriterAfterKafkaConfig.option("kafka.compression.type", compressionForKafka);
        return finalDataStreamWriterAfterCompression;
    }

    private DataStreamWriter<Row> addKafkaConf(DataStreamWriter<Row> dsw, TinyKafkaConfig tkc) {
        String connectionString = ((TraversableOnce)tkc.connections().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(ConnectionConfig conn) {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ":", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{conn.host(), BoxesRunTime.boxToInteger((int)conn.port())}));
            }
        }, Seq$.MODULE$.canBuildFrom())).mkString(",");
        Seq kafkaConfigMap = tkc.others();
        return dsw.option("kafka.bootstrap.servers", connectionString).option("kafka.partitioner.class", tkc.partitioner_fqcn()).option("kafka.batch.size", ((Object)BoxesRunTime.boxToInteger((int)tkc.batch_send_size())).toString()).option("kafka.acks", tkc.acks()).options((Map)((TraversableOnce)kafkaConfigMap.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Tuple2<String, String> apply(KafkaEntryConfig x$1) {
                return x$1.toTupla();
            }
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    public KafkaSparkStructuredStreamingWriter(TopicBL topicBL, String topicDatastoreModelName, SparkSession ss) {
        this.topicBL = topicBL;
        this.topicDatastoreModelName = topicDatastoreModelName;
        Logging.class.$init$((Logging)this);
    }
}

