package org.apache.wayang.spark.platform;

import java.util.Collections;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.wayang.basic.plugin.WayangBasic;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileToTimeConverter;
import org.apache.wayang.core.optimizer.costs.LoadToTimeConverter;
import org.apache.wayang.core.optimizer.costs.TimeToCostConverter;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Formats;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.spark.execution.SparkContextReference;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.operators.SparkCollectionSource;
import org.apache.wayang.spark.operators.SparkLocalCallbackSink;

/* loaded from: input_file:org/apache/wayang/spark/platform/SparkPlatform.class */
/**
 * {@link Platform} implementation for Apache Spark.
 *
 * <p>The class is a lazily created singleton (see {@link #getInstance()}). It owns at most one
 * {@link SparkContextReference} at a time and (re)creates it on demand in
 * {@link #getSparkContext(Job)}.
 */
public class SparkPlatform extends Platform {
    private static final String PLATFORM_NAME = "Apache Spark";
    private static final String CONFIG_NAME = "spark";
    private static final String DEFAULT_CONFIG_FILE = "wayang-spark-defaults.properties";

    /** Configuration key read by {@link #getInitializeMillis(Configuration)}. */
    public static final String INITIALIZATION_MS_CONFIG_KEY = "wayang.spark.init.ms";

    /** Lazily initialized singleton instance; see {@link #getInstance()}. */
    private static SparkPlatform instance = null;

    /** Properties that are always copied from the Wayang configuration into the {@link SparkConf}. */
    private static final String[] REQUIRED_SPARK_PROPERTIES = {"spark.master"};

    /** Properties that are copied into the {@link SparkConf} only if the Wayang configuration defines them. */
    private static final String[] OPTIONAL_SPARK_PROPERTIES = {
            "spark.app.name",
            "spark.executor.memory",
            "spark.executor.cores",
            "spark.executor.instances",
            "spark.dynamicAllocation.enabled",
            "spark.executor.extraJavaOptions",
            "spark.eventLog.enabled",
            "spark.eventLog.dir",
            "spark.serializer",
            "spark.kryo.classesToRegister",
            "spark.kryo.registrator",
            "spark.local.dir",
            "spark.logConf",
            "spark.driver.host",
            "spark.driver.port",
            "spark.driver.maxResultSize",
            "spark.ui.showConsoleProgress",
            "spark.io.compression.codec",
            "spark.driver.memory",
            "spark.executor.heartbeatInterval",
            "spark.network.timeout"
    };

    /** Properties that are copied into the Hadoop configuration only if the Wayang configuration defines them. */
    private static final String[] OPTIONAL_HADOOP_PROPERTIES = {"fs.s3.awsAccessKeyId", "fs.s3.awsSecretAccessKey"};

    /** The currently managed Spark context reference; {@code null} until the first request. */
    private SparkContextReference sparkContextReference;

    private final Logger logger = LogManager.getLogger(getClass());

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * <p>The lazy initialization is not synchronized.
     *
     * @return the single {@link SparkPlatform} instance
     */
    public static SparkPlatform getInstance() {
        if (instance == null) {
            instance = new SparkPlatform();
        }
        return instance;
    }

    private SparkPlatform() {
        super(PLATFORM_NAME, CONFIG_NAME);
    }

    /**
     * Provides a {@link SparkContextReference} configured from the given job.
     *
     * <p>If there is no reference yet, or the current one is disposed, a new {@link JavaSparkContext}
     * is created from the job's configuration. Otherwise the existing context is reused and a warning
     * is logged, because settings applied to the configuration obtained from a running context might
     * not take effect. In both cases the optional Hadoop properties are applied and, for a non-local
     * context, the required JARs are registered.
     *
     * @param job the job whose configuration, name, cross-platform executor and UDF JARs are used
     * @return the (possibly newly created) reference to the Spark context
     */
    public SparkContextReference getSparkContext(Job job) {
        final Configuration configuration = job.getConfiguration();
        final boolean needsNewContext =
                this.sparkContextReference == null || this.sparkContextReference.isDisposed();

        // Start either from a fresh configuration or from the one of the context that is reused.
        final SparkConf sparkConf;
        if (needsNewContext) {
            sparkConf = new SparkConf(true);
        } else {
            final JavaSparkContext existingContext = this.sparkContextReference.get();
            this.logger.warn("There is already a SparkContext (master: {}): , which will be reused. Not all settings might be effective.", existingContext.getConf().get("spark.master"));
            sparkConf = existingContext.getConf();
        }

        for (String property : REQUIRED_SPARK_PROPERTIES) {
            sparkConf.set(property, configuration.getStringProperty(property));
        }
        for (String property : OPTIONAL_SPARK_PROPERTIES) {
            configuration.getOptionalStringProperty(property)
                    .ifPresent(value -> sparkConf.set(property, value));
        }
        // An explicit job name takes precedence over a configured "spark.app.name".
        if (job.getName() != null) {
            sparkConf.set("spark.app.name", job.getName());
        }

        if (needsNewContext) {
            this.sparkContextReference =
                    new SparkContextReference(job.getCrossPlatformExecutor(), new JavaSparkContext(sparkConf));
        }

        final JavaSparkContext sparkContext = this.sparkContextReference.get();
        applyHadoopProperties(sparkContext, configuration);
        if (!sparkContext.isLocal()) {
            registerRuntimeJars(job);
        }
        return this.sparkContextReference;
    }

