package org.apache.seatunnel.core.starter.flink.execution;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.class */
public abstract class FlinkAbstractPluginExecuteProcessor<T> implements PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment> {
    protected static final String ENGINE_TYPE = "seatunnel";
    protected static final String SOURCE_TABLE_NAME = "source_table_name";
    protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> {
        if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
            ReflectionUtils.invoke((URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get(), "addURL", url);
        } else {
            if (!(classLoader instanceof URLClassLoader)) {
                throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
            }
            ReflectionUtils.invoke(classLoader, "addURL", url);
        }
    };
    protected FlinkRuntimeEnvironment flinkRuntimeEnvironment;
    protected final List<? extends Config> pluginConfigs;
    protected JobContext jobContext;
    protected final List<T> plugins;
    protected final Config envConfig;

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkAbstractPluginExecuteProcessor(List<URL> list, Config config, List<? extends Config> list2, JobContext jobContext) {
        this.pluginConfigs = list2;
        this.jobContext = jobContext;
        this.plugins = initializePlugins(list, list2);
        this.envConfig = config;
    }

    @Override // org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor
    public void setRuntimeEnvironment(FlinkRuntimeEnvironment flinkRuntimeEnvironment) {
        this.flinkRuntimeEnvironment = flinkRuntimeEnvironment;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Optional<DataStreamTableInfo> fromSourceTable(Config config, List<DataStreamTableInfo> list) {
        if (!config.hasPath("source_table_name")) {
            return Optional.empty();
        }
        StreamTableEnvironment streamTableEnvironment = this.flinkRuntimeEnvironment.getStreamTableEnvironment();
        String string = config.getString("source_table_name");
        return Optional.of(new DataStreamTableInfo(TableUtil.tableToDataStream(streamTableEnvironment, streamTableEnvironment.from(string)), list.stream().filter(dataStreamTableInfo -> {
            return string.equals(dataStreamTableInfo.getTableName());
        }).findFirst().orElseThrow(() -> {
            return new SeaTunnelException(String.format("table %s not found", string));
        }).getCatalogTable(), string));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerResultTable(Config config, DataStream<Row> dataStream) {
        if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
            this.flinkRuntimeEnvironment.registerResultTable(config, dataStream, config.getString(CommonOptions.RESULT_TABLE_NAME.key()));
        }
    }

    protected abstract List<T> initializePlugins(List<URL> list, List<? extends Config> list2);
}
