package org.apache.seatunnel.spark;

import java.net.URL;
import java.util.List;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/spark/SparkEnvironment.class */
/**
 * Spark implementation of {@link RuntimeEnv}.
 *
 * <p>Holds the job {@link Config}, and on {@link #prepare()} builds the {@link SparkConf},
 * the {@link SparkSession} and a {@link StreamingContext}. It also offers static helpers
 * that wire source, transform and sink plugins together through Spark temporary views.
 */
public class SparkEnvironment implements RuntimeEnv {
    private static final Logger log = LoggerFactory.getLogger(SparkEnvironment.class);

    /** Batch interval, in seconds, used when {@code spark.stream.batchDuration} is not set. */
    private static final long DEFAULT_SPARK_STREAMING_DURATION = 5L;

    private SparkConf sparkConf;
    private SparkSession sparkSession;
    private StreamingContext streamingContext;
    private Config config = ConfigFactory.empty();
    private boolean enableHive = false;
    private JobMode jobMode;

    /**
     * Chooses whether {@link #prepare()} enables Hive support on the session builder.
     *
     * @param z {@code true} to enable Hive support
     * @return this environment, for chaining
     */
    public SparkEnvironment setEnableHive(boolean z) {
        this.enableHive = z;
        return this;
    }

    /**
     * Stores the configuration that {@link #prepare()} copies into the {@link SparkConf}.
     *
     * @param config the environment configuration
     * @return this environment, for chaining
     */
    @Override
    public SparkEnvironment setConfig(Config config) {
        this.config = config;
        return this;
    }

    /**
     * Stores the job mode.
     *
     * @param jobMode the job mode to remember
     * @return this environment, for chaining
     */
    @Override
    public RuntimeEnv setJobMode(JobMode jobMode) {
        this.jobMode = jobMode;
        return this;
    }

    /** @return the job mode previously set, or {@code null} if none was set */
    @Override
    public JobMode getJobMode() {
        return this.jobMode;
    }

    /** @return the configuration currently held (an empty config until one is set) */
    @Override
    public Config getConfig() {
        return this.config;
    }

    /**
     * Performs no validation.
     *
     * @return always {@link CheckResult#success()}
     */
    @Override
    public CheckResult checkConfig() {
        return CheckResult.success();
    }

    /**
     * Logs the given plugin locations; nothing else is done with them here.
     *
     * @param list plugin jar locations
     */
    @Override
    public void registerPlugin(List<URL> list) {
        // Parameterized form renders the same text as the former string concatenation.
        log.info("register plugins :{}", list);
    }

    /**
     * Builds the {@link SparkConf}, obtains (or creates) the {@link SparkSession} and then
     * creates the {@link StreamingContext} if one does not exist yet.
     *
     * @return this environment, for chaining
     */
    @Override
    public SparkEnvironment prepare() {
        this.sparkConf = createSparkConf();
        SparkSession.Builder builder = SparkSession.builder().config(this.sparkConf);
        if (this.enableHive) {
            builder.enableHiveSupport();
        }
        this.sparkSession = builder.getOrCreate();
        createStreamingContext();
        return this;
    }

    /** @return the session created by {@link #prepare()}, or {@code null} before that call */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    /** @return the streaming context created by {@link #prepare()}, or {@code null} before that call */
    public StreamingContext getStreamingContext() {
        return this.streamingContext;
    }

    /** @return the Spark configuration built by {@link #prepare()}, or {@code null} before that call */
    public SparkConf getSparkConf() {
        return this.sparkConf;
    }

    /**
     * Copies every entry of {@link #config} into a new {@link SparkConf}, stringifying each
     * unwrapped value, and uses {@code job.name} as the application name when present.
     */
    private SparkConf createSparkConf() {
        SparkConf conf = new SparkConf();
        this.config.entrySet().forEach(entry -> {
            ConfigValue value = entry.getValue();
            conf.set(entry.getKey(), String.valueOf(value.unwrapped()));
        });
        if (this.config.hasPath("job.name")) {
            conf.setAppName(this.config.getString("job.name"));
        }
        return conf;
    }

