package org.apache.seatunnel.core.starter.spark.execution;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plugin execute processor that runs the configured SeaTunnel transform plugins on Spark datasets.
 *
 * <p>Plugins are discovered and prepared in {@link #initializePlugins(List)}; {@link #execute(List)}
 * then applies them one after another, each through a per-partition {@link TransformIterator}.
 */
public class TransformExecuteProcessor extends SparkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
    private static final Logger log = LoggerFactory.getLogger(TransformExecuteProcessor.class);
    private static final String PLUGIN_TYPE = "transform";

    /**
     * Iterator adapter that pushes every row of a source iterator through a transform.
     *
     * <p>Each source row is wrapped in a {@link SeaTunnelRow}, passed through the input converter's
     * {@code reconvert}, mapped by the transform, passed through the output converter's
     * {@code convert} and finally re-wrapped as a {@link GenericRowWithSchema} carrying the output
     * schema. Rows for which the transform returns {@code null} are skipped, so this iterator never
     * yields {@code null}.
     */
    public static class TransformIterator implements Iterator<Row>, Serializable {
        private final Iterator<Row> sourceIterator;
        private final SeaTunnelTransform<SeaTunnelRow> transform;
        private final StructType structType;
        private final SeaTunnelRowConverter inputRowConverter;
        private final SeaTunnelRowConverter outputRowConverter;

        /** Output row found by {@link #hasNext()} and not yet handed out; {@code null} if none. */
        private Row pending;

        /**
         * @param it rows to transform
         * @param seaTunnelTransform transform applied to every row
         * @param structType schema attached to every produced row
         * @param seaTunnelRowConverter converter applied to rows before the transform
         * @param seaTunnelRowConverter2 converter applied to rows after the transform
         */
        public TransformIterator(Iterator<Row> it, SeaTunnelTransform<SeaTunnelRow> seaTunnelTransform, StructType structType, SeaTunnelRowConverter seaTunnelRowConverter, SeaTunnelRowConverter seaTunnelRowConverter2) {
            this.sourceIterator = it;
            this.transform = seaTunnelTransform;
            this.structType = structType;
            this.inputRowConverter = seaTunnelRowConverter;
            this.outputRowConverter = seaTunnelRowConverter2;
        }

        /**
         * Advances the source until a row survives the transform or the source is exhausted.
         *
         * @return {@code true} if a transformed row is available for {@link #next()}
         * @throws TaskExecuteException if reading, converting or transforming a row fails
         */
        @Override // java.util.Iterator
        public boolean hasNext() {
            // Look ahead so that rows dropped by the transform are skipped here instead of being
            // surfaced to the consumer as null elements.
            while (this.pending == null && this.sourceIterator.hasNext()) {
                this.pending = transformNext();
            }
            return this.pending != null;
        }

        /**
         * @return the next transformed row, never {@code null}
         * @throws NoSuchElementException if no transformed row is left
         * @throws TaskExecuteException if reading, converting or transforming a row fails
         */
        @Override // java.util.Iterator
        public Row next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Row result = this.pending;
            this.pending = null;
            return result;
        }

        /**
         * Pulls one row from the source and transforms it.
         *
         * @return the transformed row, or {@code null} when the transform dropped the row
         */
        private Row transformNext() {
            try {
                Row sourceRow = this.sourceIterator.next();
                SeaTunnelRow mapped = this.transform.map(this.inputRowConverter.reconvert(new SeaTunnelRow(sourceRow.values())));
                if (mapped == null) {
                    return null;
                }
                return new GenericRowWithSchema(this.outputRowConverter.convert(mapped).getFields(), this.structType);
            } catch (Exception e) {
                throw new TaskExecuteException("Row convert failed, caused: " + e.getMessage(), e);
            }
        }
    }

    /**
     * @param sparkRuntimeEnvironment Spark runtime environment handed to the parent processor
     * @param jobContext job context handed to the parent processor and to every transform
     * @param list one configuration per transform plugin
     */
    public TransformExecuteProcessor(SparkRuntimeEnvironment sparkRuntimeEnvironment, JobContext jobContext, List<? extends Config> list) {
        super(sparkRuntimeEnvironment, jobContext, list);
    }

    /**
     * Creates and prepares one transform per configuration and registers the plugin jars with the
     * Spark runtime environment.
     *
     * @param list transform plugin configurations
     * @return the prepared transforms, with duplicates (by {@code equals}) removed
     */
    @Override // org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor
    protected List<SeaTunnelTransform> initializePlugins(List<? extends Config> list) {
        SeaTunnelTransformPluginDiscovery discovery = new SeaTunnelTransformPluginDiscovery();
        // Filled as a side effect of the mapping below so plugin lookup and jar collection share
        // one pass over the configurations.
        List<URL> pluginJars = new ArrayList<>();
        // NOTE(review): execute() pairs plugins.get(i) with pluginConfigs.get(i); if distinct()
        // ever removes an element the two lists no longer line up - confirm this cannot happen.
        List<SeaTunnelTransform> transforms = list.stream().map(config -> {
            PluginIdentifier identifier = PluginIdentifier.of(CollectionConstants.SEATUNNEL_PLUGIN, PLUGIN_TYPE, config.getString(CollectionConstants.PLUGIN_NAME));
            pluginJars.addAll(discovery.getPluginJarPaths(Lists.newArrayList(identifier)));
            SeaTunnelTransform transform = discovery.createPluginInstance(identifier);
            transform.prepare(config);
            transform.setJobContext(this.jobContext);
            return transform;
        }).distinct().collect(Collectors.toList());
        this.sparkRuntimeEnvironment.registerPlugin(pluginJars);
        return transforms;
    }

    /**
     * Applies every configured transform in order.
     *
     * <p>Each transform reads the dataset returned by {@code fromSourceTable} for its configuration
     * when one is present, otherwise the output of the previous transform (initially the first
     * upstream dataset). Every result is passed to {@code registerInputTempView} and collected.
     *
     * @param list upstream datasets; returned unchanged when no transform is configured
     * @return the output dataset of every transform, in execution order
     * @throws TaskExecuteException if any transform fails; the message names the failing plugin
     */
    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public List<Dataset<Row>> execute(List<Dataset<Row>> list) throws TaskExecuteException {
        if (this.plugins.isEmpty()) {
            return list;
        }
        Dataset<Row> current = list.get(0);
        List<Dataset<Row>> results = new ArrayList<>();
        for (int i = 0; i < this.plugins.size(); i++) {
            try {
                SeaTunnelTransform transform = (SeaTunnelTransform) this.plugins.get(i);
                Config config = this.pluginConfigs.get(i);
                current = sparkTransform(transform, fromSourceTable(config, this.sparkRuntimeEnvironment).orElse(current));
                registerInputTempView(config, current);
                results.add(current);
            } catch (Exception e) {
                throw new TaskExecuteException(String.format("SeaTunnel transform task: %s execute error", ((SeaTunnelTransform) this.plugins.get(i)).getPluginName()), e);
            }
        }
        return results;
    }

    /**
     * Runs a single transform over a dataset, partition by partition.
     *
     * @param seaTunnelTransform transform to apply; receives the input type via {@code setTypeInfo}
     * @param dataset input dataset
     * @return dataset of transformed rows, encoded with the transform's produced type
     * @throws IOException declared for callers; not thrown by the code visible in this method
     */
    private Dataset<Row> sparkTransform(SeaTunnelTransform seaTunnelTransform, Dataset<Row> dataset) throws IOException {
        SeaTunnelDataType<?> inputType = TypeConverterUtils.convert(dataset.schema());
        seaTunnelTransform.setTypeInfo(inputType);
        StructType outputSchema = TypeConverterUtils.convert(seaTunnelTransform.getProducedType());
        SeaTunnelRowConverter inputConverter = new SeaTunnelRowConverter(inputType);
        SeaTunnelRowConverter outputConverter = new SeaTunnelRowConverter(seaTunnelTransform.getProducedType());
        // Explicit lambda parameter types select the Java-friendly overloads of mapPartitions and
        // filter; untyped lambdas can be ambiguous with the Scala function overloads.
        return dataset
                .mapPartitions(
                        (Iterator<Row> rows) -> new TransformIterator(rows, seaTunnelTransform, outputSchema, inputConverter, outputConverter),
                        RowEncoder.apply(outputSchema))
                // Kept as a safety net; TransformIterator itself no longer yields null rows.
                .filter((Row row) -> row != null);
    }

    // No hand-written $deserializeLambda$ here: javac synthesizes that method for the serializable
    // lambdas in sparkTransform, and a declared copy would clash with the generated one.
}
