package com.linkedin.feathr.offline.transformation;

import com.linkedin.feathr.common.AnchorExtractor;
import com.linkedin.feathr.common.FeatureTypeConfig;
import com.linkedin.feathr.common.FeatureTypes;
import com.linkedin.feathr.common.FeatureValue;
import com.linkedin.feathr.common.SparkRowExtractor;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrException;
import com.linkedin.feathr.offline.FeatureDataFrame;
import com.linkedin.feathr.offline.anchored.anchorExtractor.SimpleConfigurableAnchorExtractor;
import com.linkedin.feathr.offline.job.FeatureTransformation$;
import com.linkedin.feathr.offline.job.FeatureTypeInferenceContext;
import com.linkedin.feathr.offline.job.TransformedResult;
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext;
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Enumeration;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

/* compiled from: DataFrameBasedRowEvaluator.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/transformation/DataFrameBasedRowEvaluator$.class */
/**
 * Evaluates a {@link SparkRowExtractor}-based anchor row by row over a {@link Dataset} and appends
 * each extracted feature as an FDS tensor column.
 *
 * <p>Java view of the Scala object {@code DataFrameBasedRowEvaluator}; {@link #MODULE$} holds the
 * singleton instance.
 */
public final class DataFrameBasedRowEvaluator$ {
    /** Singleton instance, assigned by the private constructor during class initialization. */
    public static DataFrameBasedRowEvaluator$ MODULE$;

    /** Temporary column-name prefix for computed feature columns, removed by the final rename. */
    private static final String FEATURE_COLUMN_PREFIX = "_feathr_mvel_feature_prefix_";

    static {
        new DataFrameBasedRowEvaluator$();
    }