    /**
     * Creates the streaming context on first use. The batch interval is read, in seconds,
     * from {@code spark.stream.batchDuration} on the live Spark context configuration and
     * falls back to {@link #DEFAULT_SPARK_STREAMING_DURATION}.
     */
    private void createStreamingContext() {
        long batchSeconds = this.sparkSession.sparkContext().getConf()
                .getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION);
        if (this.streamingContext == null) {
            this.streamingContext =
                    new StreamingContext(this.sparkSession.sparkContext(), Seconds.apply(batchSeconds));
        }
    }

    /**
     * Registers (or replaces) a temporary view for the dataset.
     *
     * @param str     the view name
     * @param dataset the dataset to expose under that name
     */
    public static void registerTempView(String str, Dataset<Row> dataset) {
        dataset.createOrReplaceTempView(str);
    }

    /**
     * Reads the data of a source plugin and registers it as a temporary view named by the
     * plugin's {@link Plugin#RESULT_TABLE_NAME} setting.
     *
     * @param baseSparkSource  the source plugin
     * @param sparkEnvironment the environment handed to the source
     * @return the dataset produced by the source
     * @throws ConfigRuntimeException if the source does not configure
     *                                {@link Plugin#RESULT_TABLE_NAME}
     */
    public static Dataset<Row> registerInputTempView(BaseSparkSource<Dataset<Row>> baseSparkSource, SparkEnvironment sparkEnvironment) {
        Config sourceConfig = baseSparkSource.getConfig();
        if (!sourceConfig.hasPath(Plugin.RESULT_TABLE_NAME)) {
            throw new ConfigRuntimeException("Plugin[" + baseSparkSource.getClass().getName() + "] must be registered as dataset/table, please set \"" + Plugin.RESULT_TABLE_NAME + "\" config");
        }
        String viewName = sourceConfig.getString(Plugin.RESULT_TABLE_NAME);
        Dataset<Row> data = baseSparkSource.getData(sparkEnvironment);
        registerTempView(viewName, data);
        return data;
    }

    /**
     * Runs a transform plugin. Its input is the table named by
     * {@link Plugin#SOURCE_TABLE_NAME} when configured, otherwise the supplied dataset.
     *
     * @param sparkEnvironment   the environment providing the Spark session
     * @param baseSparkTransform the transform plugin
     * @param dataset            the upstream dataset used when no source table is configured
     * @return the dataset returned by the transform
     */
    public static Dataset<Row> transformProcess(SparkEnvironment sparkEnvironment, BaseSparkTransform baseSparkTransform, Dataset<Row> dataset) {
        Dataset<Row> input = resolveInput(sparkEnvironment, baseSparkTransform.getConfig(), dataset);
        return baseSparkTransform.process(input, sparkEnvironment);
    }

    /**
     * Registers a transform's output as a temporary view when the transform configures
     * {@link Plugin#RESULT_TABLE_NAME}; otherwise does nothing.
     *
     * @param baseSparkTransform the transform plugin
     * @param dataset            the transform's output
     */
    public static void registerTransformTempView(BaseSparkTransform baseSparkTransform, Dataset<Row> dataset) {
        Config transformConfig = baseSparkTransform.getConfig();
        if (transformConfig.hasPath(Plugin.RESULT_TABLE_NAME)) {
            registerTempView(transformConfig.getString(Plugin.RESULT_TABLE_NAME), dataset);
        }
    }

    /**
     * Runs a sink plugin. Its input is the table named by {@link Plugin#SOURCE_TABLE_NAME}
     * when configured, otherwise the supplied dataset.
     *
     * @param sparkEnvironment the environment providing the Spark session
     * @param baseSparkSink    the sink plugin
     * @param dataset          the upstream dataset used when no source table is configured
     * @param <T>              the sink's result type
     * @return whatever the sink's {@code output} call returns
     */
    public static <T> T sinkProcess(SparkEnvironment sparkEnvironment, BaseSparkSink<T> baseSparkSink, Dataset<Row> dataset) {
        Dataset<Row> input = resolveInput(sparkEnvironment, baseSparkSink.getConfig(), dataset);
        return baseSparkSink.output(input, sparkEnvironment);
    }

    /**
     * Picks a plugin's input: the table named by {@link Plugin#SOURCE_TABLE_NAME} in the
     * plugin config when present, otherwise the upstream dataset as given.
     */
    private static Dataset<Row> resolveInput(SparkEnvironment sparkEnvironment, Config pluginConfig, Dataset<Row> upstream) {
        if (pluginConfig.hasPath(Plugin.SOURCE_TABLE_NAME)) {
            return sparkEnvironment.getSparkSession().read().table(pluginConfig.getString(Plugin.SOURCE_TABLE_NAME));
        }
        return upstream;
    }
}
