package com.linkedin.feathr.offline.generation;

import com.linkedin.feathr.common.configObj.generation.OutputProcessorConfig;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrConfigException;
import com.linkedin.feathr.offline.job.FeatureGenSpec;
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: IncrementalAggSnapshotLoader.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/generation/IncrementalAggSnapshotLoader$.class */
public final class IncrementalAggSnapshotLoader$ implements IncrementalAggSnapshotLoader {
    public static IncrementalAggSnapshotLoader$ MODULE$;
    private final Logger logger;

    static {
        new IncrementalAggSnapshotLoader$();
    }

    @Override // com.linkedin.feathr.offline.generation.IncrementalAggSnapshotLoader
    public IncrementalAggContext load(FeatureGenSpec featureGenSpec, List<DataLoaderHandler> list) {
        IncrementalAggContext load;
        load = load(featureGenSpec, list);
        return load;
    }

    private Logger logger() {
        return this.logger;
    }

    @Override // com.linkedin.feathr.offline.generation.IncrementalAggSnapshotLoader
    public IncrementalAggContext load(FeatureGenSpec featureGenSpec, FileSystem fileSystem, List<DataLoaderHandler> list) {
        boolean isEnableIncrementalAgg = featureGenSpec.isEnableIncrementalAgg();
        if (!isEnableIncrementalAgg) {
            return new IncrementalAggContext(isEnableIncrementalAgg, None$.MODULE$, Predef$.MODULE$.Map().empty(), Predef$.MODULE$.Map().empty());
        }
        Seq seq = (Seq) featureGenSpec.getOutputProcessorConfigs().filter(outputProcessorConfig -> {
            return BoxesRunTime.boxToBoolean($anonfun$load$1(outputProcessorConfig));
        });
        ObjectRef create = ObjectRef.create(None$.MODULE$);
        Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>> seq2 = (Seq) seq.collect(new IncrementalAggSnapshotLoader$$anonfun$1(create, featureGenSpec, fileSystem, list), Seq$.MODULE$.canBuildFrom());
        if (seq2.isEmpty()) {
            throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, new StringBuilder(109).append("In order to support incremental aggregation, please specify ").append(FeatureGenerationPathName$.MODULE$.STORE_NAME()).append(" and ").append(FeatureGenerationPathName$.MODULE$.FEATURES()).append(" in the HDFS output").append(" processors with FDS type").toString());
        }
        Map<String, String> preAggRootDirMap = getPreAggRootDirMap(seq2);
        return new IncrementalAggContext(isEnableIncrementalAgg, (Option) create.elem, getPreAggSnapshotMap(seq2), preAggRootDirMap);
    }

    public Map<String, String> getPreAggRootDirMap(Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>> seq) {
        return ((TraversableOnce) seq.flatMap(tuple2 -> {
            if (tuple2 != null) {
                String str = (String) tuple2._1();
                Tuple2 tuple2 = (Tuple2) tuple2._2();
                if (tuple2 != null) {
                    return (Seq) ((Seq) tuple2._1()).map(str2 -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str2), str);
                    }, Seq$.MODULE$.canBuildFrom());
                }
            }
            throw new MatchError(tuple2);
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    public Map<String, Dataset<Row>> getPreAggSnapshotMap(Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>> seq) {
        Seq seq2 = (Seq) seq.collect(new IncrementalAggSnapshotLoader$$anonfun$2(), Seq$.MODULE$.canBuildFrom());
        if (seq2.nonEmpty()) {
            return ((TraversableOnce) seq2.reduce((seq3, seq4) -> {
                return (Seq) seq3.union(seq4, Seq$.MODULE$.canBuildFrom());
            })).toMap(Predef$.MODULE$.$conforms());
        }
        logger().info("No preAgg dataset exist so far, i.e. first time running incremental agg (cold start)");
        return Predef$.MODULE$.Map().empty();
    }

    public static final /* synthetic */ boolean $anonfun$load$1(OutputProcessorConfig outputProcessorConfig) {
        String name = outputProcessorConfig.getName();
        return name != null ? name.equals("HDFS") : "HDFS" == 0;
    }

    private IncrementalAggSnapshotLoader$() {
        MODULE$ = this;
        IncrementalAggSnapshotLoader.$init$(this);
        this.logger = LogManager.getLogger(getClass());
    }
}