    /**
     * Runs {@code anchorExtractor} over every row of {@code dataset} and returns the input columns
     * plus one FDS tensor column per selected feature.
     *
     * @param anchorExtractor extractor to evaluate; must implement {@link SparkRowExtractor}
     * @param dataset input rows
     * @param seq requested (feature name, prefix) pairs. When empty, the extractor's
     *     {@code getProvidedFeatureNames()} are evaluated instead and the prefix is {@code ""}.
     *     Otherwise the prefix of the first pair is applied to every selected feature.
     * @param map user-supplied feature type configs keyed by feature name
     * @param option MVEL execution context forwarded to {@link SimpleConfigurableAnchorExtractor}s
     * @return the (feature, prefix) pairs, the transformed dataset, the column format of each
     *     feature (always FDS tensor), and the merged feature type configs
     * @throws FeathrException if {@code anchorExtractor} does not implement {@link SparkRowExtractor}
     */
    public TransformedResult transform(AnchorExtractor<?> anchorExtractor, Dataset<Row> dataset, Seq<Tuple2<String, String>> seq, Map<String, FeatureTypeConfig> map, Option<FeathrExpressionExecutionContext> option) {
        if (!(anchorExtractor instanceof SparkRowExtractor)) {
            throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, new StringBuilder(31).append(anchorExtractor).append(" must extend SparkRowExtractor.").toString());
        }
        SparkRowExtractor extractor = (SparkRowExtractor) anchorExtractor;
        Seq<String> requestedFeatureNames = (Seq) seq.map(pair -> {
            return (String) pair._1();
        }, Seq$.MODULE$.canBuildFrom());
        // Guard the empty request: head() would throw and make the "all provided features"
        // fallback below unreachable.
        String featureNamePrefix = seq.isEmpty() ? "" : (String) ((Tuple2) seq.head())._2();
        Enumeration.Value fdsTensorFormat = FeatureColumnFormat$.MODULE$.FDS_TENSOR();
        // An empty request means "evaluate every feature the extractor provides".
        Seq<String> selectedFeatureNames = requestedFeatureNames.nonEmpty() ? requestedFeatureNames : anchorExtractor.getProvidedFeatureNames();
        FeatureDataFrame featureDataFrame = transformToFDSTensor(extractor, dataset, selectedFeatureNames, map, option);
        // Feature names may come from the extractor rather than the request, so the
        // (name, prefix) pairs are rebuilt from the selected names.
        Seq<Tuple2<String, String>> featureNameAndPrefix = (Seq) selectedFeatureNames.map(name -> {
            return new Tuple2(name, featureNamePrefix);
        }, Seq$.MODULE$.canBuildFrom());
        Map<String, Enumeration.Value> featureColumnFormats = ((TraversableOnce) selectedFeatureNames.map(name -> {
            return new Tuple2(name, fdsTensorFormat);
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        return new TransformedResult(featureNameAndPrefix, featureDataFrame.df(), featureColumnFormats, featureDataFrame.inferredFeatureType());
    }

    /**
     * Applies {@code sparkRowExtractor} to every row, appends one FDS cell per feature in
     * {@code seq}, and resolves the final feature type of each feature.
     *
     * <p>Type precedence, from lowest to highest:
     * <ol>
     *   <li>user configs marked {@code UNSPECIFIED}</li>
     *   <li>types inferred from the extracted values</li>
     *   <li>explicitly specified user configs</li>
     * </ol>
     *
     * @param sparkRowExtractor extractor applied to each row
     * @param dataset input rows
     * @param seq feature names to materialize, in output column order
     * @param map user-supplied feature type configs
     * @param option MVEL context set on {@link SimpleConfigurableAnchorExtractor}s before extraction
     * @return the feature dataframe together with the merged type configs
     * @throws MatchError if the type-inference context cannot be obtained
     */
    private FeatureDataFrame transformToFDSTensor(SparkRowExtractor sparkRowExtractor, Dataset<Row> dataset, Seq<String> seq, Map<String, FeatureTypeConfig> map, Option<FeathrExpressionExecutionContext> option) {
        StructType inputSchema = dataset.schema();
        SparkSession sparkSession = SparkSession$.MODULE$.builder().getOrCreate();
        FeatureTypeInferenceContext typeInferenceContext = FeatureTransformation$.MODULE$.getTypeInferenceContext(sparkSession, map.mapValues(featureTypeConfig -> {
            return featureTypeConfig.getFeatureType();
        }), seq);
        if (typeInferenceContext == null) {
            throw new MatchError(typeInferenceContext);
        }
        Map<String, FeatureTypeAccumulator> featureTypeAccumulators = typeInferenceContext.featureTypeAccumulators();
        RDD<Row> outputRdd = dataset.rdd().map(row -> {
            // Rows may arrive without an attached schema; the extractor is given a
            // GenericRowWithSchema rebuilt from the input schema in that case.
            GenericRowWithSchema rowWithSchema = row instanceof GenericRowWithSchema ? (GenericRowWithSchema) row : new GenericRowWithSchema((Object[]) row.toSeq().toArray(ClassTag$.MODULE$.Any()), inputSchema);
            if (sparkRowExtractor instanceof SimpleConfigurableAnchorExtractor) {
                ((SimpleConfigurableAnchorExtractor) sparkRowExtractor).mvelContext_$eq(option);
            }
            Map<String, FeatureValue> featuresFromRow = sparkRowExtractor.getFeaturesFromRow(rowWithSchema);
            // The helper is static so this closure captures no reference to the evaluator itself.
            Seq<Object> featureCells = (Seq) seq.map(featureName -> {
                return toFDSCell(featureName, featuresFromRow, featureTypeAccumulators);
            }, Seq$.MODULE$.canBuildFrom());
            return Row$.MODULE$.merge(Predef$.MODULE$.wrapRefArray(new Row[]{row, Row$.MODULE$.fromSeq(featureCells)}));
        }, ClassTag$.MODULE$.apply(Row.class));
        Map inferredTypeConfigs = (Map) FeatureTransformation$.MODULE$.inferFeatureTypes(featureTypeAccumulators, outputRdd, seq).map(tuple2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple2._1()), new FeatureTypeConfig((FeatureTypes) tuple2._2()));
        }, Map$.MODULE$.canBuildFrom());
        // _1: user configs whose type is UNSPECIFIED; _2: user configs with an explicit type.
        Tuple2 unspecifiedAndSpecified = map.partition(entry -> {
            return BoxesRunTime.boxToBoolean($anonfun$transformToFDSTensor$5(entry));
        });
        Map unspecifiedConfigs = (Map) unspecifiedAndSpecified._1();
        Map specifiedConfigs = (Map) unspecifiedAndSpecified._2();
        // Later maps win on key collisions, which yields the precedence documented above.
        Map<String, FeatureTypeConfig> mergedTypeConfigs = unspecifiedConfigs.$plus$plus(inferredTypeConfigs).$plus$plus(specifiedConfigs);
        return new FeatureDataFrame(createFDSFeatureDF(dataset, seq, sparkSession, outputRdd, mergedTypeConfigs), mergedTypeConfigs);
    }

    /**
     * Converts one extracted feature into its FDS cell value. The first time a non-null value is
     * seen for the feature, its basic type is also recorded in the feature's accumulator.
     *
     * @param featureName feature to convert
     * @param featuresFromRow values produced by the extractor for the current row
     * @param featureTypeAccumulators per-feature type accumulators
     * @return the FDS representation of the value, or {@code null} when the extractor produced no
     *     entry, or a {@code null} entry, for {@code featureName}
     */
    private static Object toFDSCell(String featureName, Map<String, FeatureValue> featuresFromRow, Map<String, FeatureTypeAccumulator> featureTypeAccumulators) {
        if (!featuresFromRow.contains(featureName)) {
            return null;
        }
        FeatureValue featureValue = (FeatureValue) featuresFromRow.apply(featureName);
        if (featureValue == null) {
            // Treat a present-but-null value like a missing feature instead of dereferencing it.
            return null;
        }
        FeatureTypeAccumulator accumulator = (FeatureTypeAccumulator) featureTypeAccumulators.apply(featureName);
        if (accumulator.isZero()) {
            accumulator.add(FeatureTypes.valueOf(featureValue.getFeatureType().getBasicType().toString()));
        }
        return FeaturizedDatasetUtils$.MODULE$.tensorToFDSDataFrameRow(featureValue.getAsTensorData(), FeaturizedDatasetUtils$.MODULE$.tensorToFDSDataFrameRow$default$2());
    }

    /**
     * Builds the output dataframe from the evaluated RDD.
     *
     * <ol>
     *   <li>Features are first added under {@link #FEATURE_COLUMN_PREFIX}-prefixed names.</li>
     *   <li>Input columns that share a feature's name are dropped.</li>
     *   <li>Each prefixed column is renamed to the bare feature name.</li>
     * </ol>
     *
     * @param dataset original input, whose schema forms the leading output columns
     * @param seq feature names, in the same order as the cells appended to each RDD row
     * @param sparkSession session used to create the dataframe
     * @param rdd rows holding the input columns followed by the feature cells
     * @param map resolved feature type configs used to build the FDS schema fields
     * @return the dataframe containing the input columns and one column per feature
     */
    private Dataset<Row> createFDSFeatureDF(Dataset<Row> dataset, Seq<String> seq, SparkSession sparkSession, RDD<Row> rdd, Map<String, FeatureTypeConfig> map) {
        StructType featureSchema = StructType$.MODULE$.apply(FeatureTransformation$.MODULE$.getFDSSchemaFields(seq, map, FEATURE_COLUMN_PREFIX));
        StructType outputSchema = StructType$.MODULE$.apply((Seq) dataset.schema().union(featureSchema, Seq$.MODULE$.canBuildFrom()));
        // Dropping same-named input columns lets the renamed feature columns replace them.
        Dataset<Row> withoutCollidingInputs = sparkSession.createDataFrame(rdd, outputSchema).drop(seq);
        return (Dataset) seq.foldLeft(withoutCollidingInputs, (df, featureName) -> {
            return df.withColumnRenamed(FEATURE_COLUMN_PREFIX + featureName, featureName);
        });
    }

    /**
     * Partition predicate for user type configs.
     *
     * @param tuple2 a (feature name, {@link FeatureTypeConfig}) entry
     * @return {@code true} when the entry's feature type is {@link FeatureTypes#UNSPECIFIED}
     */
    public static final /* synthetic */ boolean $anonfun$transformToFDSTensor$5(Tuple2 tuple2) {
        FeatureTypes featureType = ((FeatureTypeConfig) tuple2._2()).getFeatureType();
        return FeatureTypes.UNSPECIFIED.equals(featureType);
    }

    private DataFrameBasedRowEvaluator$() {
        MODULE$ = this;
    }
}
