package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import akka.actor.Props$;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.util.Timeout;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.WaspConsumersSparkPlugin;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkBatchReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.configuration.ValidationRule;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.datastores.DatastoreProduct;
import it.agilelab.bigdata.wasp.datastores.DatastoreProduct$;
import it.agilelab.bigdata.wasp.models.IndexModel;
import it.agilelab.bigdata.wasp.models.ReaderModel;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.WriterModel;
import it.agilelab.bigdata.wasp.repository.core.bl.ConfigBL$;
import it.agilelab.bigdata.wasp.repository.core.bl.IndexBL;
import it.agilelab.bigdata.wasp.repository.core.db.WaspDB;
import java.util.concurrent.TimeUnit;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.concurrent.Await$;
import scala.concurrent.duration.Duration$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: ElasticConsumersSpark.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Mf\u0001\u0002\t\u0012\u0001\tBQ!\u000e\u0001\u0005\u0002YB\u0011\"\u000f\u0001A\u0002\u0003\u0007I\u0011\u0001\u001e\t\u0013\u0011\u0003\u0001\u0019!a\u0001\n\u0003)\u0005\"C&\u0001\u0001\u0004\u0005\t\u0015)\u0003<\u0011%a\u0005\u00011AA\u0002\u0013\u0005Q\nC\u0005W\u0001\u0001\u0007\t\u0019!C\u0001/\"I\u0011\f\u0001a\u0001\u0002\u0003\u0006KA\u0014\u0005\u00065\u0002!\te\u0017\u0005\u0006E\u0002!\te\u0019\u0005\u0006Y\u0002!\t%\u001c\u0005\b\u0003\u000b\u0001A\u0011IA\u0004\u0011\u001d\t\t\u0005\u0001C!\u0003\u0007Bq!a\u0018\u0001\t\u0003\n\t\u0007C\u0004\u0002~\u0001!\t%a \t\u000f\u0005M\u0005\u0001\"\u0003\u0002\u0016\n)R\t\\1ti&\u001c7i\u001c8tk6,'o]*qCJ\\'B\u0001\n\u0014\u0003\u001d)G.Y:uS\u000eT!\u0001F\u000b\u0002\u000fAdWoZ5og*\u0011acF\u0001\u0006gB\f'o\u001b\u0006\u00031e\t\u0011bY8ogVlWM]:\u000b\u0005iY\u0012\u0001B<bgBT!\u0001H\u000f\u0002\u000f\tLw\rZ1uC*\u0011adH\u0001\tC\u001eLG.\u001a7bE*\t\u0001%\u0001\u0002ji\u000e\u00011\u0003\u0002\u0001$S5\u0002\"\u0001J\u0014\u000e\u0003\u0015R\u0011AJ\u0001\u0006g\u000e\fG.Y\u0005\u0003Q\u0015\u0012a!\u00118z%\u00164\u0007C\u0001\u0016,\u001b\u0005\u0019\u0012B\u0001\u0017\u0014\u0005a9\u0016m\u001d9D_:\u001cX/\\3sgN\u0003\u0018M]6QYV<\u0017N\u001c\t\u0003]Mj\u0011a\f\u0006\u0003aE\nq\u0001\\8hO&twM\u0003\u000233\u0005!1m\u001c:f\u0013\t!tFA\u0004M_\u001e<\u0017N\\4\u0002\rqJg.\u001b;?)\u00059\u0004C\u0001\u001d\u0001\u001b\u0005\t\u0012aB5oI\u0016D(\tT\u000b\u0002wA\u0011AHQ\u0007\u0002{)\u0011ahP\u0001\u0003E2T!A\r!\u000b\u0005\u0005K\u0012A\u0003:fa>\u001c\u0018\u000e^8ss&\u00111)\u0010\u0002\b\u0013:$W\r\u001f\"M\u0003-Ig\u000eZ3y\u00052{F%Z9\u0015\u0005\u0019K\u0005C\u0001\u0013H\u0013\tAUE\u0001\u0003V]&$\bb\u0002&\u0004\u0003\u0003\u0005\raO\u0001\u0004q\u0012\n\u0014\u0001C5oI\u0016D(\t\u0014\u0011\u0002%\u0015d\u0017m\u001d;jG\u0006#W.\u001b8BGR|'oX\u000b\u0002\u001dB\u0011q\nV\u0007\u0002!*\u0011\u0011KU\u0001\u0006C\u000e$xN\u001d\u0006\u0002'\u0006!\u0011m[6b\u0013\t)\u0006K\u0001\u0005BGR|'OU3g\u0003Y)G.Y:uS\u000e\fE-\\5o\u0003\u000e$xN]0`I\u0015\fHC\u0001$Y\u0011\u001dQe!!AA\u00029\u000b1#\u001a7bgRL7-\u00113nS:\f5\r^8s?\u0002\n\u0001\u0003Z1uCN$xN]3Qe>$Wo\u0019;\u0016\u0003q\u0003\"!\u00181\u000e\u0003yS!aX\r\u0002\u0015\u0011\fG/Y:u_J,7/\u0003\u0002b=\n\u0001B)\u0019;bgR|'/\u001a)s_\u0012,8\r^\u0001\u000bS:LG/[1mSj,GC\u0001$e\u0011\u0015)\u0017\u00021\u0001g\u0003\u00199\u0018m\u001d9E\u0005B\u0011qM[\u0007\u0002Q*\u0011\u0011nP\u0001\u0003I\nL!a\u001b5\u0003\r]\u000b7\u000f\u001d#C\u0003I9W\r\u001e,bY&$\u0017\r^5p]J+H.Z:\u0016\u00039\u00042a\\<{\u001d\t\u0001XO\u0004\u0002ri6\t!O\u0003\u0002tC\u00051AH]8pizJ\u0011AJ\u0005\u0003m\u0016\nq\u0001]1dW\u0006<W-\u0003\u0002ys\n\u00191+Z9\u000b\u0005Y,\u0003cA>\u0002\u00025\tAP\u0003\u0002~}\u0006i1m\u001c8gS\u001e,(/\u0019;j_:T!a`\u0019\u0002\r5|G-\u001a7t\u0013\r\t\u0019\u0001 \u0002\u000f-\u0006d\u0017\u000eZ1uS>t'+\u001e7f\u0003\u0005:W\r^*qCJ\\7\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO^\u0013\u0018\u000e^3s)!\tI!a\u0004\u0002*\u0005]\u0002c\u0001\u001d\u0002\f%\u0019\u0011QB\t\u0003W\u0015c\u0017m\u001d;jGN,\u0017M]2i'B\f'o[*ueV\u001cG/\u001e:fIN#(/Z1nS:<wK]5uKJDq!!\u0005\f\u0001\u0004\t\u0019\"\u0001\u0002tgB!\u0011QCA\u0013\u001b\t\t9B\u0003\u0003\u0002\u001a\u0005m\u0011aA:rY*\u0019a#!\b\u000b\t\u0005}\u0011\u0011E\u0001\u0007CB\f7\r[3\u000b\u0005\u0005\r\u0012aA8sO&!\u0011qEA\f\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0011\u001d\tYc\u0003a\u0001\u0003[\t1d\u001d;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e,E\u000bT'pI\u0016d\u0007\u0003BA\u0018\u0003gi!!!\r\u000b\u0005}L\u0012\u0002BA\u001b\u0003c\u00111d\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e,E\u000bT'pI\u0016d\u0007bBA\u001d\u0017\u0001\u0007\u00111H\u0001\foJLG/\u001a:N_\u0012,G\u000e\u0005\u0003\u00020\u0005u\u0012\u0002BA \u0003c\u00111b\u0016:ji\u0016\u0014Xj\u001c3fY\u0006\ts-\u001a;Ta\u0006\u00148n\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e\u0014V-\u00193feRA\u0011QIA)\u0003'\n)\u0006\u0005\u0003\u0002H\u00055SBAA%\u0015\r\tY%F\u0001\be\u0016\fG-\u001a:t\u0013\u0011\ty%!\u0013\u0003=M\u0003\u0018M]6TiJ,8\r^;sK\u0012\u001cFO]3b[&twMU3bI\u0016\u0014\bbBA\t\u0019\u0001\u0007\u00111\u0003\u0005\b\u0003Wa\u0001\u0019AA\u0017\u0011\u001d\t9\u0006\u0004a\u0001\u00033\nAc\u001d;sK\u0006l\u0017N\\4SK\u0006$WM]'pI\u0016d\u0007\u0003BA\u0018\u00037JA!!\u0018\u00022\t!2\u000b\u001e:fC6Lgn\u001a*fC\u0012,'/T8eK2\f1cZ3u'B\f'o\u001b\"bi\u000eDwK]5uKJ$b!a\u0019\u0002p\u0005m\u0004\u0003BA3\u0003Wj!!a\u001a\u000b\u0007\u0005%T#A\u0004xe&$XM]:\n\t\u00055\u0014q\r\u0002\u0011'B\f'o\u001b\"bi\u000eDwK]5uKJDq!!\u001d\u000e\u0001\u0004\t\u0019(\u0001\u0002tGB!\u0011QOA<\u001b\t\tY\"\u0003\u0003\u0002z\u0005m!\u0001D*qCJ\\7i\u001c8uKb$\bbBA\u001d\u001b\u0001\u0007\u00111H\u0001\u0014O\u0016$8\u000b]1sW\n\u000bGo\u00195SK\u0006$WM\u001d\u000b\u0007\u0003\u0003\u000b9)!#\u0011\t\u0005\u001d\u00131Q\u0005\u0005\u0003\u000b\u000bIE\u0001\tTa\u0006\u00148NQ1uG\"\u0014V-\u00193fe\"9\u0011\u0011\u000f\bA\u0002\u0005M\u0004bBAF\u001d\u0001\u0007\u0011QR\u0001\fe\u0016\fG-\u001a:N_\u0012,G\u000e\u0005\u0003\u00020\u0005=\u0015\u0002BAI\u0003c\u00111BU3bI\u0016\u0014Xj\u001c3fY\u0006q1\u000f^1siV\u0004X\t\\1ti&\u001cG\u0003BAL\u0003S#2ARAM\u0011\u001d\tYj\u0004a\u0002\u0003;\u000bq\u0001^5nK>,H\u000f\u0005\u0003\u0002 \u0006\u0015VBAAQ\u0015\r\t\u0019KU\u0001\u0005kRLG.\u0003\u0003\u0002(\u0006\u0005&a\u0002+j[\u0016|W\u000f\u001e\u0005\b\u0003W{\u0001\u0019AAW\u0003U\u0019XM\u001d<jG\u0016\u001cH+[7f_V$X*\u001b7mSN\u00042\u0001JAX\u0013\r\t\t,\n\u0002\u0005\u0019>tw\r")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticConsumersSpark.class */
public class ElasticConsumersSpark implements WaspConsumersSparkPlugin, Logging {
    private IndexBL indexBL;
    private ActorRef elasticAdminActor_;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public IndexBL indexBL() {
        return this.indexBL;
    }

