package org.apache.seatunnel.core.base.config;

import org.apache.seatunnel.apis.base.api.BaseSink;
import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.Execution;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchExecution;
import org.apache.seatunnel.flink.stream.FlinkStreamExecution;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.spark.batch.SparkBatchExecution;
import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/base/config/ExecutionFactory.class */
/**
 * Factory that builds the {@link Execution} matching the engine and job mode
 * reported by an {@link AbstractExecutionContext}.
 *
 * @param <ENVIRONMENT> the runtime environment type carried by the execution context
 */
public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
    private static final Logger log = LoggerFactory.getLogger(ExecutionFactory.class);

    /** Context queried for the engine, the job mode and the runtime environment. */
    public AbstractExecutionContext<ENVIRONMENT> executionContext;

    /**
     * Creates a factory bound to the given execution context.
     *
     * @param abstractExecutionContext context consulted by {@link #createExecution()}
     */
    public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> abstractExecutionContext) {
        this.executionContext = abstractExecutionContext;
    }

    /**
     * Creates the execution for the context's engine and job mode.
     *
     * <p>For {@code SPARK}: {@code STREAMING} yields a {@link SparkStreamingExecution},
     * {@code STRUCTURED_STREAMING} a {@link StructuredStreamingExecution}, and any other
     * job mode a {@link SparkBatchExecution}. For {@code FLINK}: {@code STREAMING} yields
     * a {@link FlinkStreamExecution} and any other job mode a {@link FlinkBatchExecution}.
     *
     * @return the execution selected for the context's engine and job mode
     * @throws IllegalArgumentException if the engine is neither {@code SPARK} nor {@code FLINK}
     * @throws ClassCastException if the context's environment is not of the type
     *     required by the selected engine
     */
    // The concrete executions are bound to engine-specific environment types, so the
    // result is assembled through the raw Execution type and converted on return.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT> createExecution() {
        Execution execution;
        switch (this.executionContext.getEngine()) {
            case SPARK: {
                SparkEnvironment sparkEnvironment = (SparkEnvironment) this.executionContext.getEnvironment();
                switch (this.executionContext.getJobMode()) {
                    case STREAMING:
                        execution = new SparkStreamingExecution(sparkEnvironment);
                        break;
                    case STRUCTURED_STREAMING:
                        execution = new StructuredStreamingExecution(sparkEnvironment);
                        break;
                    default:
                        execution = new SparkBatchExecution(sparkEnvironment);
                        break;
                }
                // Leave the outer switch: without this break control would fall
                // through into the FLINK arm and then into the throwing default.
                break;
            }
            case FLINK: {
                FlinkEnvironment flinkEnvironment = (FlinkEnvironment) this.executionContext.getEnvironment();
                switch (this.executionContext.getJobMode()) {
                    case STREAMING:
                        execution = new FlinkStreamExecution(flinkEnvironment);
                        break;
                    default:
                        execution = new FlinkBatchExecution(flinkEnvironment);
                        break;
                }
                // Leave the outer switch so a valid FLINK selection does not
                // fall into the "No suitable engine" default.
                break;
            }
            default:
                throw new IllegalArgumentException("No suitable engine");
        }
        log.info("current execution is [{}]", execution.getClass().getName());
        return execution;
    }
}
