package com.linkedin.feathr.offline.swa;

import com.linkedin.feathr.common.ErasedEntityTaggedFeature;
import com.linkedin.feathr.common.FeatureTypeConfig;
import com.linkedin.feathr.common.FeatureValue;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrConfigException;
import com.linkedin.feathr.offline.FeatureDataFrame;
import com.linkedin.feathr.offline.anchored.WindowTimeUnit$;
import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor;
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource;
import com.linkedin.feathr.offline.anchored.keyExtractor.MVELSourceKeyExtractor;
import com.linkedin.feathr.offline.anchored.keyExtractor.SQLSourceKeyExtractor;
import com.linkedin.feathr.offline.client.DataFrameColName$;
import com.linkedin.feathr.offline.config.FeatureJoinConfig;
import com.linkedin.feathr.offline.config.JoinConfigSettings;
import com.linkedin.feathr.offline.config.JoinTimeSetting;
import com.linkedin.feathr.offline.config.TimeWindowFeatureDefinition;
import com.linkedin.feathr.offline.exception.FeathrIllegalStateException;
import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager$;
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner;
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner$;
import com.linkedin.feathr.offline.source.DataSource;
import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper;
import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter$;
import com.linkedin.feathr.offline.util.DataFrameUtils$;
import com.linkedin.feathr.offline.util.FeathrUtils$;
import com.linkedin.feathr.offline.util.SuppressedExceptionHandlerUtils$;
import com.linkedin.feathr.offline.util.datetime.DateTimeInterval;
import com.linkedin.feathr.sparkcommon.SourceKeyExtractor;
import com.linkedin.feathr.swj.FactData;
import com.linkedin.feathr.swj.LabelData;
import com.linkedin.feathr.swj.SlidingWindowJoin$;
import java.time.Duration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.util.sketch.BloomFilter;
import scala.Array$;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.HashSet;
import scala.math.Ordering$;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: SlidingWindowAggregationJoiner.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005}e!\u0002\u0005\n\u0001-\u0019\u0002\u0002\u0003\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\t\u0011I\u0002!\u0011!Q\u0001\nMBQ!\u000f\u0001\u0005\u0002iBqa\u0010\u0001C\u0002\u0013%\u0001\t\u0003\u0004N\u0001\u0001\u0006I!\u0011\u0005\u0006\u001d\u0002!\ta\u0014\u0005\b\u0003\u0017\u0003A\u0011AAG\u0005y\u0019F.\u001b3j]\u001e<\u0016N\u001c3po\u0006;wM]3hCRLwN\u001c&pS:,'O\u0003\u0002\u000b\u0017\u0005\u00191o^1\u000b\u00051i\u0011aB8gM2Lg.\u001a\u0006\u0003\u001d=\taAZ3bi\"\u0014(B\u0001\t\u0012\u0003!a\u0017N\\6fI&t'\"\u0001\n\u0002\u0007\r|Wn\u0005\u0002\u0001)A\u0011Q\u0003G\u0007\u0002-)\tq#A\u0003tG\u0006d\u0017-\u0003\u0002\u001a-\t1\u0011I\\=SK\u001a\fA#\u00197m/&tGm\\<BO\u001e4U-\u0019;ve\u0016\u001c8\u0001\u0001\t\u0005;\u0011:#F\u0004\u0002\u001fEA\u0011qDF\u0007\u0002A)\u0011\u0011eG\u0001\u0007yI|w\u000e\u001e \n\u0005\r2\u0012A\u0002)sK\u0012,g-\u0003\u0002&M\t\u0019Q*\u00199\u000b\u0005\r2\u0002CA\u000f)\u0013\tIcE\u0001\u0004TiJLgn\u001a\t\u0003WAj\u0011\u0001\f\u0006\u0003[9\nqAZ3biV\u0014XM\u0003\u00020\u0017\u0005A\u0011M\\2i_J,G-\u0003\u00022Y\t9b)Z1ukJ,\u0017I\\2i_J<\u0016\u000e\u001e5T_V\u00148-Z\u0001\u0019C:\u001c\u0007n\u001c:U_\u0012\u000bG/Y*pkJ\u001cW-T1qa\u0016\u0014\bC\u0001\u001b8\u001b\u0005)$B\u0001\u001c\f\u00039!(/\u00198tM>\u0014X.\u0019;j_:L!\u0001O\u001b\u00031\u0005s7\r[8s)>$\u0015\r^1T_V\u00148-Z'baB,'/\u0001\u0004=S:LGO\u0010\u000b\u0004wur\u0004C\u0001\u001f\u0001\u001b\u0005I\u0001\"\u0002\u000e\u0004\u0001\u0004a\u0002\"\u0002\u001a\u0004\u0001\u0004\u0019\u0014a\u00017pOV\t\u0011\t\u0005\u0002C\u00176\t1I\u0003\u0002E\u000b\u0006)An\\45U*\u0011aiR\u0001\bY><w-\u001b8h\u0015\tA\u0015*\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0015\u0006\u0019qN]4\n\u00051\u001b%A\u0002'pO\u001e,'/\u0001\u0003m_\u001e\u0004\u0013!\u00076pS:<\u0016N\u001c3po\u0006;wMR3biV\u0014Xm]!t\t\u001a#\"\u0003\u00151ku\u0006\u0015\u0011\u0011BA\u0016\u0003{\t\t'!\u001e\u0002��A!Q#U*X\u0013\t\u0011fC\u0001\u0004UkBdWM\r\t\u0003)Vk\u0011aC\u0005\u0003-.\u0011\u0001CR3biV\u0014X\rR1uC\u001a\u0013\u0018-\\3\u0011\u0007akvE\u0004\u0002Z7:\u0011qDW\u0005\u0002/%\u0011ALF\u0001\ba\u0006\u001c7.Y4f\u0013\tqvLA\u0002TKFT!\u0001\u0018\f\t\u000b\u00054\u0001\u0019\u00012\u0002\u0005M\u001c\bCA2i\u001b\u0005!'BA3g\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003O\u001e\u000bQa\u001d9be.L!!\u001b3\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\t\u000b-4\u0001\u0019\u00017\u0002\u000b=\u00147\u000f\u0012$\u0011\u00055<hB\u00018w\u001d\tyWO\u0004\u0002qi:\u0011\u0011o\u001d\b\u0003?IL\u0011AS\u0005\u0003\u0011&K!aZ$\n\u0005\u00154\u0017B\u0001/e\u0013\tA\u0018PA\u0005ECR\fgI]1nK*\u0011A\f\u001a\u0005\u0006w\u001a\u0001\r\u0001`\u0001\u000bU>LgnQ8oM&<\u0007cA?\u0002\u00025\taP\u0003\u0002��\u0017\u000511m\u001c8gS\u001eL1!a\u0001\u007f\u0005E1U-\u0019;ve\u0016Tu.\u001b8D_:4\u0017n\u001a\u0005\u0007\u0003\u000f1\u0001\u0019A,\u0002\u0015-,\u0017\u0010V1h\u0019&\u001cH\u000fC\u0004\u0002\f\u0019\u0001\r!!\u0004\u0002-]Lg\u000eZ8x\u0003\u001e<g)Z1ukJ,7\u000b^1hKN\u0004B\u0001W/\u0002\u0010A!\u0011\u0011CA\u0013\u001d\u0011\t\u0019\"a\t\u000f\t\u0005U\u0011\u0011\u0005\b\u0005\u0003/\tyB\u0004\u0003\u0002\u001a\u0005uabA\u0010\u0002\u001c%\t!#\u0003\u0002\u0011#%\u0011abD\u0005\u0003\u00195I!\u0001X\u0006\n\t\u0005\u001d\u0012\u0011\u0006\u0002\n\u0015>Lgn\u0015;bO\u0016T!\u0001X\u0006\t\u000f\u00055b\u00011\u0001\u00020\u0005I\"/Z9vSJ,GmV5oI><\u0018iZ4GK\u0006$XO]3t!\u0011AV,!\r\u0011\t\u0005M\u0012\u0011H\u0007\u0003\u0003kQ1!a\u000e\u000e\u0003\u0019\u0019w.\\7p]&!\u00111HA\u001b\u0005e)%/Y:fI\u0016sG/\u001b;z)\u0006<w-\u001a3GK\u0006$XO]3\t\u000f\u0005}b\u00011\u0001\u0002B\u0005a!\r\\8p[\u001aKG\u000e^3sgB)Q#a\u0011\u0002H%\u0019\u0011Q\t\f\u0003\r=\u0003H/[8o!\u0019iB%!\u0013\u0002RA!\u0001,XA&!\r)\u0012QJ\u0005\u0004\u0003\u001f2\"aA%oiB!\u00111KA/\u001b\t\t)F\u0003\u0003\u0002X\u0005e\u0013AB:lKR\u001c\u0007NC\u0002\u0002\\\u0019\fA!\u001e;jY&!\u0011qLA+\u0005-\u0011En\\8n\r&dG/\u001a:\t\u000f\u0005\rd\u00011\u0001\u0002f\u0005i1o^1PEN$\u0016.\\3PaR\u0004R!FA\"\u0003O\u0002B!!\u001b\u0002r5\u0011\u00111\u000e\u0006\u0005\u0003[\ny'\u0001\u0005eCR,G/[7f\u0015\r\tYfC\u0005\u0005\u0003g\nYG\u0001\tECR,G+[7f\u0013:$XM\u001d<bY\"9\u0011q\u000f\u0004A\u0002\u0005e\u0014A\u00064bS2|e.T5tg&tw\rU1si&$\u0018n\u001c8\u0011\u0007U\tY(C\u0002\u0002~Y\u0011qAQ8pY\u0016\fg\u000eC\u0004\u0002\u0002\u001a\u0001\r!a!\u0002\u0015M<\u0018\rS1oI2,'\u000fE\u0003\u0016\u0003\u0007\n)\tE\u0002=\u0003\u000fK1!!#\n\u0005)\u0019v+\u0011%b]\u0012dWM]\u0001\u001egR\fg\u000eZ1sI&TXMR3biV\u0014XmQ8mk6tg*Y7fgRIA.a$\u0002\u0014\u0006]\u00151\u0014\u0005\u0007\u0003#;\u0001\u0019A,\u0002+=\u0014\u0018nZ\"p]R,\u0007\u0010^(cg\u000e{G.^7og\"1\u0011QS\u0004A\u00021\f\u0001c^5uQN;\u0016IR3biV\u0014X\r\u0012$\t\r\u0005eu\u00011\u0001X\u000311W-\u0019;ve\u0016t\u0015-\\3t\u0011\u0019\tij\u0002a\u0001/\u000691.Z=UC\u001e\u001c\b")
/* loaded from: input_file:com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.class */
public class SlidingWindowAggregationJoiner {
    private final Map<String, FeatureAnchorWithSource> allWindowAggFeatures;
    private final AnchorToDataSourceMapper anchorToDataSourceMapper;
    private final Logger log = LogManager.getLogger(getClass());

