/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl;

import it.agilelab.bigdata.wasp.consumers.spark.metadata.Metadata;
import it.agilelab.bigdata.wasp.consumers.spark.metadata.Path;
import it.agilelab.bigdata.wasp.consumers.spark.metadata.Path$;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetadataOps$;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetricsTelemetryMessage;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetricsTelemetryMessageFormat$;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.TelemetryMetadataProducer$;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.TelemetryMetadataProducerConfig;
import java.io.Serializable;
import java.time.Clock;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import scala.Function1;
import scala.Function5;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import spray.json.JsValue;

public final class MetadataOps$ {
    // Singleton instance of this class. It is not assigned here: the private
    // constructor stores `this` into the field when it runs.
    public static MetadataOps$ MODULE$;

    static {
        // Instantiated purely for its side effect -- the constructor publishes the
        // new instance through MODULE$, so the result of `new` is discarded.
        new MetadataOps$();
    }

    /**
     * Records an "enter" hop for the given ETL on the stream's metadata.
     *
     * <p>The hop name is {@code etlName} with every space replaced by {@code '-'},
     * followed by the suffix {@code "-enter"}. The work is delegated to
     * {@code on(String, Dataset)}.
     *
     * @param etlName name of the ETL being entered
     * @param stream  stream to update
     * @return the dataset returned by {@code on} for the computed hop name
     */
    public Dataset<Row> enter(String etlName, Dataset<Row> stream) {
        String hopName = etlName.replace(' ', '-') + "-enter";
        return this.on(hopName, stream);
    }

    /**
     * Records an "exit" hop for the given ETL on the stream's metadata.
     *
     * <p>The hop name is {@code etlName} with every space replaced by {@code '-'},
     * followed by the suffix {@code "-exit"}. The work is delegated to
     * {@code on(String, Dataset)}.
     *
     * @param etlName name of the ETL being exited
     * @param stream  stream to update
     * @return the dataset returned by {@code on} for the computed hop name
     */
    public Dataset<Row> exit(String etlName, Dataset<Row> stream) {
        String hopName = etlName.replace(' ', '-') + "-exit";
        return this.on(hopName, stream);
    }

