package org.apache.seatunnel.api.table.factory;

import java.io.Serializable;
import java.net.URL;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.TablePlaceholderProcessor;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/seatunnel/api/table/factory/FactoryUtil.class */
public final class FactoryUtil {
    private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
    public static final String DEFAULT_ID = "default-identifier";

    public static <T, SplitT extends SourceSplit, StateT extends Serializable> Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> createAndPrepareSource(ReadonlyConfig readonlyConfig, ClassLoader classLoader, String str) {
        return restoreAndPrepareSource(readonlyConfig, classLoader, str, (ChangeStreamTableSourceCheckpoint) null);
    }

    public static <T, SplitT extends SourceSplit, StateT extends Serializable> Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> restoreAndPrepareSource(ReadonlyConfig readonlyConfig, ClassLoader classLoader, String str, ChangeStreamTableSourceCheckpoint changeStreamTableSourceCheckpoint) {
        SeaTunnelSource createAndPrepareSource;
        List<CatalogTable> convertDataTypeToCatalogTables;
        try {
            TableSourceFactory tableSourceFactory = (TableSourceFactory) discoverFactory(classLoader, TableSourceFactory.class, str);
            if (!(tableSourceFactory instanceof ChangeStreamTableSourceFactory) || changeStreamTableSourceCheckpoint == null) {
                createAndPrepareSource = createAndPrepareSource(tableSourceFactory, readonlyConfig, classLoader);
            } else {
                ChangeStreamTableSourceFactory changeStreamTableSourceFactory = (ChangeStreamTableSourceFactory) tableSourceFactory;
                createAndPrepareSource = restoreAndPrepareSource(changeStreamTableSourceFactory, readonlyConfig, classLoader, changeStreamTableSourceFactory.deserializeTableSourceState(changeStreamTableSourceCheckpoint));
            }
            try {
                convertDataTypeToCatalogTables = createAndPrepareSource.getProducedCatalogTables();
            } catch (UnsupportedOperationException e) {
                convertDataTypeToCatalogTables = CatalogTableUtil.convertDataTypeToCatalogTables(createAndPrepareSource.getProducedType(), (String) readonlyConfig.getOptional(CommonOptions.PLUGIN_OUTPUT).orElse(DEFAULT_ID));
            }
            LOG.info("get the CatalogTable from source {}: {}", createAndPrepareSource.getPluginName(), convertDataTypeToCatalogTables.stream().map((v0) -> {
                return v0.getTableId();
            }).map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(TablePlaceholderProcessor.FIELD_DELIMITER)));
            if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) {
                CatalogTable catalogTable = convertDataTypeToCatalogTables.get(0);
                convertDataTypeToCatalogTables.clear();
                convertDataTypeToCatalogTables.add(catalogTable);
            }
            return new Tuple2<>(createAndPrepareSource, convertDataTypeToCatalogTables);
        } catch (Throwable th) {
            throw new FactoryException(String.format("Unable to create a source for identifier '%s'.", str), th);
        }
    }

    private static <T, SplitT extends SourceSplit, StateT extends Serializable> SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(TableSourceFactory tableSourceFactory, ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
        TableSourceFactoryContext tableSourceFactoryContext = new TableSourceFactoryContext(readonlyConfig, classLoader);
        ConfigValidator.of(tableSourceFactoryContext.getOptions()).validate(tableSourceFactory.optionRule());
        return tableSourceFactory.createSource(tableSourceFactoryContext).createSource();
    }

    private static <T, SplitT extends SourceSplit, StateT extends Serializable> SeaTunnelSource<T, SplitT, StateT> restoreAndPrepareSource(ChangeStreamTableSourceFactory changeStreamTableSourceFactory, ReadonlyConfig readonlyConfig, ClassLoader classLoader, ChangeStreamTableSourceState changeStreamTableSourceState) {
        TableSourceFactoryContext tableSourceFactoryContext = new TableSourceFactoryContext(readonlyConfig, classLoader);
        ConfigValidator.of(tableSourceFactoryContext.getOptions()).validate(changeStreamTableSourceFactory.optionRule());
        LOG.info("Restore create source from checkpoint state: {}", changeStreamTableSourceState);
        return changeStreamTableSourceFactory.restoreSource(tableSourceFactoryContext, changeStreamTableSourceState).createSource();
    }

    public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig, ClassLoader classLoader, String str) {
        try {
            TableSinkFactory tableSinkFactory = (TableSinkFactory) discoverFactory(classLoader, TableSinkFactory.class, str);
            TableSinkFactoryContext replacePlaceholderAndCreate = TableSinkFactoryContext.replacePlaceholderAndCreate(catalogTable, readonlyConfig, classLoader, tableSinkFactory.excludeTablePlaceholderReplaceKeys());
            ConfigValidator.of(replacePlaceholderAndCreate.getOptions()).validate(tableSinkFactory.optionRule());
            LOG.info("Create sink '{}' with upstream input catalog-table[database: {}, schema: {}, table: {}]", new Object[]{str, catalogTable.getTablePath().getDatabaseName(), catalogTable.getTablePath().getSchemaName(), catalogTable.getTablePath().getTableName()});
            return tableSinkFactory.createSink(replacePlaceholderAndCreate).createSink();
        } catch (Throwable th) {
            throw new FactoryException(String.format("Unable to create a sink for identifier '%s'.", str), th);
        }
    }

    public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createMultiTableSink(Map<String, SeaTunnelSink> map, ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
        try {
            MultiTableSinkFactory multiTableSinkFactory = new MultiTableSinkFactory();
            MultiTableFactoryContext multiTableFactoryContext = new MultiTableFactoryContext(readonlyConfig, classLoader, map);
            ConfigValidator.of(multiTableFactoryContext.getOptions()).validate(multiTableSinkFactory.optionRule());
            return multiTableSinkFactory.createSink(multiTableFactoryContext).createSink();
        } catch (Throwable th) {
            throw new FactoryException("Unable to create a sink for identifier 'MultiTableSink'.", th);
        }
    }

    public static Optional<Catalog> createOptionalCatalog(String str, ReadonlyConfig readonlyConfig, ClassLoader classLoader, String str2) {
        return discoverOptionalFactory(classLoader, CatalogFactory.class, str2).map(catalogFactory -> {
            return catalogFactory.createCatalog(str, readonlyConfig);
        });
    }

    public static <T extends Factory> URL getFactoryUrl(T t) {
        return t.getClass().getProtectionDomain().getCodeSource().getLocation();
    }

    public static <T extends Factory> Optional<T> discoverOptionalFactory(ClassLoader classLoader, Class<T> cls, String str) {
        List discoverFactories = discoverFactories(classLoader, cls);
        if (discoverFactories.isEmpty()) {
            return Optional.empty();
        }
        List list = (List) discoverFactories.stream().filter(factory -> {
            return factory.factoryIdentifier().equalsIgnoreCase(str);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return Optional.empty();
        }
        checkMultipleMatchingFactories(str, cls, list);
        return Optional.of(list.get(0));
    }

    public static <T extends Factory> T discoverFactory(ClassLoader classLoader, Class<T> cls, String str) {
        List discoverFactories = discoverFactories(classLoader, cls);
        if (discoverFactories.isEmpty()) {
            throw new FactoryException(String.format("Could not find any factories that implement '%s' in the classpath.", cls.getName()));
        }
        List list = (List) discoverFactories.stream().filter(factory -> {
            return factory.factoryIdentifier().equalsIgnoreCase(str);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            throw new FactoryException(String.format("Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\nAvailable factory identifiers are:\n\n%s", str, cls.getName(), discoverFactories.stream().map((v0) -> {
                return v0.factoryIdentifier();
            }).distinct().sorted().collect(Collectors.joining(StringUtils.LF))));
        }
        checkMultipleMatchingFactories(str, cls, list);
        return (T) list.get(0);
    }

    private static <T extends Factory> void checkMultipleMatchingFactories(String str, Class<T> cls, List<T> list) {
        if (list.size() > 1) {
            throw new FactoryException(String.format("Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\nAmbiguous factory classes are:\n\n%s", str, cls.getName(), list.stream().map(factory -> {
                return factory.getClass().getName();
            }).sorted().collect(Collectors.joining(StringUtils.LF))));
        }
    }

    public static <T extends Factory> List<T> discoverFactories(ClassLoader classLoader, Class<T> cls) {
        return (List) discoverFactories(classLoader).stream().filter(factory -> {
            return cls.isAssignableFrom(factory.getClass());
        }).map(factory2 -> {
            return factory2;
        }).collect(Collectors.toList());
    }

    public static List<Factory> discoverFactories(ClassLoader classLoader) {
        try {
            LinkedList linkedList = new LinkedList();
            Iterator it = ServiceLoader.load(Factory.class, classLoader).iterator();
            linkedList.getClass();
            it.forEachRemaining((v1) -> {
                r1.add(v1);
            });
            return linkedList;
        } catch (ServiceConfigurationError e) {
            LOG.error("Could not load service provider for factories.", e);
            throw new FactoryException("Could not load service provider for factories.", e);
        }
    }

    public static OptionRule sourceFullOptionRule(@NonNull TableSourceFactory tableSourceFactory) {
        if (tableSourceFactory == null) {
            throw new NullPointerException("factory is marked non-null but is null");
        }
        OptionRule optionRule = tableSourceFactory.optionRule();
        if (optionRule == null) {
            throw new FactoryException("sourceOptionRule can not be null");
        }
        Class<? extends SeaTunnelSource> sourceClass = tableSourceFactory.getSourceClass();
        if ((tableSourceFactory instanceof SupportParallelism) || SupportParallelism.class.isAssignableFrom(sourceClass)) {
            optionRule.getOptionalOptions().addAll(OptionRule.builder().optional(CommonOptions.PARALLELISM).build().getOptionalOptions());
        }
        return optionRule;
    }

    public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory tableSinkFactory) {
        if (tableSinkFactory == null) {
            throw new NullPointerException("factory is marked non-null but is null");
        }
        OptionRule optionRule = tableSinkFactory.optionRule();
        if (optionRule == null) {
            throw new FactoryException("sinkOptionRule can not be null");
        }
        return optionRule;
    }

    public static SeaTunnelTransform<?> createAndPrepareMultiTableTransform(List<CatalogTable> list, ReadonlyConfig readonlyConfig, ClassLoader classLoader, String str) {
        TableTransformFactory tableTransformFactory = (TableTransformFactory) discoverFactory(classLoader, TableTransformFactory.class, str);
        TableTransformFactoryContext tableTransformFactoryContext = new TableTransformFactoryContext(list, readonlyConfig, classLoader);
        ConfigValidator.of(tableTransformFactoryContext.getOptions()).validate(tableTransformFactory.optionRule());
        return tableTransformFactory.createTransform(tableTransformFactoryContext).createTransform();
    }
}
