package org.apache.seatunnel.engine.core.parse;

import com.hazelcast.logging.ILogger;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.class */
public class MultipleTableJobConfigParser {
    private static final Logger log = LoggerFactory.getLogger(MultipleTableJobConfigParser.class);
    private static final ILogger LOGGER = com.hazelcast.logging.Logger.getLogger(MultipleTableJobConfigParser.class);
    static final String DEFAULT_ID = "default-identifier";
    private final IdGenerator idGenerator;
    private final JobConfig jobConfig;
    private final List<URL> commonPluginJars;
    private final Config seaTunnelJobConfig;
    private final ReadonlyConfig envOptions;
    private final JobConfigParser fallbackParser;

    public MultipleTableJobConfigParser(String str, IdGenerator idGenerator, JobConfig jobConfig) {
        this(str, idGenerator, jobConfig, Collections.emptyList());
    }

    public MultipleTableJobConfigParser(String str, IdGenerator idGenerator, JobConfig jobConfig, List<URL> list) {
        this.idGenerator = idGenerator;
        this.jobConfig = jobConfig;
        this.commonPluginJars = list;
        this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(str, new String[0]));
        this.envOptions = ReadonlyConfig.fromConfig(this.seaTunnelJobConfig.getConfig("env"));
        this.fallbackParser = new JobConfigParser(idGenerator, list);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public ImmutablePair<List<Action>, Set<URL>> parse() {
        List configList = TypesafeConfigUtils.getConfigList(this.seaTunnelJobConfig, "source", Collections.emptyList());
        List configList2 = TypesafeConfigUtils.getConfigList(this.seaTunnelJobConfig, "transform", Collections.emptyList());
        List configList3 = TypesafeConfigUtils.getConfigList(this.seaTunnelJobConfig, "sink", Collections.emptyList());
        List<URL> connectorJarList = getConnectorJarList(configList, configList3);
        if (!this.commonPluginJars.isEmpty()) {
            connectorJarList.addAll(this.commonPluginJars);
        }
        ClassLoader seaTunnelChildFirstClassLoader = new SeaTunnelChildFirstClassLoader(connectorJarList, Thread.currentThread().getContextClassLoader());
        Thread.currentThread().setContextClassLoader(seaTunnelChildFirstClassLoader);
        ConfigParserUtil.checkGraph(configList, configList2, configList3);
        fillJobConfig();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        log.info("start generating all sources.");
        Iterator it = configList.iterator();
        while (it.hasNext()) {
            Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource = parseSource((Config) it.next(), seaTunnelChildFirstClassLoader);
            linkedHashMap.put(parseSource._1(), parseSource._2());
        }
        log.info("start generating all transforms.");
        parseTransforms(configList2, seaTunnelChildFirstClassLoader, linkedHashMap);
        log.info("start generating all sinks.");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < configList3.size(); i++) {
            arrayList.addAll(parseSink(i, (Config) configList3.get(i), seaTunnelChildFirstClassLoader, linkedHashMap));
        }
        Set<URL> usedFactoryUrls = getUsedFactoryUrls(arrayList);
        usedFactoryUrls.addAll(this.commonPluginJars);
        arrayList.forEach(this::addCommonPluginJarsToAction);
        return new ImmutablePair<>(arrayList, usedFactoryUrls);
    }

    public Set<URL> getUsedFactoryUrls(List<Action> list) {
        HashSet hashSet = new HashSet();
        fillUsedFactoryUrls(list, hashSet);
        return hashSet;
    }

    private List<URL> getConnectorJarList(List<? extends Config> list, List<? extends Config> list2) {
        return new SeaTunnelSinkPluginDiscovery().getPluginJarPaths((List) Stream.concat(list.stream().map(ConfigParserUtil::getFactoryId).map(str -> {
            return PluginIdentifier.of("seatunnel", "source", str);
        }), list2.stream().map(ConfigParserUtil::getFactoryId).map(str2 -> {
            return PluginIdentifier.of("seatunnel", "sink", str2);
        })).collect(Collectors.toList()));
    }

    private void fillUsedFactoryUrls(List<Action> list, Set<URL> set) {
        list.forEach(action -> {
            set.addAll(action.getJarUrls());
            if (action.getUpstream().isEmpty()) {
                return;
            }
            fillUsedFactoryUrls(action.getUpstream(), set);
        });
    }

    void addCommonPluginJarsToAction(Action action) {
        action.getJarUrls().addAll(this.commonPluginJars);
        if (action.getUpstream().isEmpty()) {
            return;
        }
        action.getUpstream().forEach(this::addCommonPluginJarsToAction);
    }

    private void fillJobConfig() {
        this.jobConfig.getJobContext().setJobMode((JobMode) this.envOptions.get(EnvCommonOptions.JOB_MODE));
        if (StringUtils.isEmpty(this.jobConfig.getName()) || this.jobConfig.getName().equals("SeaTunnel")) {
            this.jobConfig.setName((String) this.envOptions.get(EnvCommonOptions.JOB_NAME));
        }
        this.envOptions.getOptional(EnvCommonOptions.CHECKPOINT_INTERVAL).ifPresent(l -> {
            this.jobConfig.getEnvOptions().put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), l);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <T extends Factory> boolean isFallback(ClassLoader classLoader, Class<T> cls, String str, Consumer<T> consumer) {
        Optional discoverOptionalFactory = FactoryUtil.discoverOptionalFactory(classLoader, cls, str);
        if (!discoverOptionalFactory.isPresent()) {
            return true;
        }
        try {
            consumer.accept(discoverOptionalFactory.get());
            return false;
        } catch (Exception e) {
            return (e instanceof UnsupportedOperationException) && "The Factory has not been implemented and the deprecated Plugin will be used.".equals(e.getMessage());
        }
    }

    private int getParallelism(ReadonlyConfig readonlyConfig) {
        return Math.max(1, ((Integer) readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(this.envOptions.get(CommonOptions.PARALLELISM))).intValue());
    }

    public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(Config config, ClassLoader classLoader) {
        ReadonlyConfig fromConfig = ReadonlyConfig.fromConfig(config);
        String factoryId = ConfigParserUtil.getFactoryId(fromConfig);
        String str = (String) fromConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
        int parallelism = getParallelism(fromConfig);
        boolean isFallback = isFallback(classLoader, TableSourceFactory.class, factoryId, tableSourceFactory -> {
            tableSourceFactory.createSource((TableFactoryContext) null);
        });
        ArrayList arrayList = new ArrayList();
        if (!isFallback) {
            List catalogTables = CatalogTableUtil.getCatalogTables(config, classLoader);
            if (!catalogTables.isEmpty()) {
                arrayList.addAll(catalogTables);
            }
        }
        if (isFallback || arrayList.isEmpty()) {
            return new Tuple2<>(str, Collections.singletonList(this.fallbackParser.parseSource(config, this.jobConfig, str, parallelism)));
        }
        if (fromConfig.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) {
            CatalogTable catalogTable = (CatalogTable) arrayList.get(0);
            arrayList.clear();
            arrayList.add(catalogTable);
        }
        List createAndPrepareSource = FactoryUtil.createAndPrepareSource(arrayList, fromConfig, classLoader, factoryId);
        Set<URL> factoryUrls = ConfigParserUtil.getFactoryUrls(fromConfig, classLoader, TableSourceFactory.class, factoryId);
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < createAndPrepareSource.size(); i++) {
            Tuple2 tuple2 = (Tuple2) createAndPrepareSource.get(i);
            long nextId = this.idGenerator.getNextId();
            String createSourceActionName = JobConfigParser.createSourceActionName(i, factoryId, str);
            SeaTunnelSource seaTunnelSource = (SeaTunnelSource) tuple2._1();
            seaTunnelSource.setJobContext(this.jobConfig.getJobContext());
            ensureJobModeMatch(this.jobConfig.getJobContext(), seaTunnelSource);
            SourceAction sourceAction = new SourceAction(nextId, createSourceActionName, (SeaTunnelSource) tuple2._1(), factoryUrls);
            sourceAction.setParallelism(parallelism);
            Iterator it = ((List) tuple2._2()).iterator();
            while (it.hasNext()) {
                arrayList2.add(new Tuple2((CatalogTable) it.next(), sourceAction));
            }
        }
        return new Tuple2<>(str, arrayList2);
    }

    public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource seaTunnelSource) {
        if (jobContext.getJobMode() == JobMode.BATCH && seaTunnelSource.getBoundedness() == Boundedness.UNBOUNDED) {
            throw new JobDefineCheckException(String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
        }
    }

    public void parseTransforms(List<? extends Config> list, ClassLoader classLoader, LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> linkedHashMap) {
        if (CollectionUtils.isEmpty(list) || list.size() == 0) {
            return;
        }
        LinkedList linkedList = new LinkedList(list);
        while (!linkedList.isEmpty()) {
            parseTransform(linkedList, classLoader, linkedHashMap);
        }
    }

    private void parseTransform(Queue<Config> queue, ClassLoader classLoader, LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> linkedHashMap) {
        Config poll = queue.poll();
        ReadonlyConfig fromConfig = ReadonlyConfig.fromConfig(poll);
        String factoryId = ConfigParserUtil.getFactoryId(fromConfig);
        Set<URL> factoryUrls = ConfigParserUtil.getFactoryUrls(fromConfig, classLoader, TableTransformFactory.class, factoryId);
        Stream<String> stream = ConfigParserUtil.getInputIds(fromConfig).stream();
        linkedHashMap.getClass();
        List list = (List) stream.map((v1) -> {
            return r1.get(v1);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).peek(list2 -> {
            if (list2.size() > 1) {
                throw new JobDefineCheckException("Adding transform to multi-table source is not supported.");
            }
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            if (!queue.isEmpty()) {
                queue.offer(poll);
                return;
            }
            list = (List) findLast(linkedHashMap);
        }
        String str = (String) fromConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
        boolean isFallback = isFallback(classLoader, TableTransformFactory.class, factoryId, tableTransformFactory -> {
            tableTransformFactory.createTransform((TableFactoryContext) null);
        });
        Set<Action> set = (Set) list.stream().map((v0) -> {
            return v0._2();
        }).collect(Collectors.toCollection(LinkedHashSet::new));
        SeaTunnelRowType producedType = getProducedType((Action) ((Tuple2) list.get(0))._2());
        checkProducedTypeEquals(set);
        int intValue = ((Integer) fromConfig.getOptional(CommonOptions.PARALLELISM).orElse(Integer.valueOf(((Action) ((Tuple2) list.get(0))._2()).getParallelism()))).intValue();
        if (isFallback) {
            linkedHashMap.put(str, Collections.singletonList(this.fallbackParser.parseTransform(poll, this.jobConfig, str, intValue, producedType, set)));
            return;
        }
        SeaTunnelTransform createAndPrepareTransform = FactoryUtil.createAndPrepareTransform((CatalogTable) ((Tuple2) list.get(0))._1(), fromConfig, classLoader, factoryId);
        createAndPrepareTransform.setJobContext(this.jobConfig.getJobContext());
        TransformAction transformAction = new TransformAction(this.idGenerator.getNextId(), JobConfigParser.createTransformActionName(0, factoryId, JobConfigParser.getTableName(poll)), new ArrayList(set), createAndPrepareTransform, factoryUrls);
        transformAction.setParallelism(intValue);
        linkedHashMap.put(str, Collections.singletonList(new Tuple2(createAndPrepareTransform.getProducedCatalogTable(), transformAction)));
    }

    public static SeaTunnelDataType<?> getProducedType(Action action) {
        if (action instanceof SourceAction) {
            return ((SourceAction) action).getSource().getProducedType();
        }
        if (action instanceof TransformAction) {
            return ((TransformAction) action).getTransform().getProducedType();
        }
        throw new UnsupportedOperationException();
    }

    public static void checkProducedTypeEquals(Set<Action> set) {
        SeaTunnelDataType<?> producedType = getProducedType((Action) new ArrayList(set).get(0));
        Iterator<Action> it = set.iterator();
        while (it.hasNext()) {
            if (!producedType.equals(getProducedType(it.next()))) {
                throw new JobDefineCheckException("Transform/Sink don't support processing data with two different structures.");
            }
        }
    }

    @Deprecated
    private static <T> T findLast(LinkedHashMap<?, T> linkedHashMap) {
        int size = linkedHashMap.size();
        int i = 1;
        for (T t : linkedHashMap.values()) {
            if (i == size) {
                return t;
            }
            i++;
        }
        return null;
    }

    public List<SinkAction<?, ?, ?, ?>> parseSink(int i, Config config, ClassLoader classLoader, LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> linkedHashMap) {
        ReadonlyConfig fromConfig = ReadonlyConfig.fromConfig(config);
        String factoryId = ConfigParserUtil.getFactoryId(fromConfig);
        Stream<String> stream = ConfigParserUtil.getInputIds(fromConfig).stream();
        linkedHashMap.getClass();
        List<List<Tuple2<CatalogTable, Action>>> list = (List) stream.map((v1) -> {
            return r1.get(v1);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            list = Collections.singletonList(findLast(linkedHashMap));
        } else if (list.size() > 1) {
            Iterator<List<Tuple2<CatalogTable, Action>>> it = list.iterator();
            while (it.hasNext()) {
                if (it.next().size() > 1) {
                    throw new JobDefineCheckException("Sink don't support simultaneous writing of data from multi-table source and other sources.");
                }
            }
        }
        if (isFallback(classLoader, TableSinkFactory.class, factoryId, tableSinkFactory -> {
            tableSinkFactory.createSink((TableFactoryContext) null);
        })) {
            return this.fallbackParser.parseSinks(list, config, this.jobConfig);
        }
        Map<TablePath, CatalogTable> map = (Map) CatalogTableUtil.getCatalogTables(config, classLoader).stream().collect(Collectors.toMap(catalogTable -> {
            return catalogTable.getTableId().toTablePath();
        }, catalogTable2 -> {
            return catalogTable2;
        }));
        Set<URL> factoryUrls = ConfigParserUtil.getFactoryUrls(fromConfig, classLoader, TableSinkFactory.class, factoryId);
        ArrayList arrayList = new ArrayList();
        if (list.size() > 1) {
            Set<Action> set = (Set) list.stream().flatMap((v0) -> {
                return v0.stream();
            }).map((v0) -> {
                return v0._2();
            }).collect(Collectors.toCollection(LinkedHashSet::new));
            checkProducedTypeEquals(set);
            Tuple2<CatalogTable, Action> tuple2 = list.get(0).get(0);
            arrayList.add(createSinkAction((CatalogTable) tuple2._1(), map, set, fromConfig, classLoader, factoryUrls, factoryId, ((Action) tuple2._2()).getParallelism(), i));
            return arrayList;
        }
        for (Tuple2<CatalogTable, Action> tuple22 : list.get(0)) {
            arrayList.add(createSinkAction((CatalogTable) tuple22._1(), map, Collections.singleton(tuple22._2()), fromConfig, classLoader, factoryUrls, factoryId, ((Action) tuple22._2()).getParallelism(), i));
        }
        return arrayList;
    }

    private SinkAction<?, ?, ?, ?> createSinkAction(CatalogTable catalogTable, Map<TablePath, CatalogTable> map, Set<Action> set, ReadonlyConfig readonlyConfig, ClassLoader classLoader, Set<URL> set2, String str, int i, int i2) {
        Optional<CatalogTable> findFirst = map.size() == 1 ? map.values().stream().findFirst() : Optional.ofNullable(map.get(catalogTable.getTableId().toTablePath()));
        if (findFirst.isPresent()) {
            catalogTable = findFirst.get();
        }
        SeaTunnelSink createAndPrepareSink = FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader, str);
        createAndPrepareSink.setJobContext(this.jobConfig.getJobContext());
        SinkConfig sinkConfig = new SinkConfig(catalogTable.getTableId().toTablePath().toString());
        SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(this.idGenerator.getNextId(), JobConfigParser.createSinkActionName(i2, str, sinkConfig.getMultipleRowTableId()), new ArrayList(set), createAndPrepareSink, set2, sinkConfig);
        handleSaveMode(createAndPrepareSink);
        sinkAction.setParallelism(i);
        return sinkAction;
    }

    public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> seaTunnelSink) {
        if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
            SupportDataSaveMode supportDataSaveMode = (SupportDataSaveMode) seaTunnelSink;
            supportDataSaveMode.handleSaveMode(supportDataSaveMode.getDataSaveMode());
        }
    }
}
