package it.agilelab.bigdata.wasp.consumers.spark.plugins.mongo;

import com.mongodb.spark.sql.MongoForeachRddWriter$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.models.DocumentModel;
import it.agilelab.bigdata.wasp.models.WriterModel;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import scala.MatchError;
import scala.reflect.ScalaSignature;

/* compiled from: MongoSparkStructuredStreamingWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001q3A!\u0002\u0004\u0001/!AA\u0005\u0001B\u0001B\u0003%Q\u0005\u0003\u0005,\u0001\t\u0005\t\u0015!\u0003-\u0011\u0015y\u0003\u0001\"\u00011\u0011\u0015)\u0004\u0001\"\u00117\u0005\rjuN\\4p'B\f'o[*ueV\u001cG/\u001e:fIN#(/Z1nS:<wK]5uKJT!a\u0002\u0005\u0002\u000b5|gnZ8\u000b\u0005%Q\u0011a\u00029mk\u001eLgn\u001d\u0006\u0003\u00171\tQa\u001d9be.T!!\u0004\b\u0002\u0013\r|gn];nKJ\u001c(BA\b\u0011\u0003\u00119\u0018m\u001d9\u000b\u0005E\u0011\u0012a\u00022jO\u0012\fG/\u0019\u0006\u0003'Q\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u0002+\u0005\u0011\u0011\u000e^\u0002\u0001'\r\u0001\u0001D\b\t\u00033qi\u0011A\u0007\u0006\u00027\u0005)1oY1mC&\u0011QD\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\u0005}\u0011S\"\u0001\u0011\u000b\u0005\u0005R\u0011aB<sSR,'o]\u0005\u0003G\u0001\u0012ad\u00159be.\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h/JLG/\u001a:\u0002\r]\u0014\u0018\u000e^3s!\t1\u0013&D\u0001(\u0015\tAc\"\u0001\u0004n_\u0012,Gn]\u0005\u0003U\u001d\u00121b\u0016:ji\u0016\u0014Xj\u001c3fY\u0006)Qn\u001c3fYB\u0011a%L\u0005\u0003]\u001d\u0012Q\u0002R8dk6,g\u000e^'pI\u0016d\u0017A\u0002\u001fj]&$h\bF\u00022gQ\u0002\"A\r\u0001\u000e\u0003\u0019AQ\u0001J\u0002A\u0002\u0015BQaK\u0002A\u00021\nQa\u001e:ji\u0016$\"a\u000e%\u0011\u0007a\u0012E)D\u0001:\u0015\tQ4(A\u0005tiJ,\u0017-\\5oO*\u0011A(P\u0001\u0004gFd'BA\u0006?\u0015\ty\u0004)\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0003\u0006\u0019qN]4\n\u0005\rK$\u0001\u0005#bi\u0006\u001cFO]3b[^\u0013\u0018\u000e^3s!\t)e)D\u0001<\u0013\t95HA\u0002S_^DQ!\u0013\u0003A\u0002)\u000baa\u001d;sK\u0006l\u0007CA&Z\u001d\tauK\u0004\u0002N-:\u0011a*\u0016\b\u0003\u001fRs!\u0001U*\u000e\u0003ES!A\u0015\f\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0015BA A\u0013\tYa(\u0003\u0002={%\u0011\u0001lO\u0001\ba\u0006\u001c7.Y4f\u0013\tQ6LA\u0005ECR\fgI]1nK*\u0011\u0001l\u000f")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/mongo/MongoSparkStructuredStreamingWriter.class */
/**
 * Structured-streaming writer that sends the rows of a streaming dataset through a
 * foreach sink built by {@code MongoForeachRddWriter}.
 *
 * <p>The sink is configured from a copy of the Spark configuration of the dataset's
 * session, extended with the options of the {@link WriterModel} and with the
 * connection string of the {@link DocumentModel}.
 */
public class MongoSparkStructuredStreamingWriter implements SparkStructuredStreamingWriter {
    /** Supplies the key/value options copied into the sink configuration. */
    private final WriterModel writer;
    /** Supplies the connection string stored under {@code spark.mongodb.output.uri}. */
    private final DocumentModel model;

    /**
     * Creates a writer bound to the given writer and document models.
     *
     * @param writerModel   model whose {@code options()} are applied to the sink configuration
     * @param documentModel model whose {@code connectionString()} is used as the output URI
     */
    public MongoSparkStructuredStreamingWriter(WriterModel writerModel, DocumentModel documentModel) {
        this.writer = writerModel;
        this.model = documentModel;
    }

    /**
     * Builds the stream writer for the given streaming dataset.
     *
     * @param dataset the streaming dataset to write
     * @return the dataset's stream writer with the foreach sink attached
     */
    public DataStreamWriter<Row> write(Dataset<Row> dataset) {
        // Work on a copy of the session configuration rather than on the object returned by getConf().
        SparkConf sessionConf = dataset.sparkSession().sparkContext().getConf();
        SparkConf sinkConf = sessionConf.clone();

        // Copy every writer option into the sink configuration.
        this.writer.options().foreach(option -> {
            // Mirrors the Scala pattern match on the (key, value) pair: a null entry is a match failure.
            if (option == null) {
                throw new MatchError(option);
            }
            return sinkConf.set((String) option._1(), (String) option._2());
        });

        // Set after the options have been applied, so this is the last write to this key.
        sinkConf.set("spark.mongodb.output.uri", this.model.connectionString());

        DataStreamWriter<Row> streamWriter = dataset.writeStream();
        return streamWriter.foreach(MongoForeachRddWriter$.MODULE$.apply(sinkConf, dataset.schema()));
    }
}