    /** Copies every defined optional Hadoop property into the Hadoop configuration of the given context. */
    private void applyHadoopProperties(JavaSparkContext sparkContext, Configuration configuration) {
        final org.apache.hadoop.conf.Configuration hadoopConfiguration = sparkContext.hadoopConfiguration();
        for (String property : OPTIONAL_HADOOP_PROPERTIES) {
            configuration.getOptionalStringProperty(property)
                    .ifPresent(value -> hadoopConfiguration.set(property, value));
        }
    }

    /** Adds the JARs declaring the Wayang classes and the job's UDF JARs to the current Spark context. */
    private void registerRuntimeJars(Job job) {
        registerJarIfNotNull(ReflectionUtils.getDeclaringJar(SparkPlatform.class));
        registerJarIfNotNull(ReflectionUtils.getDeclaringJar(WayangBasic.class));
        registerJarIfNotNull(ReflectionUtils.getDeclaringJar(WayangContext.class));
        final Set<String> udfJarPaths = job.getUdfJarPaths();
        if (udfJarPaths.isEmpty()) {
            this.logger.warn("Non-local SparkContext but no UDF JARs have been declared.");
        } else {
            udfJarPaths.forEach(this::registerJarIfNotNull);
        }
    }

    /**
     * Adds the given JAR to the current Spark context; does nothing for {@code null}.
     *
     * @param str path of the JAR to add, or {@code null}
     */
    private void registerJarIfNotNull(String str) {
        if (str != null) {
            this.sparkContextReference.get().addJar(str);
        }
    }

    /**
     * Loads the Spark default properties resource into the given configuration.
     *
     * @param configuration the configuration to populate
     */
    public void configureDefaults(Configuration configuration) {
        configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
    }

    /**
     * Builds the converter from load profiles to time estimates based on the
     * {@code wayang.spark.*} properties of the given configuration.
     *
     * @param configuration provides the CPU, machine, core, HDFS, network and stretch properties
     * @return a top-level stretching converter that sums the three individual time estimates
     */
    public LoadProfileToTimeConverter createLoadProfileToTimeConverter(Configuration configuration) {
        final long cpuMhz = configuration.getLongProperty("wayang.spark.cpu.mhz");
        final long numMachines = configuration.getLongProperty("wayang.spark.machines");
        final long coresPerMachine = configuration.getLongProperty("wayang.spark.cores-per-machine");
        // Multiply in double precision so that large property values cannot overflow or be truncated.
        final double cpuFactor = 1.0d / ((double) numMachines * coresPerMachine * cpuMhz * 1000.0d);
        final LoadToTimeConverter cpuConverter = LoadToTimeConverter.createLinearCoverter(cpuFactor);

        final LoadToTimeConverter hdfsConverter = LoadToTimeConverter.createLinearCoverter(
                configuration.getDoubleProperty("wayang.spark.hdfs.ms-per-mb") / 1000000.0d);
        final LoadToTimeConverter networkConverter = LoadToTimeConverter.createLinearCoverter(
                configuration.getDoubleProperty("wayang.spark.network.ms-per-mb") / 1000000.0d);

        return LoadProfileToTimeConverter.createTopLevelStretching(
                cpuConverter,
                hdfsConverter,
                networkConverter,
                (first, second, third) -> first.plus(second).plus(third),
                configuration.getDoubleProperty("wayang.spark.stretch"));
    }

    /**
     * Builds the converter from time estimates to costs.
     *
     * @param configuration provides {@code wayang.spark.costs.fix} and {@code wayang.spark.costs.per-ms}
     * @return the converter created from these two properties
     */
    public TimeToCostConverter createTimeToCostConverter(Configuration configuration) {
        final double fixCosts = configuration.getDoubleProperty("wayang.spark.costs.fix");
        final double costsPerMs = configuration.getDoubleProperty("wayang.spark.costs.per-ms");
        return new TimeToCostConverter(fixCosts, costsPerMs);
    }

    /**
     * @return a factory that creates a {@link SparkExecutor} bound to this platform for a given job
     */
    public Executor.Factory getExecutorFactory() {
        return job -> new SparkExecutor(this, job);
    }

    /**
     * Warms up Spark by executing a trivial job that feeds a single-element collection into a
     * no-op callback sink, and logs how long that took.
     *
     * @param configuration the configuration used to create the warm-up {@link WayangContext}
     */
    @Override
    public void warmUp(Configuration configuration) {
        super.warmUp(configuration);
        this.logger.info("Running warm-up Spark job...");
        final long startTime = System.currentTimeMillis();

        final WayangContext wayangContext = new WayangContext(configuration);
        SparkCollectionSource source =
                new SparkCollectionSource(Collections.singleton(0), DataSetType.createDefault(Integer.class));
        Operator sink = new SparkLocalCallbackSink(num -> {
        }, DataSetType.createDefault(Integer.class));
        source.connectTo(0, sink, 0);

        final Job warmUpJob =
                wayangContext.createJob("Warm up", new WayangPlan(new Operator[]{sink}), new String[0]);
        warmUpJob.getConfiguration().setProperty("wayang.core.log.enabled", "false");
        warmUpJob.execute();

        final long elapsedMillis = System.currentTimeMillis() - startTime;
        this.logger.info("Spark warm-up finished in {}.", Formats.formatDuration(elapsedMillis, true));
    }

    /**
     * @param configuration the configuration to read from
     * @return the value of the {@link #INITIALIZATION_MS_CONFIG_KEY} property
     */
    public long getInitializeMillis(Configuration configuration) {
        return configuration.getLongProperty(INITIALIZATION_MS_CONFIG_KEY);
    }
}