    public void indexBL_$eq(IndexBL indexBL) {
        this.indexBL = indexBL;
    }

    public ActorRef elasticAdminActor_() {
        return this.elasticAdminActor_;
    }

    public void elasticAdminActor__$eq(ActorRef actorRef) {
        this.elasticAdminActor_ = actorRef;
    }

    public DatastoreProduct datastoreProduct() {
        return DatastoreProduct$.MODULE$.ElasticProduct();
    }

    public void initialize(WaspDB waspDB) {
        logger().info(() -> {
            return "Initialize the index BL";
        });
        indexBL_$eq(ConfigBL$.MODULE$.indexBL());
        logger().info(() -> {
            return new StringBuilder(50).append("Initialize the elastic admin actor with this name ").append(ElasticAdminActor$.MODULE$.name()).toString();
        });
        elasticAdminActor__$eq(WaspSystem$.MODULE$.actorSystem().actorOf(Props$.MODULE$.apply(() -> {
            return new ElasticAdminActor();
        }, ClassTag$.MODULE$.apply(ElasticAdminActor.class)), ElasticAdminActor$.MODULE$.name()));
        startupElastic(WaspSystem$.MODULE$.waspConfig().servicesTimeoutMillis(), new Timeout(r0 - 1000, TimeUnit.MILLISECONDS));
    }

