package com.linkedin.feathr.offline.job;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import com.linkedin.feathr.common.FeatureValue;
import com.linkedin.feathr.common.Header;
import com.linkedin.feathr.common.JoiningFeatureParams;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrDataOutputException;
import com.linkedin.feathr.common.exception.FeathrInputDataException;
import com.linkedin.feathr.offline.FeatureDataFrame;
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource;
import com.linkedin.feathr.offline.client.FeathrClient;
import com.linkedin.feathr.offline.client.FeathrClient$;
import com.linkedin.feathr.offline.client.FeathrClient2;
import com.linkedin.feathr.offline.client.FeathrClient2$;
import com.linkedin.feathr.offline.client.InputData;
import com.linkedin.feathr.offline.config.FeatureJoinConfig;
import com.linkedin.feathr.offline.config.FeatureJoinConfig$;
import com.linkedin.feathr.offline.config.datasource.DataSourceConfigUtils$;
import com.linkedin.feathr.offline.config.location.DataLocation;
import com.linkedin.feathr.offline.config.location.DataLocation$;
import com.linkedin.feathr.offline.config.location.SimplePath;
import com.linkedin.feathr.offline.generation.SparkIOUtils$;
import com.linkedin.feathr.offline.source.SourceFormatType$;
import com.linkedin.feathr.offline.source.accessor.DataPathHandler;
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor;
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler;
import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper;
import com.linkedin.feathr.offline.util.AclCheckUtils$;
import com.linkedin.feathr.offline.util.CmdLineParser;
import com.linkedin.feathr.offline.util.FeathrUtils$;
import com.linkedin.feathr.offline.util.FeaturizedDatasetMetadata;
import com.linkedin.feathr.offline.util.FeaturizedDatasetMetadata$;
import com.linkedin.feathr.offline.util.HdfsUtils$;
import com.linkedin.feathr.offline.util.OptionParam;
import com.linkedin.feathr.offline.util.SourceUtils$;
import com.linkedin.feathr.offline.util.SparkFeaturizedDataset;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.spark.SparkConf;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.HashMap;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Properties$;
import scala.util.Success;

/* compiled from: FeatureJoinJob.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/job/FeatureJoinJob$.class */
public final class FeatureJoinJob$ {
    /** Singleton instance of this class; assigned by the private constructor. */
    public static FeatureJoinJob$ MODULE$;
    /** Logger obtained via {@code LogManager.getLogger(getClass())}; exposed through {@link #logger()}. */
    private final Logger logger;
    /** String constant {@code "skip_output"}; only exposed through its accessor in this class. */
    private final String SKIP_OUTPUT;
    /** String constant {@code "10000"}; only exposed through its accessor in this class. */
    private final String SPARK_JOIN_MAX_PARALLELISM;
    /** String constant {@code "10"}; only exposed through its accessor in this class. */
    private final String SPARK_JOIN_MIN_PARALLELISM;
    /** String constant {@code "5000"}; only exposed through its accessor in this class. */
    private final String SPARK_JOIN_PARALLELISM_DEFAULT;
    /** Integer constant {@code 2}; only exposed through its accessor in this class. */
    private final int SPARK_JOIN_LIMIT_PARTITION_FACTOR;
    /** Second logger, created the same way as {@link #logger}; exposed through {@link #log()}. */
    private final Logger log;

    static {
        // Running the private constructor stores the new instance in MODULE$.
        new FeatureJoinJob$();
    }

    /** @return the logger stored in the {@code logger} field */
    public Logger logger() {
        return logger;
    }

    /** @return the value of the {@code SKIP_OUTPUT} field */
    public String SKIP_OUTPUT() {
        return SKIP_OUTPUT;
    }

    /** @return the value of the {@code SPARK_JOIN_MAX_PARALLELISM} field */
    public String SPARK_JOIN_MAX_PARALLELISM() {
        return SPARK_JOIN_MAX_PARALLELISM;
    }

    /** @return the value of the {@code SPARK_JOIN_MIN_PARALLELISM} field */
    public String SPARK_JOIN_MIN_PARALLELISM() {
        return SPARK_JOIN_MIN_PARALLELISM;
    }

    /** @return the value of the {@code SPARK_JOIN_PARALLELISM_DEFAULT} field */
    public String SPARK_JOIN_PARALLELISM_DEFAULT() {
        return SPARK_JOIN_PARALLELISM_DEFAULT;
    }

