package com.linkedin.feathr.swj;

import com.linkedin.feathr.offline.transformation.DataFrameExt$;
import com.linkedin.feathr.offline.util.FeathrUtils$;
import com.linkedin.feathr.swj.join.SlidingWindowJoinIterator;
import com.linkedin.feathr.swj.join.SlidingWindowJoinIterator$;
import com.linkedin.feathr.swj.transformer.FeatureTransformer$;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.ArrayType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;

/* compiled from: SlidingWindowJoin.scala */
/* loaded from: input_file:com/linkedin/feathr/swj/SlidingWindowJoin$.class */
public final class SlidingWindowJoin$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer invokes. */
    public static SlidingWindowJoin$ MODULE$;
    /** Session cached by spark$lzycompute() and read through spark(). */
    private SparkSession spark;
    /** Logger created from this class in the constructor. */
    private final Logger log;
    /** Name of the temp view that addLabelDataCols registers the label dataset under. */
    private final String LABEL_VIEW_NAME;
    /** Set to true once {@code spark} has been assigned; spark() reads it outside the synchronized block. */
    private volatile boolean bitmap$0;

    static {
        // The constructor assigns the new instance to MODULE$, so the result is not stored here.
        new SlidingWindowJoin$();
    }

    /**
     * Accessor for the logger held by this singleton.
     *
     * @return the logger created in the constructor
     */
    public Logger log() {
        return log;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [com.linkedin.feathr.swj.SlidingWindowJoin$] */
    private SparkSession spark$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.spark = SparkSession$.MODULE$.builder().getOrCreate();
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.spark;
    }

    /**
     * Returns the Spark session, initialising it on first use.
     *
     * @return the cached session if {@code bitmap$0} is already set, otherwise the result of
     *         {@code spark$lzycompute()}
     */
    public SparkSession spark() {
        if (bitmap$0) {
            return spark;
        }
        return spark$lzycompute();
    }

    /**
     * Accessor for the temp view name used for the label dataset.
     *
     * @return the value of the {@code LABEL_VIEW_NAME} field
     */
    private String LABEL_VIEW_NAME() {
        return LABEL_VIEW_NAME;
    }

    /**
     * Joins the aggregated features of every fact dataset onto the label data.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>log each feature name and each fact dataset's join key;</li>
     *   <li>add the helper join-key and timestamp columns to the label data, repartition it by
     *       the join key into {@code i} partitions and sort within partitions by join key, then
     *       timestamp;</li>
     *   <li>build the starting RDD via {@code handleSanityCheckModeLabelDf} and emit optional
     *       debug output;</li>
     *   <li>fold every fact dataset into the running RDD and schema via {@code $anonfun$join$3};</li>
     *   <li>turn the result back into a DataFrame, drop the two helper columns and dump debug
     *       info for the joined result.</li>
     * </ol>
     *
     * @param labelData label (observation) data and its join key / timestamp configuration
     * @param list      fact datasets whose features are joined onto the label data
     * @param i         number of partitions passed to {@code repartition}
     * @return the joined dataset without the helper join-key and timestamp columns
     */
    public Dataset<Row> join(LabelData labelData, List<FactData> list, int i) {
        list.foreach(fact -> {
            $anonfun$join$1(fact);
            return BoxedUnit.UNIT;
        });

        Dataset<Row> labelWithKeys = addLabelDataCols(labelData.dataSource(), labelData);
        Column[] partitionCols = new Column[]{labelWithKeys.col(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME())};
        Dataset<Row> partitionedLabels = labelWithKeys
                .repartition(i, Predef$.MODULE$.wrapRefArray(partitionCols))
                .sortWithinPartitions(
                        FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(),
                        Predef$.MODULE$.wrapRefArray(new String[]{FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME()}));

        // Mutable holders: the per-fact step replaces both the schema and the RDD as it goes.
        ObjectRef schemaRef = ObjectRef.create(labelWithKeys.schema());
        StructType labelSchema = (StructType) schemaRef.elem;
        int joinKeyIdx = labelSchema.fieldIndex(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME());
        int timestampIdx = labelSchema.fieldIndex(FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME());

        String sanityCheckParam = FeathrUtils$.MODULE$.getFeathrJobParam(
                spark().sparkContext().getConf(), FeathrUtils$.MODULE$.ENABLE_SANITY_CHECK_MODE());
        boolean sanityCheckEnabled = new StringOps(Predef$.MODULE$.augmentString(sanityCheckParam)).toBoolean();
        ObjectRef rddRef = ObjectRef.create(
                handleSanityCheckModeLabelDf(list, i, partitionedLabels, sanityCheckEnabled));

        dumpDebugInfo(list, i, partitionedLabels);

        list.foreach(fact -> {
            $anonfun$join$3(i, rddRef, schemaRef, joinKeyIdx, timestampIdx, fact);
            return BoxedUnit.UNIT;
        });

        String[] helperCols = new String[]{
                FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(),
                FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME()};
        Dataset<Row> joined = spark()
                .createDataFrame((RDD) rddRef.elem, (StructType) schemaRef.elem)
                .drop(Predef$.MODULE$.wrapRefArray(helperCols));

        Set<String> featureNames = ((TraversableOnce) list.flatMap(
                fact -> (List) fact.aggFeatures().map(feature -> feature.name(), List$.MODULE$.canBuildFrom()),
                List$.MODULE$.canBuildFrom())).toSet();
        String joinedNames = featureNames.mkString("_");
        FeathrUtils$.MODULE$.dumpDebugInfo(
                spark(),
                joined,
                featureNames,
                "SWA after joined with feature " + joinedNames,
                "observation_after_joined_SWA_feature_" + joinedNames);
        return joined;
    }

    /**
     * Default value for the third parameter of {@code join}.
     *
     * @return the integer stored in the Spark conf under the {@code SQLConf.SHUFFLE_PARTITIONS}
     *         key, or 200 when that key is not set
     */
    public int join$default$3() {
        return spark()
                .sparkContext()
                .getConf()
                .getInt(SQLConf$.MODULE$.SHUFFLE_PARTITIONS().key(), 200);
    }

    /**
     * Produces the RDD that the per-fact join starts from.
     *
     * <p>When sanity-check mode is off, or there are no fact datasets, the label dataset's RDD is
     * returned unchanged. Otherwise the label dataset is passed through {@code appendRows}
     * together with the transformed first fact dataset, and the result is repartitioned by the
     * join key and sorted within partitions by join key, then timestamp.
     *
     * @param list    fact datasets; only the head is used, and only in sanity-check mode
     * @param i       number of partitions passed to {@code repartition}
     * @param dataset label dataset, already partitioned and sorted by the caller
     * @param z       whether sanity-check mode is enabled
     * @return the RDD of label rows to join against
     */
    private RDD<Row> handleSanityCheckModeLabelDf(List<FactData> list, int i, Dataset<Row> dataset, boolean z) {
        if (!z || !list.nonEmpty()) {
            return dataset.rdd();
        }
        // z is necessarily true past the guard, so no further check on it is needed.
        // NOTE(review): appendRows is defined elsewhere; confirm what the two key-column lists mean.
        Dataset<Row> withFactRows = DataFrameExt$.MODULE$.DataFrameMethods(dataset).appendRows(
                (Seq) new $colon.colon(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(), Nil$.MODULE$),
                (Seq) new $colon.colon(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(), Nil$.MODULE$),
                FeatureTransformer$.MODULE$.transformFactData((FactData) list.head()));
        // appendRows returns a new dataset, so the partitioning and ordering are applied again.
        return withFactRows
                .repartition(i, Predef$.MODULE$.wrapRefArray(new Column[]{withFactRows.col(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME())}))
                .sortWithinPartitions(
                        FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(),
                        Predef$.MODULE$.wrapRefArray(new String[]{FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME()}))
                .rdd();
    }

    /**
     * Emits debug output for the label data and for the first fact dataset.
     *
     * <p>Does nothing unless the {@code ENABLE_DEBUG_OUTPUT} job parameter parses to true and
     * {@code list} is non-empty. Only the head of {@code list} is inspected.
     *
     * @param list    fact datasets
     * @param i       number of partitions used when repartitioning the transformed fact data
     * @param dataset label dataset; only its join-key column is dumped
     */
    private void dumpDebugInfo(List<FactData> list, int i, Dataset<Row> dataset) {
        String debugParam = FeathrUtils$.MODULE$.getFeathrJobParam(
                spark().sparkContext().getConf(), FeathrUtils$.MODULE$.ENABLE_DEBUG_OUTPUT());
        boolean debugEnabled = new StringOps(Predef$.MODULE$.augmentString(debugParam)).toBoolean();
        if (!debugEnabled || !list.nonEmpty()) {
            return;
        }

        FactData firstFact = (FactData) list.head();
        Dataset<Row> factFeatures = FeatureTransformer$.MODULE$.transformFactData(firstFact);
        Column[] partitionCols = new Column[]{factFeatures.col(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME())};
        Dataset<Row> partitionedFacts = factFeatures
                .repartition(i, Predef$.MODULE$.wrapRefArray(partitionCols))
                .sortWithinPartitions(
                        FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(),
                        Predef$.MODULE$.wrapRefArray(new String[]{FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME()}));

        Set<String> featureNames = ((TraversableOnce) firstFact.aggFeatures().map(
                feature -> feature.name(), List$.MODULE$.canBuildFrom())).toSet();
        String featureTag = "features_" + featureNames.mkString("_");

        Seq keyCols = new $colon.colon(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(), Nil$.MODULE$);
        // NOTE(review): no separator between the tag and "for SWA before join" (the second dump
        // uses "_swa_feature") - possibly a typo; kept as is.
        FeathrUtils$.MODULE$.dumpDebugInfo(
                spark(),
                dataset.select((String) keyCols.head(), (Seq) keyCols.tail()),
                featureNames,
                "observation data",
                featureTag + "for SWA before join");
        FeathrUtils$.MODULE$.dumpDebugInfo(
                spark(),
                partitionedFacts,
                featureNames,
                "SWA feature data",
                featureTag + "_swa_feature");
    }

    /**
     * Adds the helper join-key and timestamp columns to the label dataset.
     *
     * <p>Registers {@code dataset} as a temp view and selects from it: the join-key expression
     * aliased to {@code JOIN_KEY_COL_NAME}, the timestamp column aliased to
     * {@code TIMESTAMP_COL_NAME}, and then every original column.
     *
     * @param dataset   label dataset to register and query
     * @param labelData supplies the join-key column(s) and the timestamp column
     * @return the query result with the two helper columns in front of the original ones
     */
    private Dataset<Row> addLabelDataCols(Dataset<Row> dataset, LabelData labelData) {
        dataset.createOrReplaceTempView(LABEL_VIEW_NAME());
        SparkSession session = spark();

        // Several key columns are wrapped into one struct; a single key column is used directly.
        String joinKeyExpr;
        if (labelData.joinKey().size() > 1) {
            joinKeyExpr = "struct(" + labelData.joinKey().mkString(",") + ")";
        } else {
            joinKeyExpr = String.valueOf(labelData.joinKey().head());
        }

        String rawQuery = "\n         |SELECT\n         |" + joinKeyExpr
                + " AS " + FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME()
                + ",\n         |" + labelData.timestampCol()
                + " AS " + FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME()
                + ", * FROM " + LABEL_VIEW_NAME()
                + "\n       ";
        return session.sql(new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin());
    }

    /**
     * Logs, at info level, the name of one feature that is about to be evaluated.
     *
     * @param slidingWindowFeature feature whose name is logged
     */
    public static final /* synthetic */ void $anonfun$join$2(SlidingWindowFeature slidingWindowFeature) {
        Logger logger = MODULE$.log();
        logger.info("Evaluating feature " + slidingWindowFeature.name() + "\n");
    }

    /**
     * Logs every aggregated feature of a fact dataset, followed by the dataset's join key.
     *
     * @param factData fact dataset whose features and join key are logged
     */
    public static final /* synthetic */ void $anonfun$join$1(FactData factData) {
        factData.aggFeatures().foreach(feature -> {
            $anonfun$join$2(feature);
            return BoxedUnit.UNIT;
        });
        Logger logger = MODULE$.log();
        logger.info("Feature's keys are " + factData.joinKey() + "\n");
    }

    /**
     * Folds one fact dataset into the running join state; join() calls this once per fact dataset.
     *
     * <p>Both holders are updated in place: {@code objectRef} gets a new RDD whose rows carry
     * this fact dataset's feature values, and {@code objectRef2} gets a schema extended with the
     * matching feature columns.
     *
     * @param i          number of partitions used when repartitioning the transformed fact data
     * @param objectRef  holder of the running RDD of rows (replaced by this call)
     * @param objectRef2 holder of the running StructType (replaced by this call)
     * @param i2         index of the join-key column in the label schema, as passed by join()
     * @param i3         index of the timestamp column in the label schema, as passed by join()
     * @param factData   fact dataset to join
     */
    public static final /* synthetic */ void $anonfun$join$3(int i, ObjectRef objectRef, ObjectRef objectRef2, int i2, int i3, FactData factData) {
        Dataset<Row> transformFactData = FeatureTransformer$.MODULE$.transformFactData(factData);
        // Partition and order the fact rows the same way join() does for the label rows:
        // repartition by join key, then sort within partitions by join key and timestamp.
        RDD rdd = transformFactData.repartition(i, Predef$.MODULE$.wrapRefArray(new Column[]{transformFactData.col(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME())})).sortWithinPartitions(FeatureTransformer$.MODULE$.JOIN_KEY_COL_NAME(), Predef$.MODULE$.wrapRefArray(new String[]{FeatureTransformer$.MODULE$.TIMESTAMP_COL_NAME()})).rdd();
        StructType schema = transformFactData.schema();
        if (factData.dataSource().isEmpty()) {
            // Empty fact source: append one null per aggregated feature to every running row.
            // `rdd` is not used in this branch.
            Seq seq = (Seq) factData.aggFeatures().map(slidingWindowFeature -> {
                return null;
            }, List$.MODULE$.canBuildFrom());
            RDD rdd2 = (RDD) objectRef.elem;
            objectRef.elem = rdd2.mapPartitions(iterator -> {
                return iterator.map(row -> {
                    return Row$.MODULE$.merge(Predef$.MODULE$.wrapRefArray(new Row[]{row, Row$.MODULE$.fromSeq(seq)}));
                });
            }, rdd2.mapPartitions$default$2(), ClassTag$.MODULE$.apply(Row.class));
        } else {
            // Zip the running partitions with the fact partitions; SlidingWindowJoinIterator
            // receives both iterators, the current schema, the two column indices, the fact
            // schema and the features to aggregate.
            objectRef.elem = ((RDD) objectRef.elem).zipPartitions(rdd, true, (iterator2, iterator3) -> {
                return new SlidingWindowJoinIterator(iterator2, iterator3, (StructType) objectRef2.elem, i2, i3, schema, factData.aggFeatures());
            }, ClassTag$.MODULE$.apply(Row.class), ClassTag$.MODULE$.apply(Row.class));
        }
        // Extend the running schema with one nullable column per generated feature column.
        objectRef2.elem = StructType$.MODULE$.apply((Seq) ((StructType) objectRef2.elem).$plus$plus(StructType$.MODULE$.apply((Seq) SlidingWindowJoinIterator$.MODULE$.generateFeatureColumns(factData.aggFeatures(), schema).map(featureColumnMetaData -> {
            if (featureColumnMetaData.groupSpec().isEmpty()) {
                // No group spec: a plain column of the aggregation's data type.
                return new StructField(featureColumnMetaData.featureName(), featureColumnMetaData.aggDataType(), true, StructField$.MODULE$.apply$default$4());
            }
            // With a group spec: an array of structs. The two prepends yield field order
            // (GROUP_COL_NAME, "group_agg_metric").
            return new StructField(featureColumnMetaData.featureName(), ArrayType$.MODULE$.apply(StructType$.MODULE$.apply(Nil$.MODULE$.$colon$colon(new StructField("group_agg_metric", featureColumnMetaData.aggDataType(), true, StructField$.MODULE$.apply$default$4())).$colon$colon(new StructField(FeatureTransformer$.MODULE$.GROUP_COL_NAME(), (DataType) featureColumnMetaData.groupColDataType().get(), true, StructField$.MODULE$.apply$default$4())))), true, StructField$.MODULE$.apply$default$4());
        }, List$.MODULE$.canBuildFrom())), Seq$.MODULE$.canBuildFrom()));
    }

    /**
     * Creates the singleton: assigns it to {@code MODULE$}, then sets up the logger and the
     * label view name. Invoked only from the static initializer.
     */
    private SlidingWindowJoin$() {
        MODULE$ = this;
        log = LogManager.getLogger(getClass());
        LABEL_VIEW_NAME = "label_data";
    }
}
