package org.apache.seatunnel.config;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchExecution;
import org.apache.seatunnel.flink.stream.FlinkStreamExecution;
import org.apache.seatunnel.plugin.Plugin;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.spark.batch.SparkBatchExecution;
import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
import org.apache.seatunnel.utils.Engine;
import org.apache.seatunnel.utils.PluginType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/config/ConfigBuilder.class */
public class ConfigBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ConfigBuilder.class);
    private static final String PLUGIN_NAME_KEY = "plugin_name";
    private final String configFile;
    private final Engine engine;
    private ConfigPackage configPackage;
    private boolean streaming;
    private Config envConfig;
    private final Config config = load();
    private final RuntimeEnv env = createEnv();

    public ConfigBuilder(String str, Engine engine) {
        this.configFile = str;
        this.engine = engine;
        this.configPackage = new ConfigPackage(engine.getEngine());
    }

    private Config load() {
        if (this.configFile.isEmpty()) {
            throw new ConfigRuntimeException("Please specify config file");
        }
        LOGGER.info("Loading config file: {}", this.configFile);
        Config resolveWith = ConfigFactory.parseFile(new File(this.configFile)).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)).resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
        LOGGER.info("parsed config file: {}", resolveWith.root().render(ConfigRenderOptions.concise().setFormatted(true)));
        return resolveWith;
    }

    public Config getEnvConfigs() {
        return this.envConfig;
    }

    public RuntimeEnv getEnv() {
        return this.env;
    }

    private boolean checkIsStreaming() {
        return this.config.getConfigList(PluginType.SOURCE.getType()).get(0).getString(PLUGIN_NAME_KEY).toLowerCase().endsWith("stream");
    }

    private <T extends Plugin<?>> T createPluginInstanceIgnoreCase(String str, PluginType pluginType) throws Exception {
        String sinkPackage;
        ServiceLoader load;
        T t;
        if (str.split("\\.").length != 1) {
            return (T) Class.forName(str).newInstance();
        }
        switch (pluginType) {
            case SOURCE:
                sinkPackage = this.configPackage.getSourcePackage();
                load = ServiceLoader.load(Class.forName(this.configPackage.getBaseSourceClass()));
                break;
            case TRANSFORM:
                sinkPackage = this.configPackage.getTransformPackage();
                load = ServiceLoader.load(Class.forName(this.configPackage.getBaseTransformClass()));
                break;
            case SINK:
                sinkPackage = this.configPackage.getSinkPackage();
                load = ServiceLoader.load(Class.forName(this.configPackage.getBaseSinkClass()));
                break;
            default:
                throw new IllegalArgumentException("PluginType not support : [" + pluginType + "]");
        }
        String str2 = sinkPackage + "." + str;
        Iterator it = load.iterator();
        while (it.hasNext()) {
            try {
                t = (T) it.next();
            } catch (ServiceConfigurationError e) {
                LOGGER.warn("Error when load plugin: [{}]", str2, e);
            }
            if (t.getClass().getName().toLowerCase().equals(str2.toLowerCase())) {
                return t;
            }
        }
        throw new ClassNotFoundException("Plugin class not found by name :[" + str2 + "]");
    }

    public void checkConfig() {
        createEnv();
        createPlugins(PluginType.SOURCE);
        createPlugins(PluginType.TRANSFORM);
        createPlugins(PluginType.SINK);
    }

    public <T extends Plugin<?>> List<T> createPlugins(PluginType pluginType) {
        Objects.requireNonNull(pluginType, "PluginType can not be null when create plugins!");
        ArrayList arrayList = new ArrayList();
        this.config.getConfigList(pluginType.getType()).forEach(config -> {
            try {
                Plugin createPluginInstanceIgnoreCase = createPluginInstanceIgnoreCase(config.getString(PLUGIN_NAME_KEY), pluginType);
                createPluginInstanceIgnoreCase.setConfig(config);
                arrayList.add(createPluginInstanceIgnoreCase);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return arrayList;
    }

    private RuntimeEnv createEnv() {
        RuntimeEnv flinkEnvironment;
        this.envConfig = this.config.getConfig("env");
        this.streaming = checkIsStreaming();
        switch (this.engine) {
            case SPARK:
                flinkEnvironment = new SparkEnvironment();
                break;
            case FLINK:
                flinkEnvironment = new FlinkEnvironment();
                break;
            default:
                throw new IllegalArgumentException("Engine: " + this.engine + " is not supported");
        }
        flinkEnvironment.setConfig(this.envConfig);
        flinkEnvironment.prepare(Boolean.valueOf(this.streaming));
        return flinkEnvironment;
    }

    public Execution createExecution() {
        Execution execution = null;
        switch (this.engine) {
            case SPARK:
                SparkEnvironment sparkEnvironment = (SparkEnvironment) this.env;
                if (!this.streaming) {
                    execution = new SparkBatchExecution(sparkEnvironment);
                    break;
                } else {
                    execution = new SparkStreamingExecution(sparkEnvironment);
                    break;
                }
            case FLINK:
                FlinkEnvironment flinkEnvironment = (FlinkEnvironment) this.env;
                if (!this.streaming) {
                    execution = new FlinkBatchExecution(flinkEnvironment);
                    break;
                } else {
                    execution = new FlinkStreamExecution(flinkEnvironment);
                    break;
                }
        }
        return execution;
    }
}
