package org.apache.iceberg.flink;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.source.IcebergTableSource;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

/* loaded from: input_file:org/apache/iceberg/flink/FlinkDynamicTableFactory.class */
/**
 * Flink dynamic table factory registered under the identifier {@code "iceberg"} that creates
 * {@link IcebergTableSource} and {@link IcebergTableSink} instances.
 *
 * <p>The factory works in two modes:
 *
 * <ul>
 *   <li>Constructed with a {@link FlinkCatalog}: tables are loaded through that catalog using the
 *       object path taken from the factory context.
 *   <li>Constructed without arguments: a catalog is built from the table options, and the target
 *       database and table are created in it when they do not exist yet.
 * </ul>
 */
public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
    static final String FACTORY_IDENTIFIER = "iceberg";
    private static final ConfigOption<String> CATALOG_NAME = ConfigOptions.key("catalog-name").stringType().noDefaultValue().withDescription("Catalog name");
    private static final ConfigOption<String> CATALOG_TYPE = ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE).stringType().noDefaultValue().withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
    private static final ConfigOption<String> CATALOG_DATABASE = ConfigOptions.key("catalog-database").stringType().defaultValue("default").withDescription("Database name managed in the iceberg catalog.");
    private static final ConfigOption<String> CATALOG_TABLE = ConfigOptions.key("catalog-table").stringType().noDefaultValue().withDescription("Table name managed in the underlying iceberg catalog and database.");

    /** Catalog supplied by the caller, or {@code null} when the catalog is derived from table options. */
    private final FlinkCatalog catalog;

    /** Creates a factory that derives the catalog from the options of each table. */
    public FlinkDynamicTableFactory() {
        this.catalog = null;
    }

    /**
     * Creates a factory bound to an existing catalog.
     *
     * @param flinkCatalog catalog used to load tables; when {@code null} the factory behaves like
     *     the one created by the no-argument constructor
     */
    public FlinkDynamicTableFactory(FlinkCatalog flinkCatalog) {
        this.catalog = flinkCatalog;
    }

    /**
     * Creates the table source for the table described by {@code context}.
     *
     * @param context factory context carrying the object identifier, catalog table and configuration
     * @return a new {@link IcebergTableSource}
     */
    @Override
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        ResolvedCatalogTable catalogTable = context.getCatalogTable();
        Map<String, String> tableOptions = catalogTable.getOptions();
        TableLoader tableLoader = resolveTableLoader(context.getObjectIdentifier(), catalogTable, tableOptions);
        return new IcebergTableSource(tableLoader, TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()), tableOptions, context.getConfiguration());
    }

    /**
     * Creates the table sink for the table described by {@code context}.
     *
     * @param context factory context carrying the object identifier, catalog table and configuration
     * @return a new {@link IcebergTableSink}
     */
    @Override
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        ResolvedCatalogTable catalogTable = context.getCatalogTable();
        Map<String, String> tableOptions = catalogTable.getOptions();
        TableLoader tableLoader = resolveTableLoader(context.getObjectIdentifier(), catalogTable, tableOptions);
        return new IcebergTableSink(tableLoader, TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()), context.getConfiguration(), tableOptions);
    }

    /**
     * Returns the options that must be present: the catalog type and the catalog name.
     *
     * @return a new mutable set of required options
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = Sets.newHashSet();
        options.add(CATALOG_TYPE);
        options.add(CATALOG_NAME);
        return options;
    }

    /**
     * Returns the options that may be present: the catalog database and the catalog table.
     *
     * @return a new mutable set of optional options
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = Sets.newHashSet();
        options.add(CATALOG_DATABASE);
        options.add(CATALOG_TABLE);
        return options;
    }

    /**
     * Returns the identifier under which this factory is registered.
     *
     * @return {@link #FACTORY_IDENTIFIER}
     */
    @Override
    public String factoryIdentifier() {
        return FACTORY_IDENTIFIER;
    }

    /**
     * Picks the table loader for the current factory mode: through the bound catalog when one was
     * supplied, otherwise through a catalog built from the table options.
     */
    private TableLoader resolveTableLoader(ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String, String> tableOptions) {
        if (this.catalog != null) {
            return createTableLoader(this.catalog, objectIdentifier.toObjectPath());
        }
        return createTableLoader(catalogTable, tableOptions, objectIdentifier.getDatabaseName(), objectIdentifier.getObjectName());
    }

    /**
     * Builds a catalog from the table options, makes sure the target database and table exist in
     * it, and returns a loader for that table.
     *
     * @param resolvedCatalogTable table definition used when the Iceberg table has to be created
     * @param tableProps table options; must contain the catalog name
     * @param databaseName fallback database name passed to the {@code catalog-database} lookup
     * @param tableName fallback table name passed to the {@code catalog-table} lookup
     * @return a loader for the resolved Iceberg table
     * @throws NullPointerException if the catalog, database or table name resolves to {@code null}
     * @throws AlreadyExistsException if creating the database or table reports that it already exists
     */
    private static TableLoader createTableLoader(ResolvedCatalogTable resolvedCatalogTable, Map<String, String> tableProps, String databaseName, String tableName) {
        // Copy the options into a Configuration so they can be read through the typed ConfigOptions.
        Configuration flinkConf = new Configuration();
        tableProps.forEach(flinkConf::setString);

        String catalogName = flinkConf.getString(CATALOG_NAME);
        Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
        String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
        Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
        String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
        Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");

        FlinkCatalog flinkCatalog = new FlinkCatalogFactory().createCatalog(catalogName, tableProps, FlinkCatalogFactory.clusterHadoopConf());
        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);

        if (!flinkCatalog.databaseExists(catalogDatabase)) {
            try {
                flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
            } catch (DatabaseAlreadyExistException e) {
                // Arguments follow the placeholders: database name first, then catalog name.
                throw new AlreadyExistsException(e, "Database %s already exists in the iceberg catalog %s.", catalogDatabase, catalogName);
            }
        }

        if (!flinkCatalog.tableExists(objectPath)) {
            try {
                flinkCatalog.createIcebergTable(objectPath, resolvedCatalogTable, true);
            } catch (TableAlreadyExistException e) {
                throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s", catalogTable, catalogDatabase, catalogName);
            }
        }

        return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
    }

    /**
     * Returns a loader for the table at {@code objectPath} in the given catalog.
     *
     * @param flinkCatalog catalog that owns the table; must not be {@code null}
     * @param objectPath database and table name of the table
     * @return a loader for the table
     * @throws NullPointerException if {@code flinkCatalog} is {@code null}
     */
    private static TableLoader createTableLoader(FlinkCatalog flinkCatalog, ObjectPath objectPath) {
        Preconditions.checkNotNull(flinkCatalog, "Flink catalog cannot be null");
        return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(), flinkCatalog.toIdentifier(objectPath));
    }
}
