package ai.chronon.spark.streaming;

import ai.chronon.online.Fetcher;
import ai.chronon.online.Metrics$Name$;
import java.util.Iterator;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import scala.Array$;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.concurrent.Await$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.util.ScalaJavaConversions$;

/* compiled from: JoinSourceRunner.scala */
/* loaded from: input_file:ai/chronon/spark/streaming/JoinSourceRunner$$anon$1.class */
/**
 * Per-partition function compiled from an anonymous class in {@code JoinSourceRunner.scala}.
 *
 * <p>For every partition it materializes the incoming rows, turns each row into a
 * {@link Fetcher.Request}, optionally sleeps so that the batch is at least
 * {@code minimumQueryDelayMs} old, issues a single {@code fetchJoin} call for the whole batch,
 * and maps the responses to the returned iterator of rows.
 *
 * <p>The row-to-request, row-to-long and response-to-row conversions live in synthetic closure
 * classes ({@code $$anonfun$17/18/19}, {@code $$anonfun$call$2/3/4}) that are not part of this
 * file, so their exact semantics cannot be confirmed from here.
 */
public final class JoinSourceRunner$$anon$1 implements MapPartitionsFunction<Row, Row> {
    /** Enclosing {@code JoinSourceRunner}; source of configuration, metrics context and logger. */
    private final /* synthetic */ JoinSourceRunner $outer;

    // Values captured from the enclosing scope. They are not read directly in this class body;
    // presumably the synthetic closure classes read them, which is why they are public.
    public final String joinRequestName$1;
    public final Tuple2[] joinChrononSchema$1;
    public final String[] joinFields$1;
    public final String[] leftColumns$1;
    public final int leftTimeIndex$1;

    /**
     * Fetches join results for all rows of one partition.
     *
     * @param it rows of the partition; fully materialized into an array before fetching
     * @return iterator over the rows produced from the fetch responses
     * @throws Exception {@link InterruptedException} if the delay sleep is interrupted, or
     *     whatever {@code Await.result} raises when the fetch fails or does not finish within
     *     the 5 second limit
     */
    @Override
    public Iterator<Row> call(Iterator<Row> it) throws Exception {
        // Roughly one call in ten is selected for verbose request/response logging (debug only).
        boolean sampledForDebug = Math.random() <= 0.1d;
        Fetcher fetcher = LocalIOCache$.MODULE$.getOrSetFetcher(new JoinSourceRunner$$anon$1$$anonfun$17(this));
        Row[] rows = (Row[]) ScalaJavaConversions$.MODULE$.IteratorOps(it).toScala().toArray(ClassTag$.MODULE$.apply(Row.class));
        Fetcher.Request[] requests = (Fetcher.Request[]) Predef$.MODULE$.refArrayOps(rows).map(new JoinSourceRunner$$anon$1$$anonfun$18(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Fetcher.Request.class)));

        // One long per row (presumably the row's event time - confirm in $$anonfun$19), reduced
        // to the configured percentile.
        Option<Object> percentile = this.$outer.percentile((long[]) Predef$.MODULE$.refArrayOps(rows).map(new JoinSourceRunner$$anon$1$$anonfun$19(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Long())), this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$timePercentile());
        if (percentile.isDefined()) {
            long lagMillis = System.currentTimeMillis() - BoxesRunTime.unboxToLong(percentile.get());
            this.$outer.context().distribution(Metrics$Name$.MODULE$.BatchLagMillis(), lagMillis);
            // Sleep only when a minimum delay is configured and the batch is younger than it;
            // a negative lag never triggers a sleep.
            if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs() > 0 && lagMillis >= 0 && lagMillis < this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs()) {
                long sleepMillis = this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs() - lagMillis;
                Thread.sleep(sleepMillis);
                this.$outer.context().distribution(Metrics$Name$.MODULE$.QueryDelaySleepMillis(), sleepMillis);
            }
        }

        if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$debug && sampledForDebug) {
            // The closure appends to the message held in requestLog for every (request, index) pair.
            ObjectRef requestLog = ObjectRef.create(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\\nShowing all ", " requests:\\n"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(requests.length)})));
            Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(requests).zipWithIndex(Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))).foreach(new JoinSourceRunner$$anon$1$$anonfun$call$2(this, requestLog));
            this.$outer.logger().info((String) requestLog.elem);
        }

        // Blocks until the batch fetch completes, for at most 5 seconds.
        // NOTE(review): "package.DurationInt" is how the decompiler renders Scala's duration
        // package object; "package" is a reserved word in Java, so this line is kept verbatim.
        Seq responses = (Seq) Await$.MODULE$.result(fetcher.fetchJoin(Predef$.MODULE$.refArrayOps(requests).toSeq(), fetcher.fetchJoin$default$2()), new package.DurationInt(package$.MODULE$.DurationInt(5)).second());

        if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$debug && sampledForDebug) {
            this.$outer.logger().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Request count: ", " Response count: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(requests.length), BoxesRunTime.boxToInteger(responses.length())})));
            ObjectRef responseLog = ObjectRef.create(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\\n Showing all ", " responses:\\n"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(responses.length())})));
            ((IterableLike) responses.zipWithIndex(Seq$.MODULE$.canBuildFrom())).foreach(new JoinSourceRunner$$anon$1$$anonfun$call$3(this, responseLog));
            this.$outer.logger().info((String) responseLog.elem);
        }

        return ScalaJavaConversions$.MODULE$.JIteratorOps(responses.iterator().map(new JoinSourceRunner$$anon$1$$anonfun$call$4(this))).toJava();
    }

    /** Synthetic accessor exposing the enclosing {@code JoinSourceRunner}. */
    public /* synthetic */ JoinSourceRunner ai$chronon$spark$streaming$JoinSourceRunner$$anon$$$outer() {
        return this.$outer;
    }

    /**
     * Captures the enclosing runner and the values the partition function depends on.
     *
     * @param joinSourceRunner enclosing instance; must not be {@code null}
     * @param str stored as {@code joinRequestName$1}
     * @param tuple2Arr stored as {@code joinChrononSchema$1}
     * @param strArr stored as {@code joinFields$1}
     * @param strArr2 stored as {@code leftColumns$1}
     * @param i stored as {@code leftTimeIndex$1}
     * @throws NullPointerException if {@code joinSourceRunner} is {@code null}
     */
    public JoinSourceRunner$$anon$1(JoinSourceRunner joinSourceRunner, String str, Tuple2[] tuple2Arr, String[] strArr, String[] strArr2, int i) {
        if (joinSourceRunner == null) {
            // Explicit form of the decompiled "throw null"; the exception type is unchanged.
            throw new NullPointerException();
        }
        this.$outer = joinSourceRunner;
        this.joinRequestName$1 = str;
        this.joinChrononSchema$1 = tuple2Arr;
        this.joinFields$1 = strArr;
        this.leftColumns$1 = strArr2;
        this.leftTimeIndex$1 = i;
    }
}
