package org.apache.seatunnel.core.starter.spark.execution;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

/* loaded from: input_file:org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.class */
public class SinkExecuteProcessor extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
    private static final String PLUGIN_TYPE = PluginType.SINK.getType();

    /* JADX INFO: Access modifiers changed from: protected */
    public SinkExecuteProcessor(SparkRuntimeEnvironment sparkRuntimeEnvironment, JobContext jobContext, List<? extends Config> list) {
        super(sparkRuntimeEnvironment, jobContext, list);
    }

    @Override // org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor
    protected List<Optional<? extends Factory>> initializePlugins(List<? extends Config> list) {
        SeaTunnelFactoryDiscovery seaTunnelFactoryDiscovery = new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
        ArrayList arrayList = new ArrayList();
        List<Optional<? extends Factory>> list2 = (List) list.stream().map(config -> {
            return PluginUtil.createSinkFactory(seaTunnelFactoryDiscovery, seaTunnelSinkPluginDiscovery, config, new ArrayList());
        }).distinct().collect(Collectors.toList());
        this.sparkRuntimeEnvironment.registerPlugin(arrayList);
        return list2;
    }

    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public List<DatasetTableInfo> execute(List<DatasetTableInfo> list) throws TaskExecuteException {
        SeaTunnelSink createSink;
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        DatasetTableInfo datasetTableInfo = list.get(0);
        for (int i = 0; i < this.plugins.size(); i++) {
            Config config = this.pluginConfigs.get(i);
            DatasetTableInfo orElse = fromSourceTable(config, this.sparkRuntimeEnvironment, list).orElse(datasetTableInfo);
            SeaTunnelRowType seaTunnelRowType = orElse.getCatalogTable().getSeaTunnelRowType();
            Dataset<Row> dataset = orElse.getDataset();
            dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), config.hasPath(CommonOptions.PARALLELISM.key()) ? config.getInt(CommonOptions.PARALLELISM.key()) : this.sparkRuntimeEnvironment.getSparkConf().getInt(CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue().intValue()));
            Optional optional = (Optional) this.plugins.get(i);
            if (!optional.isPresent() || isFallback((Factory) optional.get())) {
                createSink = fallbackCreateSink(seaTunnelSinkPluginDiscovery, PluginIdentifier.of(CollectionConstants.SEATUNNEL_PLUGIN, PLUGIN_TYPE, config.getString(CommonOptions.PLUGIN_NAME.key())), config);
                createSink.setJobContext(this.jobContext);
                createSink.setTypeInfo(seaTunnelRowType);
            } else {
                TableSinkFactoryContext tableSinkFactoryContext = new TableSinkFactoryContext(orElse.getCatalogTable(), ReadonlyConfig.fromConfig(config), contextClassLoader);
                ConfigValidator.of(tableSinkFactoryContext.getOptions()).validate(((Factory) optional.get()).optionRule());
                createSink = ((TableSinkFactory) optional.get()).createSink(tableSinkFactoryContext).createSink();
                createSink.setJobContext(this.jobContext);
            }
            if (SupportSaveMode.class.isAssignableFrom(createSink.getClass())) {
                Optional<SaveModeHandler> saveModeHandler = ((SupportSaveMode) createSink).getSaveModeHandler();
                if (saveModeHandler.isPresent()) {
                    try {
                        SaveModeHandler saveModeHandler2 = saveModeHandler.get();
                        Throwable th = null;
                        try {
                            try {
                                saveModeHandler2.handleSaveMode();
                                if (saveModeHandler2 != null) {
                                    if (0 != 0) {
                                        try {
                                            saveModeHandler2.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        saveModeHandler2.close();
                                    }
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } catch (Exception e) {
                        throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED, e);
                    }
                } else {
                    continue;
                }
            }
            SparkSinkInjector.inject((DataFrameWriter<Row>) dataset.write(), (SeaTunnelSink<?, ?, ?, ?>) createSink, orElse.getCatalogTable()).option("checkpointLocation", "/tmp").mode(SaveMode.Append).save();
        }
        return null;
    }

    public boolean isFallback(Factory factory) {
        try {
            ((TableSinkFactory) factory).createSink(null);
            return false;
        } catch (Exception e) {
            return (e instanceof UnsupportedOperationException) && "The Factory has not been implemented and the deprecated Plugin will be used.".equals(e.getMessage());
        }
    }

    public SeaTunnelSink fallbackCreateSink(SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery, PluginIdentifier pluginIdentifier, Config config) {
        SeaTunnelSink createPluginInstance = seaTunnelSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
        createPluginInstance.prepare(config);
        return createPluginInstance;
    }
}