    /** @return the value of the {@code SPARK_JOIN_LIMIT_PARTITION_FACTOR} field */
    public int SPARK_JOIN_LIMIT_PARTITION_FACTOR() {
        return SPARK_JOIN_LIMIT_PARTITION_FACTOR;
    }

    /** @return the logger stored in the {@code log} field */
    public Logger log() {
        return log;
    }

    /**
     * Runs one feature join: reads and parses the join config, checks authorization,
     * then delegates to {@link #feathrJoinRun}.
     *
     * @param sparkSession         session used to read the join config and run the join
     * @param configuration        Hadoop configuration passed to the authorization check and the join
     * @param feathrJoinJobContext job context supplying the join config path, the join context and the FCM flag
     * @param list                 data path handlers; their loader handlers are used for the authorization check
     */
    public void run(SparkSession sparkSession, Configuration configuration, FeathrJoinJobContext feathrJoinJobContext, List<DataPathHandler> list) {
        // The authorization check takes loader handlers, so project them out of the path handlers first.
        List<DataLoaderHandler> loaderHandlers = (List) list.map(handler -> handler.dataLoaderHandler(), List$.MODULE$.canBuildFrom());

        String joinConfigText = hdfsFileReader(sparkSession, feathrJoinJobContext.joinConfig());
        FeatureJoinConfig joinConfig = FeatureJoinConfig$.MODULE$.parseJoinConfig(joinConfigText);

        checkAuthorization(sparkSession, configuration, feathrJoinJobContext, loaderHandlers);

        JoinJobContext joinContext = feathrJoinJobContext.jobJoinContext();
        feathrJoinRun(sparkSession, configuration, joinConfig, joinContext, list, None$.MODULE$, feathrJoinJobContext.useFCM());
    }

