package com.linkedin.feathr.offline.evaluator.lookup;

import com.linkedin.feathr.common.FeatureValue;
import com.linkedin.feathr.compute.AnyNode;
import com.linkedin.feathr.compute.Lookup;
import com.linkedin.feathr.offline.PostTransformationUtil$;
import com.linkedin.feathr.offline.derived.strategies.SeqJoinAggregator$;
import com.linkedin.feathr.offline.derived.strategies.SequentialJoinAsDerivation$;
import com.linkedin.feathr.offline.evaluator.NodeEvaluator;
import com.linkedin.feathr.offline.graph.DataframeAndColumnMetadata;
import com.linkedin.feathr.offline.graph.DataframeAndColumnMetadata$;
import com.linkedin.feathr.offline.graph.FCMGraphTraverser;
import com.linkedin.feathr.offline.graph.NodeUtils$;
import com.linkedin.feathr.offline.join.algorithms.JoinType$;
import com.linkedin.feathr.offline.join.algorithms.SequentialJoinConditionBuilder$;
import com.linkedin.feathr.offline.join.algorithms.SparkJoinWithJoinCondition;
import com.linkedin.feathr.offline.join.algorithms.SparkJoinWithJoinCondition$;
import com.linkedin.feathr.offline.source.accessor.DataPathHandler;
import com.linkedin.feathr.offline.util.DataFrameSplitterMerger$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

/* compiled from: LookupNodeEvaluator.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/evaluator/lookup/LookupNodeEvaluator$.class */
/**
 * Singleton {@link NodeEvaluator} for {@code Lookup} nodes of the compute graph.
 *
 * <p>A lookup is evaluated by taking the column of the "base" node, exploding/joining it against the
 * key columns of the "expansion" (lookup target) node with a left outer join, substituting default
 * values, and applying the aggregation named on the lookup node. All work is delegated to
 * {@code SeqJoinAggregator$}, {@code DataFrameSplitterMerger$} and the supplied
 * {@link SparkJoinWithJoinCondition}; this class only wires the pieces together.
 */
public final class LookupNodeEvaluator$ implements NodeEvaluator {
    /** Singleton instance; assigned by the private constructor during static initialization. */
    public static LookupNodeEvaluator$ MODULE$;

    /** Prefix applied to every column of the expansion-side dataframe before the join. */
    private static final String EXPANSION_PREFIX = "__expansion__";

    /** Prefix of the helper column (named after the base feature column) that is dropped from the result. */
    private static final String BASE_PREFIX = "__base__";

    /** Name of the row-id column handed to the aggregation step as its grouping id. */
    private static final String GROUP_BY_ID_COLUMN = "__frame_seq_join_group_by_id";

    static {
        new LookupNodeEvaluator$();
    }

    /** Returns {@code columnName} prefixed with {@link #EXPANSION_PREFIX}. */
    private static String expansionColumn(Object columnName) {
        return EXPANSION_PREFIX + columnName;
    }