    public Seq<ValidationRule> getValidationRules() {
        return Seq$.MODULE$.apply(Nil$.MODULE$);
    }

    /* renamed from: getSparkStructuredStreamingWriter, reason: merged with bridge method [inline-methods] */
    public ElasticsearchSparkStructuredStreamingWriter m6getSparkStructuredStreamingWriter(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, WriterModel writerModel) {
        logger().info(() -> {
            return new StringBuilder(87).append("Initialize the elastic spark structured streaming writer with this writer model name '").append(writerModel.name()).append("'").toString();
        });
        return new ElasticsearchSparkStructuredStreamingWriter(indexBL(), sparkSession, writerModel.datastoreModelName(), elasticAdminActor_());
    }

    public SparkStructuredStreamingReader getSparkStructuredStreamingReader(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, StreamingReaderModel streamingReaderModel) {
        String sb = new StringBuilder(83).append("The datastore product ").append(datastoreProduct()).append(" is not a valid streaming source! Reader model ").append(streamingReaderModel).append(" is not valid.").toString();
        logger().error(() -> {
            return sb;
        });
        throw new UnsupportedOperationException(sb);
    }

    public SparkBatchWriter getSparkBatchWriter(SparkContext sparkContext, WriterModel writerModel) {
        logger().info(() -> {
            return new StringBuilder(72).append("Initialize the elastic spark batch writer with this writer model name '").append(writerModel.name()).append("'").toString();
        });
        return new ElasticsearchSparkBatchWriter(indexBL(), sparkContext, writerModel.name(), elasticAdminActor_());
    }

