package org.apache.seatunnel.core.starter.flink.execution;

import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.class */
/**
 * Creates the configured SeaTunnel sinks and attaches them to the upstream Flink data streams.
 *
 * <p>Each entry of {@code plugins} is an optional {@link Factory}. When the factory is absent, or
 * when {@link #isFallback(Factory)} reports that it does not implement {@code createSink}, the
 * sink is instantiated through {@link SeaTunnelSinkPluginDiscovery} instead.
 */
public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
    private static final Logger log = LoggerFactory.getLogger(SinkExecuteProcessor.class);
    private static final String PLUGIN_TYPE = PluginType.SINK.getType();

    /** Exception message that {@link #isFallback(Factory)} treats as "factory not implemented". */
    private static final String FACTORY_NOT_IMPLEMENTED_MESSAGE =
            "The Factory has not been implemented and the deprecated Plugin will be used.";

    /**
     * Forwards all arguments unchanged to the super constructor.
     *
     * @param list URLs handed to the parent processor
     * @param config configuration handed to the parent processor
     * @param list2 per-plugin configurations handed to the parent processor
     * @param jobContext job context that is later set on every created sink
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public SinkExecuteProcessor(List<URL> list, Config config, List<? extends Config> list2, JobContext jobContext) {
        super(list, config, list2, jobContext);
    }

    /**
     * Resolves one optional sink factory per plugin configuration.
     *
     * @param list URLs passed through to {@code PluginUtil.createSinkFactory}
     * @param list2 sink plugin configurations to resolve
     * @return the distinct results of {@code PluginUtil.createSinkFactory}, in encounter order
     */
    @Override // org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor
    protected List<Optional<? extends Factory>> initializePlugins(List<URL> list, List<? extends Config> list2) {
        SeaTunnelFactoryDiscovery factoryDiscovery = new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
        // NOTE(review): distinct() can make the result shorter than list2 (e.g. two empty
        // Optionals are equal), while execute() pairs plugins.get(i) with pluginConfigs.get(i).
        // Confirm against the base class that the two lists stay aligned.
        // The raw cast is kept because the declared return type of createSinkFactory is not
        // visible from this file.
        return (List) list2.stream()
                .map(sinkConfig -> PluginUtil.createSinkFactory(factoryDiscovery, sinkPluginDiscovery, sinkConfig, list))
                .distinct()
                .collect(Collectors.toList());
    }

    /**
     * Builds a sink for every plugin and attaches it to its input data stream.
     *
     * <p>For each plugin the input is looked up with {@code fromSourceTable}; when that yields
     * nothing, the first element of {@code list} is used. One sink is created per catalog table of
     * the input, save-mode handling is run on each of them, and the results are combined through
     * {@link #tryGenerateMultiTableSink}. The Flink operator is named {@code "<pluginName>-Sink"}.
     * Parallelism is read from the sink config first, then from the environment config; it is only
     * set explicitly on the operator when one of the two defines it, while the resolved value
     * (defaulting to 1) is always passed to {@link FlinkSink}.
     *
     * @param list upstream data streams; {@code list.get(0)} is read unconditionally, so an empty
     *     list results in an {@link IndexOutOfBoundsException}
     * @return always {@code null}
     * @throws TaskExecuteException declared by the overridden method
     */
    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> list) throws TaskExecuteException {
        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
        DataStreamTableInfo defaultInput = list.get(0);
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        for (int i = 0; i < this.plugins.size(); i++) {
            Config sinkConfig = this.pluginConfigs.get(i);
            DataStreamTableInfo input = fromSourceTable(sinkConfig, list).orElse(defaultInput);
            Optional<? extends Factory> factory = this.plugins.get(i);
            boolean useFallback = !factory.isPresent() || isFallback(factory.get());

            Map<String, SeaTunnelSink> sinksByTablePath = useFallback
                    ? createFallbackSinks(sinkPluginDiscovery, sinkConfig, input)
                    : createFactorySinks(factory.get(), sinkConfig, input, classLoader);
            SeaTunnelSink sink = tryGenerateMultiTableSink(sinksByTablePath, ReadonlyConfig.fromConfig(sinkConfig), classLoader);

            String parallelismKey = CommonOptions.PARALLELISM.key();
            boolean sinkDefinesParallelism = sinkConfig.hasPath(parallelismKey);
            boolean envDefinesParallelism = this.envConfig.hasPath(parallelismKey);
            int parallelism = resolveParallelism(sinkConfig, parallelismKey, sinkDefinesParallelism, envDefinesParallelism);

            DataStreamSink<?> dataStreamSink = input.getDataStream()
                    .sinkTo(SinkV1Adapter.wrap(new FlinkSink(sink, input.getCatalogTables(), parallelism)))
                    .name(String.format("%s-Sink", sink.getPluginName()));
            // Leave the operator parallelism untouched unless the user configured it somewhere.
            if (sinkDefinesParallelism || envDefinesParallelism) {
                dataStreamSink.setParallelism(parallelism);
            }
        }
        return null;
    }

    /** Creates one sink per catalog table through plugin discovery, keyed by table path. */
    private Map<String, SeaTunnelSink> createFallbackSinks(
            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, Config sinkConfig, DataStreamTableInfo input) {
        Map<String, SeaTunnelSink> sinksByTablePath = new HashMap<>();
        for (CatalogTable catalogTable : input.getCatalogTables()) {
            PluginIdentifier identifier = PluginIdentifier.of(
                    CollectionConstants.SEATUNNEL_PLUGIN, PLUGIN_TYPE, sinkConfig.getString(CommonOptions.PLUGIN_NAME.key()));
            SeaTunnelSink sink = fallbackCreateSink(sinkPluginDiscovery, identifier, sinkConfig);
            sink.setJobContext(this.jobContext);
            sink.setTypeInfo(catalogTable.getSeaTunnelRowType());
            handleSaveMode(sink);
            sinksByTablePath.put(catalogTable.getTableId().toTablePath().toString(), sink);
        }
        return sinksByTablePath;
    }

    /** Creates one sink per catalog table through the given table sink factory, keyed by table path. */
    private Map<String, SeaTunnelSink> createFactorySinks(
            Factory factory, Config sinkConfig, DataStreamTableInfo input, ClassLoader classLoader) {
        Map<String, SeaTunnelSink> sinksByTablePath = new HashMap<>();
        for (CatalogTable catalogTable : input.getCatalogTables()) {
            // The cast stays inside the loop so that, as before, it is only evaluated when there
            // is at least one catalog table to process.
            TableSinkFactory sinkFactory = (TableSinkFactory) factory;
            TableSinkFactoryContext context = TableSinkFactoryContext.replacePlaceholderAndCreate(
                    catalogTable,
                    ReadonlyConfig.fromConfig(sinkConfig),
                    classLoader,
                    sinkFactory.excludeTablePlaceholderReplaceKeys());
            ConfigValidator.of(context.getOptions()).validate(sinkFactory.optionRule());
            SeaTunnelSink sink = sinkFactory.createSink(context).createSink();
            sink.setJobContext(this.jobContext);
            handleSaveMode(sink);
            sinksByTablePath.put(catalogTable.getTableId().toTablePath().toString(), sink);
        }
        return sinksByTablePath;
    }

    /** Picks the sink-level parallelism, then the environment-level one, and otherwise 1. */
    private int resolveParallelism(
            Config sinkConfig, String parallelismKey, boolean sinkDefinesParallelism, boolean envDefinesParallelism) {
        if (sinkDefinesParallelism) {
            return sinkConfig.getInt(parallelismKey);
        }
        if (envDefinesParallelism) {
            return this.envConfig.getInt(parallelismKey);
        }
        return 1;
    }

    /**
     * Combines per-table sinks into a multi-table sink when every one of them supports it.
     *
     * @param map sinks keyed by table path
     * @param readonlyConfig configuration forwarded to {@code FactoryUtil.createMultiTableSink}
     * @param classLoader class loader forwarded to {@code FactoryUtil.createMultiTableSink}
     * @return the multi-table sink if all values implement {@link SupportMultiTableSink} (this
     *     includes an empty map); otherwise the first sink returned by the map's value iterator
     */
    public SeaTunnelSink tryGenerateMultiTableSink(Map<String, SeaTunnelSink> map, ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
        boolean allSupportMultiTable = map.values().stream()
                .allMatch(sink -> sink instanceof SupportMultiTableSink);
        if (allSupportMultiTable) {
            return FactoryUtil.createMultiTableSink(map, readonlyConfig, classLoader);
        }
        log.info("Unsupported multi table sink api, rollback to sink template");
        return map.values().iterator().next();
    }

    /**
     * Probes whether the factory lacks a {@code createSink} implementation.
     *
     * <p>The probe calls {@code createSink(null)}. Only an {@link UnsupportedOperationException}
     * carrying the well-known "not implemented" message counts as a fallback signal.
     *
     * @param factory factory to probe
     * @return {@code true} only when the probe fails with that exact exception and message;
     *     {@code false} when it succeeds or fails in any other way
     */
    public boolean isFallback(Factory factory) {
        try {
            ((TableSinkFactory) factory).createSink(null);
            return false;
        } catch (UnsupportedOperationException e) {
            return FACTORY_NOT_IMPLEMENTED_MESSAGE.equals(e.getMessage());
        } catch (Exception ignored) {
            // Any other failure of the null-context probe is not a fallback signal.
            return false;
        }
    }

    /**
     * Instantiates a sink through plugin discovery and prepares it with the given configuration.
     *
     * @param seaTunnelSinkPluginDiscovery discovery used to create the plugin instance
     * @param pluginIdentifier identifier of the sink plugin to create
     * @param config configuration passed to {@code prepare}
     * @return the prepared sink instance
     */
    public SeaTunnelSink fallbackCreateSink(SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery, PluginIdentifier pluginIdentifier, Config config) {
        SeaTunnelSink sink = seaTunnelSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
        sink.prepare(config);
        return sink;
    }

    /**
     * Runs save-mode handling for sinks that implement {@link SupportSaveMode}.
     *
     * <p>Does nothing when the sink does not implement {@link SupportSaveMode} or provides no
     * handler. Otherwise the handler is opened, executed through {@link SaveModeExecuteWrapper},
     * and closed even when opening or executing fails.
     *
     * @param seaTunnelSink sink to inspect
     * @throws SeaTunnelRuntimeException with {@code HANDLE_SAVE_MODE_FAILED} wrapping any
     *     exception raised while opening, executing or closing the handler
     */
    public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
        if (!(seaTunnelSink instanceof SupportSaveMode)) {
            return;
        }
        Optional<SaveModeHandler> saveModeHandler = ((SupportSaveMode) seaTunnelSink).getSaveModeHandler();
        if (!saveModeHandler.isPresent()) {
            return;
        }
        // try-with-resources guarantees close() on the failure path as well.
        try (SaveModeHandler handler = saveModeHandler.get()) {
            handler.open();
            new SaveModeExecuteWrapper(handler).execute();
        } catch (Exception e) {
            throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED, e);
        }
    }
}
