package org.apache.seatunnel.core.starter.flink.execution;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.execution.TaskExecution;
import org.apache.seatunnel.core.starter.flink.FlinkStarter;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.class */
public class FlinkExecution implements TaskExecution {
    private static final Logger log = LoggerFactory.getLogger(FlinkExecution.class);
    private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
    private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment> sourcePluginExecuteProcessor;
    private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment> transformPluginExecuteProcessor;
    private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment> sinkPluginExecuteProcessor;
    private final List<URL> jarPaths;

    public FlinkExecution(Config config) {
        try {
            this.jarPaths = new ArrayList(Collections.singletonList(new File(Common.appStarterDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
            registerPlugin(config.getConfig(Constants.ENV));
            JobContext jobContext = new JobContext();
            jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
            this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(this.jarPaths, config.getConfigList("source"), jobContext);
            this.transformPluginExecuteProcessor = new TransformExecuteProcessor(this.jarPaths, TypesafeConfigUtils.getConfigList(config, "transform", Collections.emptyList()), jobContext);
            this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(this.jarPaths, config.getConfigList("sink"), jobContext);
            this.flinkRuntimeEnvironment = FlinkRuntimeEnvironment.getInstance(registerPlugin(config, this.jarPaths));
            this.sourcePluginExecuteProcessor.setRuntimeEnvironment(this.flinkRuntimeEnvironment);
            this.transformPluginExecuteProcessor.setRuntimeEnvironment(this.flinkRuntimeEnvironment);
            this.sinkPluginExecuteProcessor.setRuntimeEnvironment(this.flinkRuntimeEnvironment);
        } catch (MalformedURLException e) {
            throw new SeaTunnelException("load flink starter error.", e);
        }
    }

    @Override // org.apache.seatunnel.core.starter.execution.TaskExecution
    public void execute() throws TaskExecuteException {
        this.sinkPluginExecuteProcessor.execute(this.transformPluginExecuteProcessor.execute(this.sourcePluginExecuteProcessor.execute(new ArrayList())));
        log.info("Flink Execution Plan: {}", this.flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
        log.info("Flink job name: {}", this.flinkRuntimeEnvironment.getJobName());
        try {
            this.flinkRuntimeEnvironment.getStreamExecutionEnvironment().execute(this.flinkRuntimeEnvironment.getJobName());
        } catch (Exception e) {
            throw new TaskExecuteException("Execute Flink job error", e);
        }
    }

    private void registerPlugin(Config config) {
        ArrayList arrayList = new ArrayList();
        if (config.hasPath(EnvCommonOptions.JARS.key())) {
            arrayList = new ArrayList(Common.getThirdPartyJars(config.getString(EnvCommonOptions.JARS.key())));
        }
        arrayList.addAll(Common.getPluginsJarDependencies());
        List list = (List) Stream.concat(arrayList.stream(), Common.getLibJars().stream()).map((v0) -> {
            return v0.toUri();
        }).map(uri -> {
            try {
                return uri.toURL();
            } catch (MalformedURLException e) {
                throw new RuntimeException("the uri of jar illegal:" + uri, e);
            }
        }).collect(Collectors.toList());
        list.forEach(url -> {
            FlinkAbstractPluginExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url);
        });
        this.jarPaths.addAll(list);
    }

    private Config registerPlugin(Config config, List<URL> list) {
        return injectJarsToConfig(injectJarsToConfig(config, ConfigUtil.joinPath(Constants.ENV, "pipeline", "jars"), list), ConfigUtil.joinPath(Constants.ENV, "pipeline", "classpaths"), list);
    }

    private Config injectJarsToConfig(Config config, String str, List<URL> list) {
        Config withValue;
        ArrayList arrayList = new ArrayList();
        for (URL url : list) {
            if (new File(url.getFile()).exists()) {
                arrayList.add(url);
                log.info("Inject jar to config: {}", url);
            } else {
                log.warn("Remove invalid jar when inject jars into config: {}", url);
            }
        }
        if (config.hasPath(str)) {
            Set set = (Set) Arrays.stream(config.getString(str).split(";")).map(str2 -> {
                try {
                    return new URL(str2);
                } catch (MalformedURLException e) {
                    throw new RuntimeException("the uri of jar illegal:" + str2, e);
                }
            }).collect(Collectors.toSet());
            set.addAll(arrayList);
            withValue = config.withValue(str, ConfigValueFactory.fromAnyRef(set.stream().map((v0) -> {
                return v0.toString();
            }).distinct().collect(Collectors.joining(";"))));
        } else {
            withValue = config.withValue(str, ConfigValueFactory.fromAnyRef(arrayList.stream().map((v0) -> {
                return v0.toString();
            }).distinct().collect(Collectors.joining(";"))));
        }
        return withValue;
    }
}