    /**
     * Joins the base feature column against the expansion node and aggregates the looked-up values.
     *
     * @param lookup                      lookup node; only its aggregation name is read here
     * @param dataframeAndColumnMetadata  metadata of the base node (feature column and key expression)
     * @param seq                         left-side join key columns passed to {@code explodeLeftJoinKey}
     * @param dataframeAndColumnMetadata2 metadata of the expansion node (dataframe, keys, feature column)
     * @param dataset                     dataframe that contains the base feature column
     * @param str                         name of the output (lookup) feature column
     * @param sparkJoinWithJoinCondition  joiner used for the left outer join
     * @param map                         default values, looked up by the expansion feature column name
     * @param sparkSession                active Spark session
     * @return the merged dataframe together with the cleaned-up base key columns and {@code str} as
     *         its feature column
     * @throws MatchError if one of the helper calls returns a {@code null} tuple
     */
    public DataframeAndColumnMetadata processLookupNode(Lookup lookup, DataframeAndColumnMetadata dataframeAndColumnMetadata, Seq<String> seq, DataframeAndColumnMetadata dataframeAndColumnMetadata2, Dataset<Row> dataset, String str, SparkJoinWithJoinCondition sparkJoinWithJoinCondition, Map<String, FeatureValue> map, SparkSession sparkSession) {
        String expansionFeatureColumn = (String) dataframeAndColumnMetadata2.featureColumn().get();

        // Expansion side: keep only key columns + feature column, then prefix every column so that
        // names cannot clash with the base side during the join.
        Dataset expansionSelected = dataframeAndColumnMetadata2.df().select((Seq) ((Seq) dataframeAndColumnMetadata2.keyExpression().$plus$plus(new $colon.colon(expansionFeatureColumn, Nil$.MODULE$), Seq$.MODULE$.canBuildFrom())).map(columnName -> {
            return functions$.MODULE$.col(columnName);
        }, Seq$.MODULE$.canBuildFrom()));
        Dataset<Row> expansionDf = expansionSelected.toDF(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(expansionSelected.columns())).map(columnName -> {
            return expansionColumn(columnName);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toSeq());

        // Base side: apply the default sequential-join transformation to the base feature column, then
        // split the rows on that column being null.
        String baseFeatureColumn = (String) dataframeAndColumnMetadata.featureColumn().get();
        Tuple2<Dataset<Row>, Dataset<Row>> splitOnNull = DataFrameSplitterMerger$.MODULE$.splitOnNull(
            PostTransformationUtil$.MODULE$.transformFeatures((Seq) new $colon.colon(new Tuple2(baseFeatureColumn, baseFeatureColumn), Nil$.MODULE$), dataset, Predef$.MODULE$.Map().empty(), (dataType, columnName) -> {
                return SequentialJoinAsDerivation$.MODULE$.getDefaultTransformation(dataType, columnName);
            }, None$.MODULE$),
            baseFeatureColumn);
        if (splitOnNull == null) {
            throw new MatchError(splitOnNull);
        }
        // NOTE(review): the first half is joined, the second only receives a null feature column, so
        // presumably _1 holds the non-null rows and _2 the null rows -- confirm against splitOnNull.
        Dataset joinableBaseDf = (Dataset) splitOnNull._1();
        Dataset unjoinableBaseDf = (Dataset) splitOnNull._2();

        // Tag each row with an id before exploding the left join key; the same column name is later
        // given to the aggregation step.
        Tuple2<Seq<String>, Dataset<Row>> exploded = SeqJoinAggregator$.MODULE$.explodeLeftJoinKey(sparkSession, joinableBaseDf.withColumn(GROUP_BY_ID_COLUMN, functions$.MODULE$.monotonically_increasing_id()), seq, str);
        if (exploded == null) {
            throw new MatchError(exploded);
        }
        Seq<String> leftJoinKeys = exploded._1();
        Dataset<Row> leftDf = exploded._2();

        Dataset<Row> joined = sparkJoinWithJoinCondition.join2(leftJoinKeys, leftDf, (Seq<String>) dataframeAndColumnMetadata2.keyExpression().map(keyColumn -> {
            return expansionColumn(keyColumn);
        }, Seq$.MODULE$.canBuildFrom()), expansionDf, JoinType$.MODULE$.left_outer());

        String prefixedFeatureColumn = expansionColumn(expansionFeatureColumn);
        Option<FeatureValue> defaultValue = map.get(expansionFeatureColumn);
        Dataset<Row> joinedWithDefaults = SeqJoinAggregator$.MODULE$.substituteDefaultValuesForSeqJoinFeature(joined, prefixedFeatureColumn, defaultValue, sparkSession);
        String aggregation = lookup.getAggregation();

        Dataset<Row> aggregatedJoined = SeqJoinAggregator$.MODULE$.applyAggregationFunction(str, prefixedFeatureColumn, joinedWithDefaults, aggregation, GROUP_BY_ID_COLUMN);

        // Rows that were split off get a typed null feature column (same data type as the joined
        // column) so both halves share a schema before defaults and aggregation are applied.
        Dataset<Row> aggregatedUnjoined = SeqJoinAggregator$.MODULE$.applyAggregationFunction(str, prefixedFeatureColumn, SeqJoinAggregator$.MODULE$.substituteDefaultValuesForSeqJoinFeature(unjoinableBaseDf.withColumn(prefixedFeatureColumn, functions$.MODULE$.lit((Object) null).cast(joined.schema().apply(prefixedFeatureColumn).dataType())), prefixedFeatureColumn, defaultValue, sparkSession).withColumn(GROUP_BY_ID_COLUMN, functions$.MODULE$.monotonically_increasing_id()), aggregation, GROUP_BY_ID_COLUMN);

        // Merge both halves, drop the helper columns and expose the result under the output name.
        Dataset<Row> resultDf = DataFrameSplitterMerger$.MODULE$.merge(aggregatedJoined, aggregatedUnjoined).drop((Seq) dataframeAndColumnMetadata2.keyExpression().map(keyColumn -> {
            return expansionColumn(keyColumn);
        }, Seq$.MODULE$.canBuildFrom())).drop(BASE_PREFIX + baseFeatureColumn).withColumnRenamed(prefixedFeatureColumn, str);

        // Result keys are the base keys reduced to the text after their last "__" separator.
        return new DataframeAndColumnMetadata(resultDf, (Seq) dataframeAndColumnMetadata.keyExpression().map(keyColumn -> {
            return (String) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(keyColumn.split("__"))).last();
        }, Seq$.MODULE$.canBuildFrom()), new Some(str), DataframeAndColumnMetadata$.MODULE$.apply$default$4(), DataframeAndColumnMetadata$.MODULE$.apply$default$5());
    }