    private Logger log() {
        return this.log;
    }

    public Tuple2<FeatureDataFrame, Seq<String>> joinWindowAggFeaturesAsDF(SparkSession sparkSession, Dataset<Row> dataset, FeatureJoinConfig featureJoinConfig, Seq<String> seq, Seq<Tuple2<Seq<Object>, Seq<String>>> seq2, Seq<ErasedEntityTaggedFeature> seq3, Option<Map<Seq<Object>, BloomFilter>> option, Option<DateTimeInterval> option2, boolean z, Option<SWAHandler> option3) {
        Option<JoinConfigSettings> option4 = featureJoinConfig.settings();
        if (option4.isEmpty()) {
            throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, "settings section are not defined in join config, cannot extract observation data time range");
        }
        if (((JoinConfigSettings) option4.get()).joinTimeSetting().isEmpty()) {
            throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, "joinTimeSettings section is not defined in join config, cannot perform window aggregation operation");
        }
        JoinTimeSetting joinTimeSetting = (JoinTimeSetting) ((JoinConfigSettings) option4.get()).joinTimeSetting().get();
        Option<Duration> simulateTimeDelay = joinTimeSetting.simulateTimeDelay();
        boolean z2 = new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession.sparkContext().getConf(), FeathrUtils$.MODULE$.FILTER_NULLS()))).toBoolean();
        boolean z3 = new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession.sparkContext().getConf(), FeathrUtils$.MODULE$.SKIP_MISSING_FEATURE()))).toBoolean();
        boolean z4 = new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession.sparkContext().getConf(), FeathrUtils$.MODULE$.ADD_DEFAULT_COL_FOR_MISSING_DATA()))).toBoolean();
        if (simulateTimeDelay.isEmpty() && !featureJoinConfig.featuresToTimeDelayMap().isEmpty()) {
            throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, "overrideTimeDelay cannot be defined without setting a simulateTimeDelay in the joinTimeSettings");
        }
        Map $plus$plus = featureJoinConfig.featuresToTimeDelayMap().mapValues(str -> {
            return WindowTimeUnit$.MODULE$.parseWindowTime(str);
        }).$plus$plus(Option$.MODULE$.option2Iterable(simulateTimeDelay.map(duration -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(SlidingWindowFeatureUtils$.MODULE$.DEFAULT_TIME_DELAY()), duration);
        })).toMap(Predef$.MODULE$.$conforms()));
        IndexedSeq indexedSeq = ((TraversableOnce) seq3.map(erasedEntityTaggedFeature -> {
            return erasedEntityTaggedFeature.getFeatureName();
        }, Seq$.MODULE$.canBuildFrom())).toIndexedSeq();
        IndexedSeq indexedSeq2 = (IndexedSeq) indexedSeq.map(this.allWindowAggFeatures, IndexedSeq$.MODULE$.canBuildFrom());
        ObjectRef create = ObjectRef.create(dataset);
        DateTimeInterval dateTimeInterval = (DateTimeInterval) option2.get();
        Map map = (Map) ((TraversableLike) indexedSeq2.map(featureAnchorWithSource -> {
            return new Tuple2(new Tuple3(featureAnchorWithSource.source(), featureAnchorWithSource.featureAnchor().sourceKeyExtractor().toString(), PreprocessedDataFrameManager$.MODULE$.getPreprocessingUniquenessForAnchor(featureAnchorWithSource)), featureAnchorWithSource);
        }, IndexedSeq$.MODULE$.canBuildFrom())).groupBy(tuple2 -> {
            return (Tuple3) tuple2._1();
        }).map(tuple22 -> {
            if (tuple22 != null) {
                return new Tuple2((Tuple3) tuple22._1(), ((IndexedSeq) tuple22._2()).map(tuple22 -> {
                    return (FeatureAnchorWithSource) tuple22._2();
                }, IndexedSeq$.MODULE$.canBuildFrom()));
            }
            throw new MatchError(tuple22);
        }, Map$.MODULE$.canBuildFrom());
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Map map2 = (Map) map.flatMap(tuple23 -> {
            Dataset<Row> dataset2;
            IndexedSeq indexedSeq3;
            if (tuple23 == null) {
                throw new MatchError(tuple23);
            }
            Tuple3 tuple3 = (Tuple3) tuple23._1();
            IndexedSeq indexedSeq4 = (IndexedSeq) tuple23._2();
            Duration duration2 = (Duration) ((TraversableOnce) indexedSeq4.map(featureAnchorWithSource2 -> {
                return SlidingWindowFeatureUtils$.MODULE$.getMaxWindowDurationInAnchor(featureAnchorWithSource2, indexedSeq);
            }, IndexedSeq$.MODULE$.canBuildFrom())).max(Ordering$.MODULE$.ordered(Predef$.MODULE$.$conforms()));
            this.log().info(new StringBuilder(60).append("Selected max window duration ").append(duration2).append(" across all anchors for source ").append(((DataSource) tuple3._1()).path()).toString());
            IndexedSeq indexedSeq5 = (IndexedSeq) indexedSeq4.flatMap(featureAnchorWithSource3 -> {
                return featureAnchorWithSource3.featureAnchor().features();
            }, IndexedSeq$.MODULE$.canBuildFrom());
            Some some = PreprocessedDataFrameManager$.MODULE$.preprocessedDfMap().get(((TraversableOnce) indexedSeq5.toSet().toSeq().sorted(Ordering$String$.MODULE$)).mkString(","));
            Dataset<Row> windowAggAnchorDFMapForJoin = this.anchorToDataSourceMapper.getWindowAggAnchorDFMapForJoin(sparkSession, (DataSource) tuple3._1(), dateTimeInterval, duration2, (Duration[]) $plus$plus.values().toArray(ClassTag$.MODULE$.apply(Duration.class)), z);
            if (windowAggAnchorDFMapForJoin.isEmpty() && z3) {
                indexedSeq5.map(str2 -> {
                    return BoxesRunTime.boxToBoolean(hashSet.add(str2));
                }, IndexedSeq$.MODULE$.canBuildFrom());
                indexedSeq3 = (IndexedSeq) indexedSeq4.map(featureAnchorWithSource4 -> {
                    return new Tuple2(featureAnchorWithSource4, windowAggAnchorDFMapForJoin);
                }, IndexedSeq$.MODULE$.canBuildFrom());
            } else if (windowAggAnchorDFMapForJoin.isEmpty() && z4) {
                indexedSeq5.map(str3 -> {
                    return BoxesRunTime.boxToBoolean(hashSet2.add(str3));
                }, IndexedSeq$.MODULE$.canBuildFrom());
                String mkString = hashSet2.mkString();
                this.log().warn(new StringBuilder(77).append("Missing data for features ").append(hashSet2).append(". Default values will be populated for this column.").toString());
                SuppressedExceptionHandlerUtils$.MODULE$.missingDataSuppressedExceptionMsgs_$eq(new StringBuilder(0).append(SuppressedExceptionHandlerUtils$.MODULE$.missingDataSuppressedExceptionMsgs()).append(mkString).toString());
                indexedSeq3 = (IndexedSeq) indexedSeq4.map(featureAnchorWithSource5 -> {
                    return new Tuple2(featureAnchorWithSource5, windowAggAnchorDFMapForJoin);
                }, IndexedSeq$.MODULE$.canBuildFrom());
            } else {
                if (some instanceof Some) {
                    dataset2 = (Dataset) some.value();
                } else {
                    if (!None$.MODULE$.equals(some)) {
                        throw new MatchError(some);
                    }
                    dataset2 = windowAggAnchorDFMapForJoin;
                }
                Dataset<Row> dataset3 = dataset2;
                SourceKeyExtractor sourceKeyExtractor = ((FeatureAnchorWithSource) indexedSeq4.head()).featureAnchor().sourceKeyExtractor();
                Dataset<Row> appendKeyColumns = sourceKeyExtractor instanceof SQLSourceKeyExtractor ? ((SQLSourceKeyExtractor) sourceKeyExtractor).appendKeyColumns(dataset3, false) : sourceKeyExtractor.appendKeyColumns(dataset3);
                indexedSeq3 = (IndexedSeq) indexedSeq4.map(featureAnchorWithSource6 -> {
                    return new Tuple2(featureAnchorWithSource6, appendKeyColumns);
                }, IndexedSeq$.MODULE$.canBuildFrom());
            }
            return indexedSeq3;
        }, Map$.MODULE$.canBuildFrom());
        Map map3 = (Map) map2.filter(tuple24 -> {
            return BoxesRunTime.boxToBoolean($anonfun$joinWindowAggFeaturesAsDF$16(tuple24));
        });
        scala.collection.mutable.Map empty = scala.collection.mutable.Map$.MODULE$.empty();
        seq2.foreach(tuple25 -> {
            $anonfun$joinWindowAggFeaturesAsDF$17(this, hashSet, seq, option, joinTimeSetting, z2, create, sparkSession, map3, $plus$plus, option3, map2, hashSet2, empty, tuple25);
            return BoxedUnit.UNIT;
        });
        return new Tuple2<>(new FeatureDataFrame((Dataset) create.elem, empty.toMap(Predef$.MODULE$.$conforms())), hashSet.toSeq());
    }

    public Dataset<Row> standardizeFeatureColumnNames(Seq<String> seq, Dataset<Row> dataset, Seq<String> seq2, Seq<String> seq3) {
        int size = seq.size();
        int size2 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).size();
        if (size2 != size + seq2.size()) {
            throw new FeathrIllegalStateException(new StringBuilder(238).append("Number of columns (").append(size2).append(") in the dataframe returned by ").append("sliding window aggregation does not equal to number of columns in the observation data (").append(size).append(") ").append("+ number of features (").append(seq2.size()).append("). Columns in returned dataframe are ").append(dataset.columns()).append(",").append(" columns in observation dataframe are ").append(seq).toString());
        }
        return (Dataset) ((Seq) seq2.map(str -> {
            return new Tuple2(str, DataFrameColName$.MODULE$.genFeatureColumnName(str, new Some(seq3)));
        }, Seq$.MODULE$.canBuildFrom())).foldLeft(dataset, (dataset2, tuple2) -> {
            return dataset2.withColumnRenamed((String) tuple2._1(), (String) tuple2._2());
        });
    }

    public static final /* synthetic */ boolean $anonfun$joinWindowAggFeaturesAsDF$16(Tuple2 tuple2) {
        return !new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) ((Dataset) tuple2._2()).head(1))).isEmpty();
    }

    public static final /* synthetic */ boolean $anonfun$joinWindowAggFeaturesAsDF$25(HashSet hashSet, Tuple2 tuple2) {
        return hashSet.contains(((FeatureAnchorWithSource) tuple2._1()).selectedFeatures().head());
    }

    public static final /* synthetic */ void $anonfun$joinWindowAggFeaturesAsDF$31(SlidingWindowAggregationJoiner slidingWindowAggregationJoiner, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        FactData factData = (FactData) tuple2._1();
        slidingWindowAggregationJoiner.log().debug(new StringBuilder(35).append("First 3 rows in feature dataset ").append(tuple2._2$mcI$sp()).append(":\n ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) factData.dataSource().collect())).take(3))).map(row -> {
            return row.toString();
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).mkString("\n ")).toString());
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$joinWindowAggFeaturesAsDF$17(SlidingWindowAggregationJoiner slidingWindowAggregationJoiner, HashSet hashSet, Seq seq, Option option, JoinTimeSetting joinTimeSetting, boolean z, ObjectRef objectRef, SparkSession sparkSession, Map map, Map map2, Option option2, Map map3, HashSet hashSet2, scala.collection.mutable.Map map4, Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        Option option3;
        if (tuple2 != null) {
            Seq seq2 = (Seq) tuple2._1();
            Seq seq3 = (Seq) tuple2._2();
            if (seq2 != null && seq3 != null) {
                Seq<String> seq4 = (Seq) seq3.diff(hashSet.toSeq());
                if (seq4.nonEmpty()) {
                    slidingWindowAggregationJoiner.log().warn(new StringBuilder(37).append("----SKIPPED ADDING FEATURES : ").append(hashSet).append(" ------").toString());
                    Seq seq5 = (Seq) ((TraversableLike) seq2.map(seq, Seq$.MODULE$.canBuildFrom())).map(str -> {
                        return new StringBuilder(17).append("CAST (").append(str).append(" AS string)").toString();
                    }, Seq$.MODULE$.canBuildFrom());
                    if (option instanceof Some) {
                        option3 = Option$.MODULE$.apply(((Map) ((Some) option).value()).apply(seq2));
                    } else {
                        if (!None$.MODULE$.equals(option)) {
                            throw new MatchError(option);
                        }
                        option3 = None$.MODULE$;
                    }
                    Option option4 = option3;
                    String constructTimeStampExpr = !joinTimeSetting.useLatestFeatureData() ? SlidingWindowFeatureUtils$.MODULE$.constructTimeStampExpr(joinTimeSetting.timestampColumn().name(), joinTimeSetting.timestampColumn().format(), SlidingWindowFeatureUtils$.MODULE$.constructTimeStampExpr$default$3()) : "unix_timestamp()";
                    Tuple2 tuple22 = z ? new Tuple2(DataFrameUtils$.MODULE$.filterNulls((Dataset) objectRef.elem, (Seq) seq2.map(seq, Seq$.MODULE$.canBuildFrom())), DataFrameUtils$.MODULE$.filterNonNulls((Dataset) objectRef.elem, (Seq) seq2.map(seq, Seq$.MODULE$.canBuildFrom()))) : new Tuple2((Dataset) objectRef.elem, sparkSession.emptyDataFrame());
                    if (tuple22 == null) {
                        throw new MatchError(tuple22);
                    }
                    Tuple2 tuple23 = new Tuple2((Dataset) tuple22._1(), (Dataset) tuple22._2());
                    Dataset dataset = (Dataset) tuple23._1();
                    Dataset dataset2 = (Dataset) tuple23._2();
                    LabelData labelData = new LabelData(dataset, seq5, constructTimeStampExpr);
                    if (sparkSession.sparkContext().isLocal() && slidingWindowAggregationJoiner.log().isDebugEnabled()) {
                        slidingWindowAggregationJoiner.log().debug(new StringBuilder(87).append("*********Sliding window aggregation feature join stage with key: ").append(seq5).append(" for feature ").append(seq4.mkString(",")).append("*********").toString());
                        slidingWindowAggregationJoiner.log().debug(new StringBuilder(39).append("First 3 rows in observation dataset :\n ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) labelData.dataSource().collect())).take(3))).map(row -> {
                            return row.toString();
                        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).mkString("\n ")).toString());
                    }
                    Map<FeatureAnchorWithSource, Dataset<Row>> filterKeys = map.filterKeys(((Seq) seq4.map(slidingWindowAggregationJoiner.allWindowAggFeatures, Seq$.MODULE$.canBuildFrom())).toSet());
                    Seq seq6 = (Seq) SlidingWindowFeatureUtils$.MODULE$.getSWAAnchorGroups(filterKeys).map(map5 -> {
                        Dataset<Row> drop;
                        Set<String> set = (Set) ((TraversableLike) map5.keySet().flatMap(featureAnchorWithSource -> {
                            return featureAnchorWithSource.selectedFeatures();
                        }, Set$.MODULE$.canBuildFrom())).filter(str2 -> {
                            return BoxesRunTime.boxToBoolean(seq4.contains(str2));
                        });
                        Dataset<Row> dataset3 = (Dataset) ((Tuple2) map5.head())._2();
                        FeatureAnchorWithSource featureAnchorWithSource2 = (FeatureAnchorWithSource) ((Tuple2) map5.head())._1();
                        Seq<String> keyColumnNames = featureAnchorWithSource2.featureAnchor().sourceKeyExtractor().getKeyColumnNames(None$.MODULE$);
                        if (None$.MODULE$.equals(option4)) {
                            drop = dataset3;
                        } else {
                            if (!(option4 instanceof Some)) {
                                throw new MatchError(option4);
                            }
                            BloomFilter bloomFilter = (BloomFilter) ((Some) option4).value();
                            if (featureAnchorWithSource2.featureAnchor().sourceKeyExtractor() instanceof MVELSourceKeyExtractor) {
                                throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, "MVELSourceKeyExtractor is not supported in sliding window aggregation");
                            }
                            DataFrameKeyCombiner apply = DataFrameKeyCombiner$.MODULE$.apply();
                            Tuple2<String, Dataset<Row>> combine = apply.combine(dataset3, keyColumnNames, apply.combine$default$3());
                            if (combine == null) {
                                throw new MatchError(combine);
                            }
                            Tuple2 tuple24 = new Tuple2((String) combine._1(), (Dataset) combine._2());
                            String str3 = (String) tuple24._1();
                            drop = ((Dataset) tuple24._2()).filter(SlidingWindowFeatureUtils$.MODULE$.mightContain(bloomFilter).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(str3)}))).drop(functions$.MODULE$.col(str3));
                        }
                        return SlidingWindowFeatureUtils$.MODULE$.getFactDataDef(drop, map5.keySet().toSeq(), map2, set);
                    }, Seq$.MODULE$.canBuildFrom());
                    String[] columns = labelData.dataSource().columns();
                    objectRef.elem = option2.isDefined() ? (Dataset) ((SWAHandler) option2.get()).join().apply(labelData, seq6.toList()) : SlidingWindowJoin$.MODULE$.join(labelData, seq6.toList(), SlidingWindowJoin$.MODULE$.join$default$3());
                    objectRef.elem = (!z || dataset2.isEmpty()) ? (Dataset) objectRef.elem : ((Dataset) objectRef.elem).union((Dataset) seq4.foldLeft(dataset2, (dataset3, str2) -> {
                        return dataset3.withColumn(str2, functions$.MODULE$.lit((Object) null));
                    }));
                    Map<String, FeatureValue> $plus$plus = ((MapLike) filterKeys.flatMap(tuple24 -> {
                        return ((FeatureAnchorWithSource) tuple24._1()).featureAnchor().defaults();
                    }, Map$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce) ((TraversableLike) map3.filter(tuple25 -> {
                        return BoxesRunTime.boxToBoolean($anonfun$joinWindowAggFeaturesAsDF$25(hashSet2, tuple25));
                    })).flatMap(tuple26 -> {
                        return ((FeatureAnchorWithSource) tuple26._1()).featureAnchor().defaults();
                    }, Map$.MODULE$.canBuildFrom()));
                    Map<String, FeatureTypeConfig> map6 = (Map) filterKeys.flatMap(tuple27 -> {
                        return ((FeatureAnchorWithSource) tuple27._1()).featureAnchor().featureTypeConfigs();
                    }, Map$.MODULE$.canBuildFrom());
                    Map<String, Enumeration.Value> map7 = (Map) slidingWindowAggregationJoiner.allWindowAggFeatures.map(tuple28 -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple28._1()), ((TimeWindowFeatureDefinition) ((TimeWindowConfigurableAnchorExtractor) ((FeatureAnchorWithSource) tuple28._2()).featureAnchor().extractor()).features().apply(tuple28._1())).columnFormat());
                    }, Map$.MODULE$.canBuildFrom());
                    objectRef.elem = (Dataset) hashSet2.foldLeft((Dataset) objectRef.elem, (dataset4, str3) -> {
                        return dataset4.withColumn(str3, functions$.MODULE$.lit((Object) null));
                    });
                    FeatureDataFrame convertSWADFToFDS = SlidingWindowFeatureUtils$.MODULE$.convertSWADFToFDS((Dataset) objectRef.elem, seq4.toSet(), map7, map6);
                    if (convertSWADFToFDS == null) {
                        throw new MatchError(convertSWADFToFDS);
                    }
                    Tuple2 tuple29 = new Tuple2(convertSWADFToFDS.df(), convertSWADFToFDS.inferredFeatureType());
                    Dataset<Row> dataset5 = (Dataset) tuple29._1();
                    Map map8 = (Map) tuple29._2();
                    Dataset<Row> substituteDefaults2 = DataFrameDefaultValueSubstituter$.MODULE$.substituteDefaults2(dataset5, ((TraversableOnce) $plus$plus.keys().filter(obj -> {
                        return BoxesRunTime.boxToBoolean(seq4.contains(obj));
                    })).toSeq(), $plus$plus, map6, sparkSession, DataFrameDefaultValueSubstituter$.MODULE$.substituteDefaults$default$6());
                    map4.$plus$plus$eq(map8);
                    objectRef.elem = slidingWindowAggregationJoiner.standardizeFeatureColumnNames(Predef$.MODULE$.wrapRefArray(columns), substituteDefaults2, seq4, (Seq) seq2.map(seq, Seq$.MODULE$.canBuildFrom()));
                    if (FeathrUtils$.MODULE$.shouldCheckPoint(sparkSession)) {
                        objectRef.elem = ((Dataset) objectRef.elem).checkpoint(true);
                    }
                    if (sparkSession.sparkContext().isLocal() && slidingWindowAggregationJoiner.log().isDebugEnabled()) {
                        ((IterableLike) seq6.zipWithIndex(Seq$.MODULE$.canBuildFrom())).foreach(tuple210 -> {
                            $anonfun$joinWindowAggFeaturesAsDF$31(slidingWindowAggregationJoiner, tuple210);
                            return BoxedUnit.UNIT;
                        });
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                return;
            }
        }
        throw new MatchError(tuple2);
    }

    public SlidingWindowAggregationJoiner(Map<String, FeatureAnchorWithSource> map, AnchorToDataSourceMapper anchorToDataSourceMapper) {
        this.allWindowAggFeatures = map;
        this.anchorToDataSourceMapper = anchorToDataSourceMapper;
    }
}