    public SparkBatchReader getSparkBatchReader(SparkContext sparkContext, ReaderModel readerModel) {
        Option byName = indexBL().getByName(readerModel.name());
        if (!byName.isDefined()) {
            String sb = new StringBuilder(23).append("Index model not found: ").append(readerModel).toString();
            logger().error(() -> {
                return sb;
            });
            throw new Exception(sb);
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(() -> {
            return new StringBuilder(57).append("Check or create the index model: '").append(indexModel.toString()).append(" with this index name: ").append(eventuallyTimedName).toString();
        });
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringBuilder(51).append("There no define schema in the index configuration: ").append(indexModel).toString());
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        if (lowerCase != null ? !lowerCase.equals(name) : name != null) {
            throw new Exception(new StringBuilder(38).append("The index name must be all lowercase: ").append(indexModel).toString());
        }
        if (BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(elasticAdminActor_(), new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            return new ElasticsearchSparkBatchReader(indexModel);
        }
        String sb2 = new StringBuilder(52).append("Error creating elastic index: ").append(indexModel).append(" with this index name ").append(eventuallyTimedName).toString();
        logger().error(() -> {
            return sb2;
        });
        throw new Exception(sb2);
    }

    private void startupElastic(long j, Timeout timeout) {
        logger().info(() -> {
            return "Trying to connect with Elastic...";
        });
        ActorRef ask = package$.MODULE$.ask(elasticAdminActor_());
        Initialization initialization = new Initialization(ConfigManager$.MODULE$.getElasticConfig());
        boolean z = false;
        Some some = null;
        Option value = Await$.MODULE$.ready(AskableActorRef$.MODULE$.$qmark$extension1(ask, initialization, timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, initialization)), Duration$.MODULE$.apply(j, TimeUnit.MILLISECONDS)).value();
        if (value instanceof Some) {
            z = true;
            some = (Some) value;
            Failure failure = (Try) some.value();
            if (failure instanceof Failure) {
                Throwable exception = failure.exception();
                logger().error(() -> {
                    return exception.getMessage();
                });
                throw new Exception(exception);
            }
        }
        if (z && (((Try) some.value()) instanceof Success)) {
            logger().info(() -> {
                return "The system is connected with Elastic";
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(value)) {
                throw new MatchError(value);
            }
            throw new UnknownError("Unknown error during Elastic connection initialization");
        }
    }

    public ElasticConsumersSpark() {
        Logging.$init$(this);
    }
}
