package org.apache.seatunnel.core.starter.spark.execution;

import com.google.common.collect.Lists;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/* loaded from: input_file:org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.class */
/**
 * Execute processor for the {@code source} plugins of a Spark job.
 *
 * <p>{@link #initializePlugins(List)} turns each source {@link Config} into a prepared
 * {@link SeaTunnelSource} and registers the plugin jars with the {@link SparkEnvironment};
 * {@link #execute(List)} then loads one {@code Dataset<Row>} per source through the Spark
 * session reader.
 */
public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
    /** Plugin type passed to {@link PluginIdentifier#of} when looking up a source plugin. */
    private static final String PLUGIN_TYPE = "source";

    /**
     * Creates the processor; all arguments are handed to the superclass constructor unchanged.
     *
     * @param sparkEnvironment Spark environment used to read data and register plugin jars
     * @param jobContext job context that is set on every created source
     * @param list one configuration per source plugin
     */
    public SourceExecuteProcessor(SparkEnvironment sparkEnvironment, JobContext jobContext, List<? extends Config> list) {
        super(sparkEnvironment, jobContext, list);
    }

    /**
     * Loads one dataset per configured source plugin.
     *
     * <p>Every dataset is also passed, together with its plugin configuration, to
     * {@code registerInputTempView}.
     *
     * @param list upstream datasets; not read by this method
     * @return the loaded datasets, in the same order as the plugins
     */
    @Override
    public List<Dataset<Row>> execute(List<Dataset<Row>> list) {
        List<Dataset<Row>> sources = new ArrayList<>(this.plugins.size());
        for (int i = 0; i < this.plugins.size(); i++) {
            SeaTunnelSource<?, ?, ?> source = this.plugins.get(i);
            // plugins and pluginConfigs are index-aligned: initializePlugins creates one plugin per config.
            Config pluginConfig = this.pluginConfigs.get(i);
            SeaTunnelDataType<?> producedType = source.getProducedType();

            Dataset<Row> dataset = this.sparkEnvironment.getSparkSession()
                    .read()
                    .format(SeaTunnelSource.class.getSimpleName())
                    .option(CollectionConstants.PARALLELISM, resolveParallelism(pluginConfig))
                    // The source instance travels to the reader as a serialized string option.
                    .option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
                    .schema(TypeConverterUtils.convert(producedType))
                    .load();

            sources.add(dataset);
            registerInputTempView(pluginConfig, dataset);
        }
        return sources;
    }

    /**
     * Creates, prepares and collects one {@link SeaTunnelSource} per configuration, then
     * registers the de-duplicated plugin jar paths with the Spark environment.
     *
     * @param list one configuration per source plugin; each must contain the plugin name
     * @return the prepared sources, in configuration order
     */
    @Override
    protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(List<? extends Config> list) {
        SeaTunnelSourcePluginDiscovery discovery = new SeaTunnelSourcePluginDiscovery();
        List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>(list.size());
        // A set, so that a jar shared by several source configs is registered only once.
        Set<URL> jarPaths = new HashSet<>();
        for (Config sourceConfig : list) {
            PluginIdentifier identifier = PluginIdentifier.of(
                    CollectionConstants.SEATUNNEL_PLUGIN,
                    PLUGIN_TYPE,
                    sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
            jarPaths.addAll(discovery.getPluginJarPaths(Lists.newArrayList(identifier)));

            SeaTunnelSource<?, ?, ?> source = discovery.createPluginInstance(identifier);
            source.prepare(sourceConfig);
            source.setJobContext(this.jobContext);
            sources.add(source);
        }
        this.sparkEnvironment.registerPlugin(new ArrayList<>(jarPaths));
        return sources;
    }

    /**
     * Returns the parallelism for one source: the value from the plugin's own configuration
     * when present, otherwise the value from the Spark configuration, defaulting to 1.
     */
    private int resolveParallelism(Config pluginConfig) {
        if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
            return pluginConfig.getInt(CollectionConstants.PARALLELISM);
        }
        return this.sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
    }
}
