package org.apache.wayang.flink.platform;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.basic.plugin.WayangBasic;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileToTimeConverter;
import org.apache.wayang.core.optimizer.costs.LoadToTimeConverter;
import org.apache.wayang.core.optimizer.costs.TimeToCostConverter;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.flink.execution.FlinkContextReference;
import org.apache.wayang.flink.execution.FlinkExecutor;

/* loaded from: input_file:org/apache/wayang/flink/platform/FlinkPlatform.class */
public class FlinkPlatform extends Platform {
    private static final String PLATFORM_NAME = "Apache Flink";
    private static final String CONFIG_NAME = "flink";
    private static final String DEFAULT_CONFIG_FILE = "wayang-flink-defaults.properties";
    public static final String INITIALIZATION_MS_CONFIG_KEY = "wayang.flink.init.ms";
    private static FlinkPlatform instance = null;
    private static final String[] REQUIRED_FLINK_PROPERTIES = new String[0];
    private static final String[] OPTIONAL_FLINK_PROPERTIES = new String[0];
    private FlinkContextReference flinkContextReference;
    private Logger logger;

    public static FlinkPlatform getInstance() {
        if (instance == null) {
            instance = new FlinkPlatform();
        }
        return instance;
    }

    private FlinkPlatform() {
        super(PLATFORM_NAME, CONFIG_NAME);
        this.flinkContextReference = null;
        this.logger = LogManager.getLogger(getClass());
    }

    public FlinkContextReference getFlinkContext(Job job) {
        Configuration configuration = job.getConfiguration();
        if (this.flinkContextReference == null) {
            String stringProperty = configuration.getStringProperty("wayang.flink.mode.run");
            boolean z = -1;
            switch (stringProperty.hashCode()) {
                case -1741312354:
                    if (stringProperty.equals("collection")) {
                        z = 2;
                        break;
                    }
                    break;
                case -1580708220:
                    if (stringProperty.equals("distribution")) {
                        z = true;
                        break;
                    }
                    break;
                case 103145323:
                    if (stringProperty.equals("local")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    this.flinkContextReference = new FlinkContextReference(job.getCrossPlatformExecutor(), ExecutionEnvironment.createLocalEnvironment(), (int) configuration.getLongProperty("wayang.flink.paralelism"));
                    break;
                case true:
                    this.flinkContextReference = new FlinkContextReference(job.getCrossPlatformExecutor(), ExecutionEnvironment.createRemoteEnvironment(configuration.getStringProperty("wayang.flink.master"), Integer.parseInt(configuration.getStringProperty("wayang.flink.port")), getJars(job)), (int) configuration.getLongProperty("wayang.flink.paralelism"));
                    break;
                case true:
                default:
                    this.flinkContextReference = new FlinkContextReference(job.getCrossPlatformExecutor(), new CollectionEnvironment(), 1);
                    break;
            }
        }
        return this.flinkContextReference;
    }

    public void configureDefaults(Configuration configuration) {
        configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
    }

    public Executor.Factory getExecutorFactory() {
        return job -> {
            return new FlinkExecutor(this, job);
        };
    }

    public LoadProfileToTimeConverter createLoadProfileToTimeConverter(Configuration configuration) {
        int longProperty = (int) configuration.getLongProperty("wayang.flink.cpu.mhz");
        int longProperty2 = (int) configuration.getLongProperty("wayang.flink.paralelism");
        return LoadProfileToTimeConverter.createTopLevelStretching(LoadToTimeConverter.createLinearCoverter(1.0d / ((longProperty2 * longProperty) * 1000.0d)), LoadToTimeConverter.createLinearCoverter(configuration.getDoubleProperty("wayang.flink.hdfs.ms-per-mb") / 1000000.0d), LoadToTimeConverter.createLinearCoverter(configuration.getDoubleProperty("wayang.flink.network.ms-per-mb") / 1000000.0d), (timeEstimate, timeEstimate2, timeEstimate3) -> {
            return timeEstimate.plus(timeEstimate2).plus(timeEstimate3);
        }, configuration.getDoubleProperty("wayang.flink.stretch"));
    }

    public TimeToCostConverter createTimeToCostConverter(Configuration configuration) {
        return new TimeToCostConverter(configuration.getDoubleProperty("wayang.flink.costs.fix"), configuration.getDoubleProperty("wayang.flink.costs.per-ms"));
    }

    private String[] getJars(Job job) {
        ArrayList arrayList = new ArrayList(5);
        Stream filter = Arrays.asList(FlinkPlatform.class, WayangBasic.class, WayangContext.class).stream().map(ReflectionUtils::getDeclaringJar).filter(str -> {
            return str != null;
        });
        arrayList.getClass();
        filter.forEach((v1) -> {
            r1.add(v1);
        });
        Set udfJarPaths = job.getUdfJarPaths();
        if (udfJarPaths.isEmpty()) {
            this.logger.warn("Non-local FlinkContext but not UDF JARs have been declared.");
        } else {
            Stream filter2 = udfJarPaths.stream().filter(str2 -> {
                return str2 != null;
            });
            arrayList.getClass();
            filter2.forEach((v1) -> {
                r1.add(v1);
            });
        }
        return (String[]) arrayList.toArray(new String[0]);
    }
}
