package org.apache.seatunnel.core.spark.command;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.seatunnel.apis.base.env.Execution;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.spark.batch.SparkBatchSink;
import org.apache.seatunnel.spark.batch.SparkBatchSource;
import org.apache.seatunnel.spark.stream.SparkStreamingSink;
import org.apache.seatunnel.spark.stream.SparkStreamingSource;
import org.apache.seatunnel.spark.structuredstream.StructuredStreamingSink;
import org.apache.seatunnel.spark.structuredstream.StructuredStreamingSource;

/* loaded from: input_file:org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.class */
public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
    private final SparkCommandArgs sparkCommandArgs;

    public SparkTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
        this.sparkCommandArgs = sparkCommandArgs;
    }

    @Override // org.apache.seatunnel.core.base.command.Command
    public void execute() {
        EngineType engineType = this.sparkCommandArgs.getEngineType();
        ExecutionContext executionContext = new ExecutionContext(new ConfigBuilder(FileUtils.getConfigPath(this.sparkCommandArgs), engineType).getConfig(), engineType);
        List sources = executionContext.getSources();
        List transforms = executionContext.getTransforms();
        List sinks = executionContext.getSinks();
        baseCheckConfig(sources, transforms, sinks);
        showAsciiLogo();
        try {
            Execution createExecution = new ExecutionFactory(executionContext).createExecution();
            Throwable th = null;
            try {
                try {
                    prepare(executionContext.getEnvironment(), sources, transforms, sinks);
                    createExecution.start(sources, transforms, sinks);
                    close(sources, transforms, sinks);
                    if (createExecution != null) {
                        if (0 != 0) {
                            try {
                                createExecution.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createExecution.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException("Execute Spark task error", e);
        }
    }

    private void checkPluginType(JobMode jobMode, List<? extends Plugin<?>>... listArr) {
        Stream flatMap = Arrays.stream(listArr).flatMap((v0) -> {
            return v0.stream();
        });
        switch (jobMode) {
            case STREAMING:
                flatMap.forEach(plugin -> {
                    if (!((plugin instanceof SparkStreamingSource) || (plugin instanceof SparkStreamingSink))) {
                        throw new IllegalArgumentException(String.format("Current execute mode is Streaming, but %s is not Streaming plugin", plugin.getPluginName()));
                    }
                });
                return;
            case BATCH:
                flatMap.forEach(plugin2 -> {
                    if (!((plugin2 instanceof SparkBatchSource) || (plugin2 instanceof SparkBatchSink))) {
                        throw new IllegalArgumentException(String.format("Current execute mode is Batch, but %s is not Batch plugin", plugin2.getPluginName()));
                    }
                });
                return;
            case STRUCTURED_STREAMING:
                flatMap.forEach(plugin3 -> {
                    if (!((plugin3 instanceof StructuredStreamingSource) || (plugin3 instanceof StructuredStreamingSink))) {
                        throw new IllegalArgumentException(String.format("Current execute mode is StructuredStreaming, but %s is not StructuredStreaming plugin", plugin3.getPluginName()));
                    }
                });
                return;
            default:
                throw new IllegalArgumentException("Unsupported job mode: " + jobMode);
        }
    }
}
