package org.apache.seatunnel.core.starter.flink.execution;

import com.google.common.collect.Lists;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.source.FlinkSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.class */
public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
    private static final Logger log = LoggerFactory.getLogger(SourceExecuteProcessor.class);
    private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();

    public SourceExecuteProcessor(List<URL> list, Config config, List<? extends Config> list2, JobContext jobContext) {
        super(list, config, list2, jobContext);
    }

    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> list) {
        StreamExecutionEnvironment streamExecutionEnvironment = this.flinkRuntimeEnvironment.getStreamExecutionEnvironment();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.plugins.size(); i++) {
            SourceTableInfo sourceTableInfo = (SourceTableInfo) this.plugins.get(i);
            SeaTunnelSource source = sourceTableInfo.getSource();
            Config config = this.pluginConfigs.get(i);
            DataStreamSource fromSource = streamExecutionEnvironment.fromSource(new FlinkSource(source, this.envConfig), WatermarkStrategy.noWatermarks(), String.format("%s-Source", source.getPluginName()));
            if (config.hasPath(CommonOptions.PARALLELISM.key())) {
                fromSource.setParallelism(config.getInt(CommonOptions.PARALLELISM.key()));
            }
            arrayList.add(new DataStreamTableInfo(fromSource, sourceTableInfo.getCatalogTables(), config.hasPath(CommonOptions.RESULT_TABLE_NAME.key()) ? config.getString(CommonOptions.RESULT_TABLE_NAME.key()) : null));
        }
        return arrayList;
    }

    @Override // org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor
    protected List<SourceTableInfo> initializePlugins(List<URL> list, List<? extends Config> list2) {
        SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
        SeaTunnelFactoryDiscovery seaTunnelFactoryDiscovery = new SeaTunnelFactoryDiscovery(TableSourceFactory.class, ADD_URL_TO_CLASSLOADER);
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        for (Config config : list2) {
            PluginIdentifier of = PluginIdentifier.of(CollectionConstants.SEATUNNEL_PLUGIN, PLUGIN_TYPE, config.getString(CommonOptions.PLUGIN_NAME.key()));
            hashSet.addAll(seaTunnelSourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(of)));
            arrayList.add(PluginUtil.createSource(seaTunnelFactoryDiscovery, seaTunnelSourcePluginDiscovery, of, config, this.jobContext));
        }
        list.addAll(hashSet);
        return arrayList;
    }
}
