package org.apache.seatunnel.core.flink.command;

import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.seatunnel.apis.base.api.BaseSink;
import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.Execution;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkExecutionContext;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;

/* loaded from: input_file:org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.class */
public class FlinkApiTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
    private final FlinkCommandArgs flinkCommandArgs;

    public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
        this.flinkCommandArgs = flinkCommandArgs;
    }

    @Override // org.apache.seatunnel.core.base.command.Command
    public void execute() throws CommandExecuteException {
        FlinkExecutionContext flinkExecutionContext = new FlinkExecutionContext(new ConfigBuilder(FileUtils.getConfigPath(this.flinkCommandArgs)).getConfig(), this.flinkCommandArgs.getEngineType());
        List<BaseSource<FlinkEnvironment>> sources = flinkExecutionContext.getSources();
        List<BaseTransform<FlinkEnvironment>> transforms = flinkExecutionContext.getTransforms();
        List<BaseSink<FlinkEnvironment>> sinks = flinkExecutionContext.getSinks();
        checkPluginType(flinkExecutionContext.getJobMode(), sources, transforms, sinks);
        baseCheckConfig(sinks, transforms, sinks);
        showAsciiLogo();
        try {
            Execution createExecution = new ExecutionFactory(flinkExecutionContext).createExecution();
            Throwable th = null;
            try {
                try {
                    prepare(flinkExecutionContext.getEnvironment(), sources, transforms, sinks);
                    createExecution.start(sources, transforms, sinks);
                    close(sources, transforms, sinks);
                    if (createExecution != null) {
                        if (0 != 0) {
                            try {
                                createExecution.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createExecution.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new CommandExecuteException("Execute Flink task error", e);
        }
    }

    @VisibleForTesting
    void checkPluginType(JobMode jobMode, List<? extends Plugin<FlinkEnvironment>>... listArr) {
        Stream flatMap = Arrays.stream(listArr).flatMap((v0) -> {
            return v0.stream();
        });
        switch (jobMode) {
            case STREAMING:
                flatMap.forEach(plugin -> {
                    if (!((plugin instanceof FlinkStreamSource) || (plugin instanceof FlinkStreamTransform) || (plugin instanceof FlinkStreamSink))) {
                        throw new IllegalArgumentException(String.format("Cannot use batch plugin: %s in stream mode", plugin.getPluginName()));
                    }
                });
                return;
            case BATCH:
                flatMap.forEach(plugin2 -> {
                    if (!((plugin2 instanceof FlinkBatchSource) || (plugin2 instanceof FlinkBatchTransform) || (plugin2 instanceof FlinkBatchSink))) {
                        throw new IllegalArgumentException(String.format("Cannot use stream plugin: %s in batch mode", plugin2.getPluginName()));
                    }
                });
                return;
            default:
                throw new IllegalArgumentException("Unsupported job mode: " + jobMode);
        }
    }
}
