package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import it.agilelab.bigdata.wasp.models.IndexModel;
import it.agilelab.bigdata.wasp.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.repository.core.bl.IndexBL;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015a\u0001B\u0001\u0003\u0001M\u00111&\u00127bgRL7m]3be\u000eD7\u000b]1sWN#(/^2ukJ,Gm\u0015;sK\u0006l\u0017N\\4Xe&$XM\u001d\u0006\u0003\u0007\u0011\tq!\u001a7bgRL7M\u0003\u0002\u0006\r\u00059\u0001\u000f\\;hS:\u001c(BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"A\u0005d_:\u001cX/\\3sg*\u00111\u0002D\u0001\u0005o\u0006\u001c\bO\u0003\u0002\u000e\u001d\u00059!-[4eCR\f'BA\b\u0011\u0003!\tw-\u001b7fY\u0006\u0014'\"A\t\u0002\u0005%$8\u0001A\n\u0006\u0001QQ\u0002\u0005\u000b\t\u0003+ai\u0011A\u0006\u0006\u0002/\u0005)1oY1mC&\u0011\u0011D\u0006\u0002\u0007\u0003:L(+\u001a4\u0011\u0005mqR\"\u0001\u000f\u000b\u0005u1\u0011aB<sSR,'o]\u0005\u0003?q\u0011ad\u00159be.\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h/JLG/\u001a:\u0011\u0005\u00052S\"\u0001\u0012\u000b\u0005\r\"\u0013!B;uS2\u001c(BA\u0013\u000b\u0003\u0011\u0019wN]3\n\u0005\u001d\u0012#\u0001F#mCN$\u0018nY\"p]\u001aLw-\u001e:bi&|g\u000e\u0005\u0002*Y5\t!F\u0003\u0002,I\u00059An\\4hS:<\u0017BA\u0017+\u0005\u001daunZ4j]\u001eD\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001M\u0001\bS:$W\r\u001f\"M!\t\tt'D\u00013\u0015\t\u0019D'\u0001\u0002cY*\u0011Q%\u000e\u0006\u0003m)\t!B]3q_NLGo\u001c:z\u0013\tA$GA\u0004J]\u0012,\u0007P\u0011'\t\u0011i\u0002!\u0011!Q\u0001\nm\n!a]:\u0011\u0005q\"U\"A\u001f\u000b\u0005yz\u0014aA:rY*\u0011q\u0001\u0011\u0006\u0003\u0003\n\u000ba!\u00199bG\",'\"A\"\u0002\u0007=\u0014x-\u0003\u0002F{\ta1\u000b]1sWN+7o]5p]\"Aq\t\u0001B\u0001B\u0003%\u0001*\u0001\u0003oC6,\u0007CA%M\u001d\t)\"*\u0003\u0002L-\u00051\u0001K]3eK\u001aL!!\u0014(\u0003\rM#(/\u001b8h\u0015\tYe\u0003\u0003\u0005Q\u0001\t\u0005\t\u0015!\u0003R\u0003E)G.Y:uS\u000e\fE-\\5o\u0003\u000e$xN\u001d\t\u0003%^k\u0011a\u0015\u0006\u0003)V\u000bQ!Y2u_JT\u0011AV\u0001\u0005C.\\\u0017-\u0003\u0002Y'\nA\u0011i\u0019;peJ+g\rC\u0003[\u0001\u0011\u00051,\u0001\u0004=S:LGO\u0010\u000b\u00069z{\u0006-\u0019\t\u0003;\u0002i\u0011A\u0001\u0005\u0006_e\u0003\r\u0001\r\u0005\u0006ue\u0003\ra\u000f\u0005\u0006\u000ff\u0003\r\u0001\u0013\u0005\u0006!f\u0003\r!\u0015\u0005\u0006G\u0002!\t\u0005Z\u0001\u0006oJLG/\u001a\u000b\u0003K:\u00042AZ5l\u001b\u00059'B\u00015>\u0003%\u0019HO]3b[&tw-\u0003\u0002kO\n\u0001B)\u0019;b'R\u0014X-Y7Xe&$XM\u001d\t\u0003y1L!!\\\u001f\u0003\u0007I{w\u000fC\u0003pE\u0002\u0007\u0001/\u0001\u0004tiJ,\u0017-\u001c\t\u0003c~t!A]?\u000f\u0005MdhB\u0001;|\u001d\t)(P\u0004\u0002ws6\tqO\u0003\u0002y%\u00051AH]8pizJ\u0011aQ\u0005\u0003\u0003\nK!a\u0002!\n\u0005yz\u0014B\u0001@>\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\u0001\u0002\u0004\tIA)\u0019;b\rJ\fW.\u001a\u0006\u0003}v\u0002")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticsearchSparkStructuredStreamingWriter.class */
public class ElasticsearchSparkStructuredStreamingWriter implements SparkStructuredStreamingWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final String name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private final ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.class.elasticConfig(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.elasticConfig;
        }
    }

    public ElasticConfigModel elasticConfig() {
        return this.bitmap$0 ? this.elasticConfig : elasticConfig$lzycompute();
    }

    public DataStreamWriter<Row> write(Dataset<Row> dataset) {
        Option byName = this.indexBL.getByName(this.name);
        if (!byName.isDefined()) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index '", "' does not exits pay ATTENTION spark won't start"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.name}));
            logger().error(new ElasticsearchSparkStructuredStreamingWriter$$anonfun$write$3(this, s));
            throw new Exception(s);
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        String resource = indexModel.resource();
        logger().info(new ElasticsearchSparkStructuredStreamingWriter$$anonfun$write$1(this, indexModel, eventuallyTimedName));
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"There no define schema in the index configuration: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        if (lowerCase != null ? !lowerCase.equals(name) : name != null) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index name must be all lowercase: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        Map $plus = Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(new ElasticsearchSparkStructuredStreamingWriter$$anonfun$1(this))).toMap(Predef$.MODULE$.$conforms()).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("path"), resource));
        if (BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            return dataset.writeStream().options($plus).format("es");
        }
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error creating elastic index: ", " with this index name ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel, eventuallyTimedName}));
        logger().error(new ElasticsearchSparkStructuredStreamingWriter$$anonfun$write$2(this, s2));
        throw new Exception(s2);
    }

    public ElasticsearchSparkStructuredStreamingWriter(IndexBL indexBL, SparkSession sparkSession, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.class.$init$(this);
        Logging.class.$init$(this);
    }
}
