package org.apache.seatunnel.core.starter.flink.execution;

import com.google.common.collect.Lists;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.class */
/**
 * Execute processor that discovers the configured SeaTunnel transform plugins and applies
 * them to Flink {@code DataStream<Row>} pipelines.
 */
public class TransformExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
    /** Plugin type passed to {@link PluginIdentifier#of} when looking up transform plugins. */
    private static final String PLUGIN_TYPE = "transform";

    /**
     * Creates the processor; all arguments are forwarded unchanged to the superclass.
     *
     * @param list       jar URLs handed to the superclass
     * @param list2      one configuration per transform plugin
     * @param jobContext job context that is later set on every created transform
     */
    public TransformExecuteProcessor(List<URL> list, List<? extends Config> list2, JobContext jobContext) {
        super(list, list2, jobContext);
    }

    /**
     * Creates and prepares one transform instance per configuration entry.
     *
     * <p>For every configuration the plugin is looked up by its configured plugin name, the
     * jar paths reported by the discovery are appended to {@code list}, and the created
     * instance is prepared with its configuration and given the job context.
     *
     * @param list  jar URL list; the jar paths of every discovered plugin are appended to it
     * @param list2 one configuration per transform plugin
     * @return the created transforms in configuration order, with equal instances removed
     */
    @Override // org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor
    protected List<SeaTunnelTransform> initializePlugins(List<URL> list, List<? extends Config> list2) {
        SeaTunnelTransformPluginDiscovery discovery = new SeaTunnelTransformPluginDiscovery();
        List<SeaTunnelTransform> transforms = new ArrayList<>();
        for (Config config : list2) {
            PluginIdentifier identifier = PluginIdentifier.of(
                    CollectionConstants.SEATUNNEL_PLUGIN, PLUGIN_TYPE, config.getString(CollectionConstants.PLUGIN_NAME));
            List<URL> pluginJarPaths = discovery.getPluginJarPaths(Lists.newArrayList(identifier));
            SeaTunnelTransform transform = discovery.createPluginInstance(identifier);
            // Jar paths are registered before prepare() so they are recorded even if prepare fails.
            list.addAll(pluginJarPaths);
            transform.prepare(config);
            transform.setJobContext(this.jobContext);
            transforms.add(transform);
        }
        // NOTE(review): execute() pairs plugins and pluginConfigs by index. If distinct() ever
        // drops an instance (i.e. a transform overrides equals/hashCode) that pairing would
        // shift -- presumably instances are never equal; confirm against the implementations.
        return transforms.stream().distinct().collect(Collectors.toList());
    }

    /**
     * Applies every configured transform and returns the stream produced by each of them.
     *
     * <p>When no transform is configured the upstream list is returned unchanged. Otherwise
     * each transform reads from the stream returned by {@code fromSourceTable} for its
     * configuration when one is present, and falls back to the output of the previous
     * transform (initially the first upstream stream). Each output is passed to
     * {@code registerResultTable} together with the transform's configuration.
     *
     * @param list upstream data streams; only the first element is used as the initial input
     * @return one output stream per transform, in plugin order, or {@code list} itself when
     *         there are no transforms
     * @throws TaskExecuteException if applying any transform fails; the original exception
     *                              is kept as the cause
     */
    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public List<DataStream<Row>> execute(List<DataStream<Row>> list) throws TaskExecuteException {
        if (this.plugins.isEmpty()) {
            return list;
        }
        DataStream<Row> current = list.get(0);
        List<DataStream<Row>> results = new ArrayList<>();
        for (int i = 0; i < this.plugins.size(); i++) {
            SeaTunnelTransform transform = (SeaTunnelTransform) this.plugins.get(i);
            try {
                Config config = this.pluginConfigs.get(i);
                current = flinkTransform(transform, fromSourceTable(config).orElse(current));
                registerResultTable(config, current);
                results.add(current);
            } catch (Exception e) {
                throw new TaskExecuteException(
                        String.format("SeaTunnel transform task: %s execute error", transform.getPluginName()), e);
            }
        }
        return results;
    }

    /**
     * Wraps a SeaTunnel transform in a Flink flat-map over the given stream.
     *
     * <p>The stream's Flink type is converted to a SeaTunnel type and set on the transform.
     * Each incoming {@link Row} is converted to a SeaTunnel row, mapped by the transform, and
     * converted back using the transform's produced type. Rows for which the transform
     * returns {@code null} are not emitted.
     *
     * @param seaTunnelTransform transform to apply
     * @param dataStream         input stream
     * @return the transformed stream, typed with the transform's produced type
     */
    // Raw project types are unavoidable here: the class itself is parameterised with the raw
    // SeaTunnelTransform, and flatMap needs the produced type information as TypeInformation<Row>.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected DataStream<Row> flinkTransform(final SeaTunnelTransform seaTunnelTransform, DataStream<Row> dataStream) {
        SeaTunnelDataType inputType = TypeConverterUtils.convert(dataStream.getType());
        // The transform must know its input type before getProducedType() is queried.
        seaTunnelTransform.setTypeInfo(inputType);
        TypeInformation producedTypeInfo = TypeConverterUtils.convert(seaTunnelTransform.getProducedType());
        final FlinkRowConverter inputConverter = new FlinkRowConverter(inputType);
        final FlinkRowConverter outputConverter = new FlinkRowConverter(seaTunnelTransform.getProducedType());
        return dataStream.flatMap(new FlatMapFunction<Row, Row>() {
            @Override
            public void flatMap(Row row, Collector<Row> collector) throws Exception {
                SeaTunnelRow transformed = (SeaTunnelRow) seaTunnelTransform.map(inputConverter.reconvert(row));
                // A null result means the transform filtered the row out.
                if (transformed != null) {
                    collector.collect(outputConverter.convert(transformed));
                }
            }
        }, producedTypeInfo);
    }
}