    /**
     * Renders a set of feature names as one string: the names are sorted with the
     * String ordering and joined with {@code "\n\t"}.
     *
     * @param set feature names to render
     * @return the sorted names joined by newline + tab
     */
    public String stringifyFeatureNames(Set<String> set) {
        Object sortedNames = set.toSeq().sorted(Ordering$String$.MODULE$);
        Object[] nameArray = (Object[]) ((TraversableOnce) sortedNames).toArray(ClassTag$.MODULE$.apply(String.class));
        return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(nameArray)).mkString("\n\t");
    }

    /**
     * Reads a text file through the Spark context and returns its content as one string.
     * All lines are collected to the driver and joined with {@code "\n"}.
     *
     * @param sparkSession session whose Spark context reads the file
     * @param str          path of the text file
     * @return the collected lines joined by newlines
     */
    public String hdfsFileReader(SparkSession sparkSession, String str) {
        RDD<String> lines = sparkSession.sparkContext().textFile(str, sparkSession.sparkContext().textFile$default$2());
        Object[] collected = (Object[]) lines.collect();
        return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(collected)).mkString("\n");
    }

    /**
     * Checks write access to the output location and read access to the observation input.
     *
     * <p>The write check only runs when the output location is a {@link SimplePath}; other
     * location types are skipped. The read check runs only when the job context carries input data.
     *
     * @param sparkSession         session forwarded to the read check
     * @param configuration        Hadoop configuration used by both checks
     * @param feathrJoinJobContext job context supplying the output path and the optional input data
     * @param list                 data loader handlers forwarded to the read check
     * @throws FeathrDataOutputException if the write check returns a {@code Failure}
     */
    private void checkAuthorization(SparkSession sparkSession, Configuration configuration, FeathrJoinJobContext feathrJoinJobContext, List<DataLoaderHandler> list) {
        DataLocation outputPath = feathrJoinJobContext.jobJoinContext().outputPath();
        if (outputPath instanceof SimplePath) {
            // Typed as Try (not Failure) so that both the Failure and the Success test below are meaningful.
            scala.util.Try<?> writeCheck = AclCheckUtils$.MODULE$.checkWriteAuthorization(configuration, ((SimplePath) outputPath).path());
            if (writeCheck instanceof Failure) {
                throw new FeathrDataOutputException(
                        ErrorLabel.FEATHR_USER_ERROR,
                        "No write permission for output path " + feathrJoinJobContext.jobJoinContext().outputPath() + ".",
                        ((Failure<?>) writeCheck).exception());
            }
            if (!(writeCheck instanceof Success)) {
                // Mirrors an exhaustive Scala match: anything that is neither Failure nor Success (e.g. null).
                throw new MatchError(writeCheck);
            }
            log().debug("Checked write authorization on output path: " + feathrJoinJobContext.jobJoinContext().outputPath());
        }
        // Run the read check only for its side effect (it throws on denied paths); no result is needed.
        feathrJoinJobContext.jobJoinContext().inputData().foreach(inputData -> {
            $anonfun$checkAuthorization$1(sparkSession, list, configuration, inputData);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds a {@link FeathrClient} and joins the observation data with the configured features.
     *
     * @param sparkSession      session used to build the client
     * @param dataset           observation data to join onto
     * @param map               feature groupings; not used by this method
     * @param featureJoinConfig join configuration
     * @param joinJobContext    join job context
     * @param list              data path handlers registered on the client
     * @param option            optional local test configuration forwarded to {@link #getFeathrClient}
     * @return the pair returned by the client's {@code doJoinObsAndFeatures}
     */
    public Tuple2<Dataset<Row>, Header> getFeathrClientAndJoinFeatures(SparkSession sparkSession, Dataset<Row> dataset, Map<String, Seq<JoiningFeatureParams>> map, FeatureJoinConfig featureJoinConfig, JoinJobContext joinJobContext, List<DataPathHandler> list, Option<LocalTestConfig> option) {
        FeathrClient client = getFeathrClient(sparkSession, joinJobContext, list, option);
        return client.doJoinObsAndFeatures(featureJoinConfig, joinJobContext, dataset);
    }

    /**
     * Builds a {@link FeathrClient}.
     *
     * <p>Without a local test config, feature definitions come from the paths in the join job
     * context. With one, the definitions held by the test config are used instead.
     *
     * @param sparkSession   session passed to the client builder
     * @param joinJobContext supplies the feature config and local override paths
     * @param list           data path handlers registered on the client
     * @param option         optional local test configuration
     * @return the built client
     * @throws MatchError if {@code option} is neither {@code None} nor a {@code Some}
     */
    public FeathrClient getFeathrClient(SparkSession sparkSession, JoinJobContext joinJobContext, List<DataPathHandler> list, Option<LocalTestConfig> option) {
        if (None$.MODULE$.equals(option)) {
            return FeathrClient$.MODULE$.builder(sparkSession)
                    .addFeatureDefPath(joinJobContext.feathrFeatureConfig())
                    .addLocalOverrideDefPath(joinJobContext.feathrLocalConfig())
                    .addDataPathHandlers(list)
                    .build();
        }
        if (!(option instanceof Some)) {
            throw new MatchError(option);
        }
        LocalTestConfig testConfig = (LocalTestConfig) ((Some) option).value();
        return FeathrClient$.MODULE$.builder(sparkSession)
                .addFeatureDef(testConfig.featureConfig())
                .addLocalOverrideDef(testConfig.localConfig())
                .addDataPathHandlers(list)
                .build();
    }

    /** @return the default for parameter 7 of {@code getFeathrClientAndJoinFeatures}: no local test config */
    public Option<LocalTestConfig> getFeathrClientAndJoinFeatures$default$7() {
        return Option.empty();
    }

    /** @return the default for parameter 4 of {@code getFeathrClient}: no local test config */
    public Option<LocalTestConfig> getFeathrClient$default$4() {
        return Option.empty();
    }

    /**
     * Builds a {@link FeathrClient2} and joins the observation data with the configured features.
     *
     * @param sparkSession      session used to build the client
     * @param dataset           observation data, wrapped into a {@link SparkFeaturizedDataset} with default metadata
     * @param map               feature groupings; not used by this method
     * @param featureJoinConfig join configuration
     * @param joinJobContext    join job context
     * @param list              data path handlers registered on the client
     * @param option            optional local test configuration forwarded to {@link #getFCMClient}
     * @return the DataFrame of the first element of the client's {@code joinFeatures} result
     */
    public Dataset<Row> getFCMClientAndJoinFeatures(SparkSession sparkSession, Dataset<Row> dataset, Map<String, Seq<JoiningFeatureParams>> map, FeatureJoinConfig featureJoinConfig, JoinJobContext joinJobContext, List<DataPathHandler> list, Option<LocalTestConfig> option) {
        FeathrClient2 client = getFCMClient(sparkSession, joinJobContext, list, option);
        FeaturizedDatasetMetadata defaultMetadata = new FeaturizedDatasetMetadata(
                FeaturizedDatasetMetadata$.MODULE$.apply$default$1(),
                FeaturizedDatasetMetadata$.MODULE$.apply$default$2());
        SparkFeaturizedDataset observation = new SparkFeaturizedDataset(dataset, defaultMetadata);
        FeatureDataFrame joined = (FeatureDataFrame) client.joinFeatures(featureJoinConfig, observation, joinJobContext)._1();
        return joined.df();
    }

    /**
     * Builds a {@link FeathrClient2}.
     *
     * <p>Without a local test config, feature definitions come from the paths in the join job
     * context. With one, the definitions held by the test config are used instead.
     *
     * @param sparkSession   session passed to the client builder
     * @param joinJobContext supplies the feature config and local override paths
     * @param list           data path handlers registered on the client
     * @param option         optional local test configuration
     * @return the built client
     * @throws MatchError if {@code option} is neither {@code None} nor a {@code Some}
     */
    public FeathrClient2 getFCMClient(SparkSession sparkSession, JoinJobContext joinJobContext, List<DataPathHandler> list, Option<LocalTestConfig> option) {
        if (None$.MODULE$.equals(option)) {
            return FeathrClient2$.MODULE$.builder(sparkSession)
                    .addFeatureDefPath(joinJobContext.feathrFeatureConfig())
                    .addLocalOverrideDefPath(joinJobContext.feathrLocalConfig())
                    .addDataPathHandlers(list)
                    .build();
        }
        if (!(option instanceof Some)) {
            throw new MatchError(option);
        }
        LocalTestConfig testConfig = (LocalTestConfig) ((Some) option).value();
        return FeathrClient2$.MODULE$.builder(sparkSession)
                .addFeatureDef(testConfig.featureConfig())
                .addLocalOverrideDef(testConfig.localConfig())
                .addDataPathHandlers(list)
                .build();
    }

    /** @return the default for parameter 7 of {@code getFCMClientAndJoinFeatures}: no local test config */
    public Option<LocalTestConfig> getFCMClientAndJoinFeatures$default$7() {
        return Option.empty();
    }

    /** @return the default for parameter 4 of {@code getFCMClient}: no local test config */
    public Option<LocalTestConfig> getFCMClient$default$4() {
        return Option.empty();
    }

    /**
     * Loads the observation data, joins features onto it and writes the result to the output path.
     *
     * @param sparkSession      session used for loading and joining
     * @param configuration     Hadoop configuration used when loading the observation data
     * @param featureJoinConfig join configuration; also supplies the feature groupings
     * @param joinJobContext    supplies the input data, output path and number of output parts
     * @param list              data path handlers
     * @param option            optional local test configuration forwarded to the client factory
     * @param z                 when true the FCM client ({@link FeathrClient2}) performs the join,
     *                          otherwise {@link FeathrClient}
     * @return a pair whose first element is always {@code None} and whose second is the joined DataFrame
     */
    public Tuple2<Option<RDD<GenericRecord>>, Option<Dataset<Row>>> feathrJoinRun(SparkSession sparkSession, Configuration configuration, FeatureJoinConfig featureJoinConfig, JoinJobContext joinJobContext, List<DataPathHandler> list, Option<LocalTestConfig> option, boolean z) {
        // Result intentionally unused; the call is retained from the original code.
        sparkSession.sparkContext().getConf();

        List<DataLoaderHandler> loaderHandlers = (List) list.map(handler -> handler.dataLoaderHandler(), List$.MODULE$.canBuildFrom());
        Map<String, Seq<JoiningFeatureParams>> groupings = featureJoinConfig.featureGroupings();

        // Option.get(): throws if the context carries no input data.
        InputData observationInput = (InputData) joinJobContext.inputData().get();
        boolean failOnMissingPartition = new StringOps(Predef$.MODULE$.augmentString(
                FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION()))).toBoolean();
        Dataset<Row> observation = SourceUtils$.MODULE$.loadObservationAsDF(sparkSession, configuration, observationInput, loaderHandlers, failOnMissingPartition);

        Dataset<Row> joined;
        if (z) {
            joined = getFCMClientAndJoinFeatures(sparkSession, observation, groupings, featureJoinConfig, joinJobContext, list, option);
        } else {
            joined = (Dataset) getFeathrClientAndJoinFeatures(sparkSession, observation, groupings, featureJoinConfig, joinJobContext, list, option)._1();
        }

        DataLocation outputLocation = joinJobContext.outputPath();
        Tuple2[] writeOptions = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(SparkIOUtils$.MODULE$.OUTPUT_PARALLELISM()), Integer.toString(joinJobContext.numParts())),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(SparkIOUtils$.MODULE$.OVERWRITE_MODE()), "ALL")
        };
        SparkIOUtils$.MODULE$.writeDataFrame(joined, outputLocation, Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(writeOptions)), loaderHandlers);

        return new Tuple2<>(None$.MODULE$, new Some(joined));
    }

    /** @return the default for parameter 6 of {@code feathrJoinRun}: no local test config */
    public Option<LocalTestConfig> feathrJoinRun$default$6() {
        return Option.empty();
    }

    /** @return the default for parameter 7 of {@code feathrJoinRun} (the flag selecting the FCM client): {@code false} */
    public boolean feathrJoinRun$default$7() {
        return false;
    }

    /**
     * Parses the command-line arguments of the join job into a {@link FeathrJoinJobContext}.
     *
     * <p>Side effect: every entry of the JSON object passed as {@code system-properties}
     * (default {@code "{}"}) is set as a system property before the other options are read.
     *
     * @param strArr raw command-line arguments
     * @return the job context built from the join config path, input data, output location,
     *         number of parts, data source configs and the {@code use-fcm} flag
     */
    public FeathrJoinJobContext parseInputArgument(String[] strArr) {
        // Option table: long name -> OptionParam(short name, description, arg name, default value).
        // NOTE(review): "pass-through-field" and "join-parallelism" both declare the short name "p";
        // confirm how CmdLineParser resolves the clash.
        // NOTE(review): FeatureValue.EMPTY_TERM is used as the default for most options - presumably the
        // empty string; confirm against FeatureValue.
        CmdLineParser cmdLineParser = new CmdLineParser(strArr, Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("feathr-config"), new OptionParam("f", "Path of the feathr local config file", "FCONF", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("feature-config"), new OptionParam("ef", "Names of the feathr feature config files", "EFCONF", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("local-override-all"), new OptionParam("loa", "Local config overrides all other configs", "LOCAL_OVERRIDE", "true")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("join-config"), new OptionParam("j", "Path of the join config file", "JCONF", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("input"), new OptionParam("i", "Path of the input data set", "INPUT", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("output"), new OptionParam("o", "Path of the output", "OUTPUT", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("num-parts"), new OptionParam("n", "Number of output part files", "NPARTS", "-1")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("pass-through-field"), new OptionParam("p", "Pass-through feature field name", "PFIELD", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("pass-through-features"), new OptionParam("t", "Pass-through feature list, comma-separated", "PLIST", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("source-type"), new OptionParam("st", "Source type of the observation data", "SRCTYPE", 
"FIXED_PATH")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("start-date"), new OptionParam("sd", "Start date of the observation data if it's time based", "SDATE", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("end-date"), new OptionParam("ed", "End date of the observation data if it's time based", "EDATE", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("num-days"), new OptionParam("nd", "Number of days before the offset date if it's time based", "NDAYS", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("date-offset"), new OptionParam("do", "Offset of observation data if it's time based", "DOFFSET", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("join-parallelism"), new OptionParam("p", "Multiplier to increase the number of partitions of feature datasets during joins", "PARALLEL", "8")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("row-bloomfilter-threshold"), new OptionParam("rbt", "Performance tuning, if observation record # is less than the threshold, a bloomfilter will be applied", "ROWFILTERTHRESHOLD", "-1")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("job-version"), new OptionParam("jv", "Job version, integer, job version 2 uses DataFrame and SQL based anchor, default is 2", "JOBVERSION", "2")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("as-tensors"), new OptionParam("at", "If set to true, get features as tensors else as term-vectors", "AS_TENSORS", "false")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("s3-config"), new OptionParam("sc", "Authentication config for S3", "S3_CONFIG", FeatureValue.EMPTY_TERM)), 
Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("adls-config"), new OptionParam("adlc", "Authentication config for ADLS (abfs)", "ADLS_CONFIG", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("blob-config"), new OptionParam("bc", "Authentication config for Azure Blob Storage (wasb)", "BLOB_CONFIG", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sql-config"), new OptionParam("sqlc", "Authentication config for Azure SQL Database (jdbc)", "SQL_CONFIG", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("snowflake-config"), new OptionParam("sfc", "Authentication config for Snowflake Database (jdbc)", "SNOWFLAKE_CONFIG", FeatureValue.EMPTY_TERM)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("use-fcm"), new OptionParam("ufcm", "If set to true, use FCM client, else use Feathr Client", "USE_FCM", "false")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("system-properties"), new OptionParam("sps", "Additional System Properties", "SYSTEM_PROPERTIES_CONFIG", FeatureValue.EMPTY_TERM))})), new $colon.colon(new org.apache.commons.cli.Option("LOCALMODE", "local-mode", false, "Run in local mode"), Nil$.MODULE$));
        // Parse the "system-properties" JSON object (or "{}" when absent) and set each key/value pair
        // as a system property. Both key and value are cast to String.
        ((HashMap) new ObjectMapper().registerModule(DefaultScalaModule$.MODULE$).readValue((String) cmdLineParser.extractOptionalValue("system-properties").getOrElse(() -> {
            return "{}";
        }), HashMap.class)).foreach(tuple2 -> {
            return Properties$.MODULE$.setProp((String) tuple2._1(), (String) tuple2._2());
        });
        // Path of the join config file.
        String extractRequiredValue = cmdLineParser.extractRequiredValue("join-config");
        // Observation input: path, source type and the optional date parameters.
        InputData inputData = new InputData(cmdLineParser.extractRequiredValue("input"), SourceFormatType$.MODULE$.withName(cmdLineParser.extractRequiredValue("source-type")), cmdLineParser.extractOptionalValue("start-date"), cmdLineParser.extractOptionalValue("end-date"), cmdLineParser.extractOptionalValue("date-offset"), cmdLineParser.extractOptionalValue("num-days"));
        // Comma-separated pass-through feature names, trimmed into a set.
        // NOTE(review): this set is computed but not used in the returned context.
        String extractRequiredValue2 = cmdLineParser.extractRequiredValue("pass-through-features");
        Set empty = FeatureValue.EMPTY_TERM.equals(extractRequiredValue2) ? Predef$.MODULE$.Set().empty() : new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(extractRequiredValue2.split(","))).map(str -> {
            return str.trim();
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toSet();
        Option<String> extractOptionalValue = cmdLineParser.extractOptionalValue("feathr-config");
        Option<String> extractOptionalValue2 = cmdLineParser.extractOptionalValue("feature-config");
        // The "local-override-all" value is read but its result is discarded.
        cmdLineParser.extractRequiredValue("local-override-all");
        return new FeathrJoinJobContext(extractRequiredValue, new JoinJobContext(extractOptionalValue, extractOptionalValue2, new Some(inputData), DataLocation$.MODULE$.apply(cmdLineParser.extractRequiredValue("output")), new StringOps(Predef$.MODULE$.augmentString(cmdLineParser.extractRequiredValue("num-parts"))).toInt()), DataSourceConfigUtils$.MODULE$.getConfigs(cmdLineParser), new StringOps(Predef$.MODULE$.augmentString(cmdLineParser.extractRequiredValue("use-fcm"))).toBoolean());
    }

    /**
     * Parses join config text by delegating to {@code FeatureJoinConfig$.parseJoinConfig}.
     *
     * @param str join config content
     * @return the parsed join config
     */
    public FeatureJoinConfig parseJoinConfig(String str) {
        FeatureJoinConfig parsed = FeatureJoinConfig$.MODULE$.parseJoinConfig(str);
        return parsed;
    }

    /**
     * Loads the source DataFrames of the anchors whose feature-name key is contained in {@code set}.
     *
     * <p>An anchor's key is its feature names sorted with the String ordering and joined by
     * {@code ","}. The same key is used in the returned map.
     *
     * @param strArr command-line arguments of the join job; used to prepare the Spark session
     * @param set    anchor keys (sorted, comma-joined feature names) to load
     * @return Java map from anchor key to the DataFrame returned by that anchor's data source accessor
     */
    public java.util.Map<String, Dataset<Row>> loadSourceDataframe(String[] strArr, java.util.Set<String> set) {
        // Render the array contents the same way main() does; appending the array itself would only
        // log its identity (e.g. "[Ljava.lang.String;@1b2c3d").
        logger().info("FeatureJoinJob args are: " + new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).mkString("Array(", ", ", ")"));
        logger().info("Feature join job: loadDataframe");
        logger().info(set);
        FeathrJoinPreparationInfo prepareSparkSession = prepareSparkSession(strArr);
        SparkSession sparkSession = prepareSparkSession.sparkSession();
        Configuration hadoopConf = prepareSparkSession.hadoopConf();
        FeathrJoinJobContext jobContext = prepareSparkSession.jobContext();
        checkAuthorization(sparkSession, hadoopConf, jobContext, Nil$.MODULE$);
        Map<String, FeatureAnchorWithSource> allAnchoredFeatures = getFeathrClient(sparkSession, jobContext.jobJoinContext(), Nil$.MODULE$, getFeathrClient$default$4()).allAnchoredFeatures();
        // Pipeline: anchor -> optional accessor; keep defined accessors; unwrap them; keep anchors whose
        // key is requested; finally map key -> accessor.get().
        return (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter((Map) ((TraversableLike) ((Map) ((TraversableLike) new AnchorToDataSourceMapper(Nil$.MODULE$).getBasicAnchorDFMapForJoin(sparkSession, allAnchoredFeatures.values().toSeq(), new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION()))).toBoolean()).filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$loadSourceDataframe$1(tuple2));
        })).map(tuple22 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple22._1()), ((Option) tuple22._2()).get());
        }, Map$.MODULE$.canBuildFrom())).filter(tuple23 -> {
            return BoxesRunTime.boxToBoolean($anonfun$loadSourceDataframe$3(set, tuple23));
        })).map(tuple24 -> {
            return new Tuple2(((TraversableOnce) ((FeatureAnchorWithSource) tuple24._1()).featureAnchor().features().toSeq().sorted(Ordering$String$.MODULE$)).mkString(","), ((DataSourceAccessor) tuple24._2()).get());
        }, Map$.MODULE$.canBuildFrom())).asJava();
    }

    /**
     * Stores the given preprocessed DataFrames in {@code PreprocessedDataFrameManager} and then
     * runs {@link #main}.
     *
     * @param strArr command-line arguments forwarded to {@code main}
     * @param map    preprocessed DataFrames; converted to an immutable Scala map before being stored
     */
    public void mainWithPreprocessedDataFrame(String[] strArr, java.util.Map<String, Dataset<Row>> map) {
        TraversableOnce scalaEntries = (TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(map).asScala();
        PreprocessedDataFrameManager$.MODULE$.preprocessedDfMap_$eq(scalaEntries.toMap(Predef$.MODULE$.$conforms()));
        main(strArr);
    }

    /**
     * Entry point of the join job: logs the arguments, prepares the Spark session and job
     * context, and runs the join with no extra data path handlers.
     *
     * @param strArr command-line arguments
     */
    public void main(String[] strArr) {
        String renderedArgs = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).mkString("Array(", ", ", ")");
        logger().info("FeatureJoinJob args are: " + renderedArgs);

        FeathrJoinPreparationInfo prepared = prepareSparkSession(strArr);
        run(prepared.sparkSession(), prepared.hadoopConf(), prepared.jobContext(), Nil$.MODULE$);
    }

    /**
     * Parses the arguments and prepares everything the join job needs to run.
     *
     * <p>Steps: create (or reuse) a Hive-enabled Spark session with {@code GenericRecord}
     * registered for Kryo, apply the data source configs to the session, register the Feathr UDFs,
     * delete any existing output at a {@link SimplePath} location, and raise all log levels to
     * DEBUG when the debug-output job parameter is true.
     *
     * @param strArr command-line arguments
     * @return the Spark session, its Hadoop configuration and the parsed job context
     */
    public FeathrJoinPreparationInfo prepareSparkSession(String[] strArr) {
        FeathrJoinJobContext parseInputArgument = parseInputArgument(strArr);
        SparkConf registerKryoClasses = new SparkConf().registerKryoClasses(new Class[]{GenericRecord.class});
        SparkSession orCreate = SparkSession$.MODULE$.builder().config(registerKryoClasses).appName(getClass().getName()).enableHiveSupport().getOrCreate();
        Configuration hadoopConfiguration = orCreate.sparkContext().hadoopConfiguration();
        DataSourceConfigUtils$.MODULE$.setupHadoopConf(orCreate, parseInputArgument.dataSourceConfigs());
        FeathrUdfRegistry$.MODULE$.registerUdf(orCreate);
        DataLocation outputPath = parseInputArgument.jobJoinContext().outputPath();
        // Remove the previous output before the job writes. Only SimplePath locations are deleted;
        // the result of deletePath is not needed.
        // NOTE(review): the `true` argument is presumably "recursive" - confirm against HdfsUtils.
        if (outputPath instanceof SimplePath) {
            HdfsUtils$.MODULE$.deletePath(((SimplePath) outputPath).path(), true, hadoopConfiguration);
        }
        if (new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(registerKryoClasses, FeathrUtils$.MODULE$.ENABLE_DEBUG_OUTPUT()))).toBoolean()) {
            Configurator.setAllLevels(LogManager.getRootLogger().getName(), Level.DEBUG);
        }
        return new FeathrJoinPreparationInfo(orCreate, hadoopConfiguration, parseInputArgument);
    }

    /**
     * Checks read access to every observation data path derived from the input data.
     *
     * @param sparkSession  session used to resolve the path list and the fail-on-missing-partition parameter
     * @param list          data loader handlers forwarded to the path resolution
     * @param configuration Hadoop configuration used by the read check
     * @param inputData     observation input (source type, path and date parameters)
     * @throws FeathrInputDataException if the read check reports at least one path
     */
    public static final /* synthetic */ void $anonfun$checkAuthorization$1(SparkSession sparkSession, List list, Configuration configuration, InputData inputData) {
        boolean failOnMissingPartition = new StringOps(Predef$.MODULE$.augmentString(
                FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION()))).toBoolean();
        Seq<String> observationPaths = SourceUtils$.MODULE$.getPathList(inputData.sourceType(), inputData.inputPath(), sparkSession, inputData.dateParam(), list, None$.MODULE$, failOnMissingPartition);
        Seq<Tuple2<String, String>> denied = AclCheckUtils$.MODULE$.checkReadAuthorization(configuration, observationPaths);

        if (!denied.isEmpty()) {
            // The message lists the second tuple elements first, then the first elements.
            Seq secondElements = (Seq) denied.map(entry -> (String) entry._2(), Seq$.MODULE$.canBuildFrom());
            Seq firstElements = (Seq) denied.map(entry -> (String) entry._1(), Seq$.MODULE$.canBuildFrom());
            throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR,
                    "No read permission on observation data " + secondElements + " with  " + firstElements);
        }

        MODULE$.log().debug("Checked read authorization on observation data of the following paths:\n" + observationPaths.mkString("\n"));
    }

    /**
     * Filter predicate used by {@code loadSourceDataframe}.
     *
     * @param tuple2 pair whose second element is an {@code Option}
     * @return true when that option is defined
     */
    public static final /* synthetic */ boolean $anonfun$loadSourceDataframe$1(Tuple2 tuple2) {
        Option maybeAccessor = (Option) tuple2._2();
        return maybeAccessor.isDefined();
    }

    /**
     * Filter predicate used by {@code loadSourceDataframe}.
     *
     * @param set    requested anchor keys
     * @param tuple2 pair whose first element is a {@link FeatureAnchorWithSource}
     * @return true when the anchor's sorted, comma-joined feature names are contained in {@code set}
     */
    public static final /* synthetic */ boolean $anonfun$loadSourceDataframe$3(java.util.Set set, Tuple2 tuple2) {
        FeatureAnchorWithSource anchor = (FeatureAnchorWithSource) tuple2._1();
        String anchorKey = ((TraversableOnce) anchor.featureAnchor().features().toSeq().sorted(Ordering$String$.MODULE$)).mkString(",");
        return set.contains(anchorKey);
    }

    /**
     * Private constructor: publishes the instance through {@code MODULE$} and then
     * initializes the loggers and constants.
     */
    private FeatureJoinJob$() {
        // Publish the singleton before any other initialization.
        MODULE$ = this;

        logger = LogManager.getLogger(getClass());

        SKIP_OUTPUT = "skip_output";
        SPARK_JOIN_MAX_PARALLELISM = "10000";
        SPARK_JOIN_MIN_PARALLELISM = "10";
        SPARK_JOIN_PARALLELISM_DEFAULT = "5000";
        SPARK_JOIN_LIMIT_PARTITION_FACTOR = 2;

        log = LogManager.getLogger(getClass());
    }
}