    /**
     * Rewrites the {@code metadata} column of {@code stream} through the UDF built
     * by {@code updateMetadata(path)}, keeping the original column order.
     *
     * <p>Streams that have no column named {@code "metadata"} are returned as-is.
     *
     * @param path   hop name handed to {@code updateMetadata}
     * @param stream stream whose metadata column should be refreshed
     * @return the stream with a recomputed {@code metadata} column, or the input
     *         stream itself when it has no such column
     */
    private Dataset<Row> on(String path, Dataset<Row> stream) {
        String[] columnNames = stream.columns();
        if (!java.util.Arrays.asList(columnNames).contains("metadata")) {
            return stream;
        }

        UserDefinedFunction metadataUpdater = this.updateMetadata(path);
        Column[] metadataFields = new Column[]{
            functions$.MODULE$.col("metadata.id"),
            functions$.MODULE$.col("metadata.sourceId"),
            functions$.MODULE$.col("metadata.arrivalTimestamp"),
            functions$.MODULE$.col("metadata.lastSeenTimestamp"),
            functions$.MODULE$.col("metadata.path")
        };
        Column refreshedMetadata = metadataUpdater.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])metadataFields));

        // Compute into a temporary column, drop the old one, then rename the
        // temporary column back to "metadata".
        Dataset<Row> swapped = stream
            .withColumn("metadata_new", refreshedMetadata)
            .drop("metadata")
            .withColumnRenamed("metadata_new", "metadata");

        // Re-select by the original names so the column order is restored.
        // columnNames is non-empty here because it contains "metadata".
        String firstColumn = columnNames[0];
        String[] remainingColumns = java.util.Arrays.copyOfRange(columnNames, 1, columnNames.length);
        return swapped.select(firstColumn, (Seq)Predef$.MODULE$.wrapRefArray((Object[])remainingColumns));
    }

    /**
     * Builds a five-argument Spark UDF that delegates to
     * {@code $anonfun$updateMetadata$1}, capturing {@code path}.
     *
     * <p>The UDF receives (id, sourceId, arrivalTimestamp, a fourth long, path rows);
     * the two timestamp arguments are unboxed to {@code long} before the call.
     * The remainder of this method is reified scala-reflect {@code TypeTag}
     * plumbing for the types handed to {@code functions.udf}.
     *
     * @param path value captured by the UDF and forwarded as the first argument
     *             of {@code $anonfun$updateMetadata$1}
     * @return the UDF created by {@code functions$.MODULE$.udf}
     */
    public UserDefinedFunction updateMetadata(String path) {
        // One universe/mirror pair per TypeTag that is built through a TypeCreator
        // below; all mirrors use this class's class loader.
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        JavaUniverse $u2 = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m2 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        JavaUniverse $u3 = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m3 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        JavaUniverse $u4 = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m4 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // NOTE(review): the `public final` local class declarations below are
        // decompiler output and are not legal Java as written.
        // typecreator1: resolves the Metadata class type.
        public final class It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator1$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("it.agilelab.bigdata.wasp.consumers.spark.metadata.Metadata").asType().toTypeConstructor();
            }

            public It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator1$1() {
            }
        }
        // typecreator2: resolves scala.Predef's "String" type.
        public final class It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator2$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
            }

            public It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator2$1() {
            }
        }
        // typecreator3: same body as typecreator2 (scala.Predef's "String" type).
        public final class It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator3$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
            }

            public It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator3$1() {
            }
        }
        // typecreator4: resolves scala.package's "Seq" applied to org.apache.spark.sql.Row.
        // NOTE(review): `new .colon.colon(` is decompiler residue -- presumably the
        // Scala list cons class `::`; confirm before trying to compile.
        public final class It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator4$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().thisPrefix((Symbols.SymbolApi)$m.RootClass()), (Symbols.SymbolApi)$m.staticPackage("scala")), (Symbols.SymbolApi)$m.staticModule("scala.package")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.package").asModule().moduleClass(), "Seq"), (List)new .colon.colon((Object)$m.staticClass("org.apache.spark.sql.Row").asType().toTypeConstructor(), (List)Nil$.MODULE$));
            }

            public It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator4$1() {
            }
        }
        // TypeTags are passed in this order: Metadata, String, String, Long, Long,
        // Seq[Row] -- presumably the return type followed by the five argument types.
        // The fourth lambda argument (x$4) is unboxed and forwarded, but
        // $anonfun$updateMetadata$1 never reads it.
        return functions$.MODULE$.udf((Function5 & Serializable & scala.Serializable)(mId, mSourceId, mArrivalTimestamp, x$4, mPath) -> MetadataOps$.$anonfun$updateMetadata$1(path, mId, mSourceId, BoxesRunTime.unboxToLong((Object)mArrivalTimestamp), BoxesRunTime.unboxToLong((Object)x$4), mPath), ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator1$1()), ((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator2$1()), ((TypeTags)$u3).TypeTag().apply((Mirror)$m3, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator3$1()), ((TypeTags)package$.MODULE$.universe()).TypeTag().Long(), ((TypeTags)package$.MODULE$.universe()).TypeTag().Long(), ((TypeTags)$u4).TypeTag().apply((Mirror)$m4, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_streaming_actor_etl_MetadataOps$$typecreator4$1()));
    }

    /**
     * Passes every row of {@code stream} through unchanged while emitting a
     * sampled {@code "latencyMs"} telemetry message built from the row's
     * {@code metadata} struct.
     *
     * <p>Streams without a {@code "metadata"} column are returned as-is. Otherwise
     * each partition keeps its own row counter, and a row is sampled when
     * {@code counter % samplingFactor == 0} (so the first row of every partition
     * is always sampled). For a sampled row the hop list is the synthetic source
     * hop {@code (sourceId, arrivalTimestamp)} followed by the entries of
     * {@code metadata.path}; the reported latency is the timestamp difference
     * between the last two hops, and the collection time is the last hop's
     * timestamp formatted with {@link DateTimeFormatter#ISO_INSTANT}.
     *
     * <p>Rows whose hop list has fewer than two entries (an empty
     * {@code metadata.path}) produce no telemetry, since there is no pair of hops
     * to measure.
     *
     * @param stream         stream to instrument
     * @param config         producer configuration forwarded to
     *                       {@code TelemetryMetadataProducer$.send}
     * @param samplingFactor sampling period in rows; must be non-zero, because a
     *                       value of 0 makes the modulo throw
     *                       {@link ArithmeticException} when rows are processed
     * @return a dataset with the same rows and schema as {@code stream}, or
     *         {@code stream} itself when it has no {@code "metadata"} column
     */
    public Dataset<Row> sendLatencyMessage(Dataset<Row> stream, TelemetryMetadataProducerConfig config, int samplingFactor) {
        if (!new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])stream.columns())).contains((Object)"metadata")) {
            return stream;
        }
        // Rows are emitted untouched, so the output encoder is the input schema's.
        ExpressionEncoder rowEncoder = RowEncoder$.MODULE$.apply(stream.schema());
        return stream.mapPartitions((Function1 & Serializable & scala.Serializable)partition -> {
            // Per-partition counter; IntRef gives the inner lambda a mutable cell.
            IntRef counter = IntRef.create((int)0);
            return ((scala.collection.Iterator)partition).map((Function1 & Serializable & scala.Serializable)element -> {
                Row row = (Row)element;
                if (counter.elem % samplingFactor == 0) {
                    Row metadata = row.getStruct(row.fieldIndex("metadata"));
                    int pathField = metadata.fieldIndex("path");
                    String messageId = metadata.getString(metadata.fieldIndex("id"));
                    String sourceId = metadata.getString(metadata.fieldIndex("sourceId"));
                    long arrivalTimestamp = metadata.getLong(metadata.fieldIndex("arrivalTimestamp"));
                    // The source itself is modelled as the first hop.
                    Path sourceHop = new Path(sourceId, arrivalTimestamp);
                    Seq hops = (Seq)((SeqLike)metadata.getSeq(pathField).map((Function1 & Serializable & scala.Serializable)r -> Path$.MODULE$.apply((Row)r), Seq$.MODULE$.canBuildFrom())).$plus$colon((Object)sourceHop, Seq$.MODULE$.canBuildFrom());
                    Seq lastTwoHops = (Seq)hops.takeRight(2);
                    // With an empty metadata.path only the source hop exists; indexing
                    // element 1 would throw, so skip telemetry for such rows.
                    if (lastTwoHops.length() == 2) {
                        Path previousHop = (Path)lastTwoHops.apply(0);
                        Path latestHop = (Path)lastTwoHops.apply(1);
                        long latency = latestHop.ts() - previousHop.ts();
                        String collectionTimeAsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(latestHop.ts()));
                        String compositeSourceId = ((TraversableOnce)hops.map((Function1 & Serializable & scala.Serializable)hop -> ((Path)hop).name().replace(' ', '-'), Seq$.MODULE$.canBuildFrom())).mkString("/");
                        MetricsTelemetryMessage message = new MetricsTelemetryMessage(messageId, compositeSourceId, "latencyMs", latency, collectionTimeAsString);
                        JsValue json = MetricsTelemetryMessageFormat$.MODULE$.metricsTelemetryMessageFormat().write((Object)message);
                        TelemetryMetadataProducer$.MODULE$.send(config, messageId, json);
                    }
                }
                ++counter.elem;
                return row;
            });
        }, (Encoder)rowEncoder);
    }

    /**
     * Body of the UDF created by {@code updateMetadata}: builds a new
     * {@code Metadata} whose path is the old path plus one hop stamped with the
     * current UTC time.
     *
     * @param path$1            name of the hop to append
     * @param mId               forwarded unchanged to the new {@code Metadata}
     * @param mSourceId         forwarded unchanged to the new {@code Metadata}
     * @param mArrivalTimestamp forwarded unchanged to the new {@code Metadata}
     * @param x$4               ignored; the fourth {@code Metadata} constructor
     *                          argument is the current time instead
     * @param mPath             existing hops as rows, each converted with
     *                          {@code Path$.apply(Row)}
     * @return the rebuilt {@code Metadata}
     */
    public static final /* synthetic */ Metadata $anonfun$updateMetadata$1(String path$1, String mId, String mSourceId, long mArrivalTimestamp, long x$4, Seq mPath) {
        // A single clock reading is used for both the new hop and the Metadata itself.
        long nowMillis = Clock.systemUTC().instant().toEpochMilli();
        Seq previousHops = (Seq)mPath.map((Function1 & Serializable & scala.Serializable)r -> Path$.MODULE$.apply((Row)r), Seq$.MODULE$.canBuildFrom());
        Path currentHop = new Path(path$1, nowMillis);
        Object extendedHops = previousHops.$colon$plus((Object)currentHop, Seq$.MODULE$.canBuildFrom());
        Path[] hopArray = (Path[])((TraversableOnce)extendedHops).toArray(ClassTag$.MODULE$.apply(Path.class));
        return new Metadata(mId, mSourceId, mArrivalTimestamp, nowMillis, hopArray);
    }

    /**
     * Private constructor: publishes the newly created instance through the
     * static {@code MODULE$} field.
     */
    private MetadataOps$() {
        MetadataOps$.MODULE$ = this;
    }
}