    /**
     * Returns the concrete key node ids of a lookup, data-source or transformation node.
     *
     * @param anyNode node whose concrete key is read
     * @return ids contained in the node's concrete key
     * @throws IllegalStateException if {@code anyNode} is a data source without a concrete key
     *         (previously this returned {@code null}, which the only caller dereferenced immediately)
     * @throws MatchError if {@code anyNode} is none of the three supported node kinds
     */
    private Seq<Integer> getLookupNodeKeys(AnyNode anyNode) {
        if (anyNode.isLookup()) {
            return (Seq) JavaConverters$.MODULE$.asScalaBufferConverter(anyNode.getLookup().getConcreteKey().getKey()).asScala();
        }
        if (anyNode.isDataSource()) {
            if (!anyNode.getDataSource().hasConcreteKey()) {
                throw new IllegalStateException("DataSource node has no concrete key, so lookup key columns cannot be resolved: " + anyNode);
            }
            return (Seq) JavaConverters$.MODULE$.asScalaBufferConverter(anyNode.getDataSource().getConcreteKey().getKey()).asScala();
        }
        if (anyNode.isTransformation()) {
            return (Seq) JavaConverters$.MODULE$.asScalaBufferConverter(anyNode.getTransformation().getConcreteKey().getKey()).asScala();
        }
        throw new MatchError(anyNode);
    }

    /**
     * Evaluates one lookup node and records its result in the traverser's node-id-to-metadata map.
     *
     * @param anyNode           node to evaluate; its lookup payload is read
     * @param fCMGraphTraverser traverser holding the graph nodes, per-node metadata and Spark session
     * @param dataset           dataframe containing the base feature column
     * @param list              data path handlers (not used by this evaluator)
     * @return the dataframe produced for the lookup node
     */
    @Override // com.linkedin.feathr.offline.evaluator.NodeEvaluator
    public Dataset<Row> evaluate(AnyNode anyNode, FCMGraphTraverser fCMGraphTraverser, Dataset<Row> dataset, List<DataPathHandler> list) {
        Lookup lookup = anyNode.getLookup();

        // The base node is the first lookup key that is a node reference; Option.get() throws
        // NoSuchElementException when the lookup has no such key.
        Lookup.LookupKey baseKey = (Lookup.LookupKey) ((IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(lookup.getLookupKey()).asScala()).find(lookupKey -> {
            return BoxesRunTime.boxToBoolean(lookupKey.isNodeReference());
        }).get();
        DataframeAndColumnMetadata baseMetadata = (DataframeAndColumnMetadata) fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().apply(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(baseKey.getNodeReference().getId())));

        // Each key node of the lookup target contributes its feature column when it has one,
        // otherwise its key expression columns.
        AnyNode lookupTargetNode = (AnyNode) fCMGraphTraverser.nodes().apply(Predef$.MODULE$.Integer2int(lookup.getLookupNode()));
        Seq<String> baseKeyColumns = (Seq) getLookupNodeKeys(lookupTargetNode).flatMap(num -> {
            DataframeAndColumnMetadata keyNodeMetadata = (DataframeAndColumnMetadata) fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().apply(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(num)));
            return keyNodeMetadata.featureColumn().isDefined() ? new $colon.colon((String) keyNodeMetadata.featureColumn().get(), Nil$.MODULE$) : keyNodeMetadata.keyExpression();
        }, Seq$.MODULE$.canBuildFrom());

        Integer lookupNode = lookup.getLookupNode();
        DataframeAndColumnMetadata expansionMetadata = (DataframeAndColumnMetadata) fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().apply(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(lookupNode)));
        String featureName = (String) fCMGraphTraverser.nodeIdToFeatureName().apply(lookup.getId());
        DataframeAndColumnMetadata result = processLookupNode(lookup, baseMetadata, baseKeyColumns, expansionMetadata, dataset, featureName, SparkJoinWithJoinCondition$.MODULE$.apply(SequentialJoinConditionBuilder$.MODULE$), NodeUtils$.MODULE$.getDefaultConverter((Seq) new $colon.colon(lookupTargetNode, Nil$.MODULE$)), fCMGraphTraverser.ss());

        fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().update(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(lookup.getId())), result);
        return result.df();
    }

    /**
     * Evaluates the given nodes in order, feeding each node's output dataframe into the next.
     *
     * @param seq               nodes to evaluate
     * @param fCMGraphTraverser traverser shared by all evaluations
     * @param dataset           initial dataframe
     * @param list              data path handlers forwarded to {@link #evaluate}
     * @return the dataframe returned by the last evaluation, or {@code dataset} if {@code seq} is empty
     */
    @Override // com.linkedin.feathr.offline.evaluator.NodeEvaluator
    public Dataset<Row> batchEvaluate(Seq<AnyNode> seq, FCMGraphTraverser fCMGraphTraverser, Dataset<Row> dataset, List<DataPathHandler> list) {
        return (Dataset) seq.foldLeft(dataset, (currentDf, node) -> {
            return MODULE$.evaluate(node, fCMGraphTraverser, currentDf, list);
        });
    }

    /** Private constructor; publishes the single instance through {@link #MODULE$}. */
    private LookupNodeEvaluator$() {
        MODULE$ = this;
    }
}
