package org.apache.seatunnel.core.starter.flink.execution;

import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.class */
public class TransformExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
    /* JADX INFO: Access modifiers changed from: protected */
    public TransformExecuteProcessor(List<URL> list, Config config, List<? extends Config> list2, JobContext jobContext) {
        super(list, config, list2, jobContext);
    }

    @Override // org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor
    protected List<TableTransformFactory> initializePlugins(List<URL> list, List<? extends Config> list2) {
        SeaTunnelTransformPluginDiscovery seaTunnelTransformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
        return (List) list2.stream().map(config -> {
            return PluginUtil.createTransformFactory(seaTunnelTransformPluginDiscovery, config, list);
        }).distinct().collect(Collectors.toList());
    }

    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> list) throws TaskExecuteException {
        if (this.plugins.isEmpty()) {
            return list;
        }
        DataStreamTableInfo dataStreamTableInfo = list.get(0);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        for (int i = 0; i < this.plugins.size(); i++) {
            try {
                Config config = this.pluginConfigs.get(i);
                DataStreamTableInfo orElse = fromSourceTable(config, list).orElse(dataStreamTableInfo);
                TableTransformFactory tableTransformFactory = (TableTransformFactory) this.plugins.get(i);
                TableTransformFactoryContext tableTransformFactoryContext = new TableTransformFactoryContext(Collections.singletonList(orElse.getCatalogTable()), ReadonlyConfig.fromConfig(config), contextClassLoader);
                ConfigValidator.of(tableTransformFactoryContext.getOptions()).validate(tableTransformFactory.optionRule());
                SeaTunnelTransform createTransform = tableTransformFactory.createTransform(tableTransformFactoryContext).createTransform();
                SeaTunnelRowType seaTunnelRowType = orElse.getCatalogTable().getSeaTunnelRowType();
                createTransform.setJobContext(this.jobContext);
                DataStream<Row> flinkTransform = flinkTransform(seaTunnelRowType, createTransform, orElse.getDataStream());
                registerResultTable(config, flinkTransform);
                list.add(new DataStreamTableInfo(flinkTransform, createTransform.getProducedCatalogTable(), config.hasPath(CommonOptions.RESULT_TABLE_NAME.key()) ? config.getString(CommonOptions.RESULT_TABLE_NAME.key()) : null));
            } catch (Exception e) {
                throw new TaskExecuteException(String.format("SeaTunnel transform task: %s execute error", ((TableTransformFactory) this.plugins.get(i)).factoryIdentifier()), e);
            }
        }
        return list;
    }

    protected DataStream<Row> flinkTransform(SeaTunnelRowType seaTunnelRowType, SeaTunnelTransform seaTunnelTransform, DataStream<Row> dataStream) {
        TypeInformation convert = TypeConverterUtils.convert(seaTunnelTransform.getProducedCatalogTable().getSeaTunnelRowType());
        FlinkRowConverter flinkRowConverter = new FlinkRowConverter(seaTunnelRowType);
        FlinkRowConverter flinkRowConverter2 = new FlinkRowConverter(seaTunnelTransform.getProducedCatalogTable().getSeaTunnelRowType());
        return dataStream.flatMap((row, collector) -> {
            SeaTunnelRow seaTunnelRow = (SeaTunnelRow) seaTunnelTransform.map(flinkRowConverter.reconvert(row));
            if (seaTunnelRow != null) {
                collector.collect(flinkRowConverter2.convert(seaTunnelRow));
            }
        }, convert);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 563483757:
                if (implMethodName.equals("lambda$flinkTransform$d9a42ad2$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/seatunnel/translation/flink/serialization/FlinkRowConverter;Lorg/apache/seatunnel/api/transform/SeaTunnelTransform;Lorg/apache/seatunnel/translation/flink/serialization/FlinkRowConverter;Lorg/apache/flink/types/Row;Lorg/apache/flink/util/Collector;)V")) {
                    FlinkRowConverter flinkRowConverter = (FlinkRowConverter) serializedLambda.getCapturedArg(0);
                    SeaTunnelTransform seaTunnelTransform = (SeaTunnelTransform) serializedLambda.getCapturedArg(1);
                    FlinkRowConverter flinkRowConverter2 = (FlinkRowConverter) serializedLambda.getCapturedArg(2);
                    return (row, collector) -> {
                        SeaTunnelRow seaTunnelRow = (SeaTunnelRow) seaTunnelTransform.map(flinkRowConverter.reconvert(row));
                        if (seaTunnelRow != null) {
                            collector.collect(flinkRowConverter2.convert(seaTunnelRow));
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
