/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table.catalog.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.table.PulsarTableOptions;
import org.apache.flink.connector.pulsar.table.catalog.PulsarCatalogConfiguration;
import org.apache.flink.connector.pulsar.table.catalog.impl.PulsarAdminTool;
import org.apache.flink.connector.pulsar.table.catalog.impl.SchemaTranslator;
import org.apache.flink.connector.pulsar.table.catalog.utils.TableSchemaHelper;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;

public class PulsarCatalogSupport {
    private static final String DATABASE_COMMENT_KEY = "__database_comment";
    private static final String DATABASE_DESCRIPTION_KEY = "__database_description";
    private static final String DATABASE_DETAILED_DESCRIPTION_KEY = "__database_detailed_description";
    private static final String TABLE_PREFIX = "table_";
    PulsarCatalogConfiguration catalogConfiguration;
    private final PulsarAdminTool pulsarAdminTool;
    private final String flinkCatalogTenant;
    private SchemaTranslator schemaTranslator;

    public PulsarCatalogSupport(PulsarCatalogConfiguration catalogConfiguration, String flinkTenant, SchemaTranslator schemaTranslator) throws PulsarAdminException {
        this.catalogConfiguration = catalogConfiguration;
        this.pulsarAdminTool = new PulsarAdminTool(catalogConfiguration);
        this.schemaTranslator = schemaTranslator;
        this.flinkCatalogTenant = flinkTenant;
        if (!this.pulsarAdminTool.tenantExists(this.flinkCatalogTenant)) {
            this.pulsarAdminTool.createTenant(this.flinkCatalogTenant);
        }
    }

    private boolean isExplicitDatabase(String name) {
        return !name.contains("/");
    }

    private String completeExplicitDatabasePath(String name) {
        return this.flinkCatalogTenant + "/" + name;
    }

    public List<String> listDatabases() throws PulsarAdminException {
        ArrayList<String> databases = new ArrayList<String>();
        for (String ns : this.pulsarAdminTool.listNamespaces()) {
            if (ns.startsWith(this.flinkCatalogTenant)) {
                databases.add(ns.substring(this.flinkCatalogTenant.length() + 1));
                continue;
            }
            databases.add(ns);
        }
        return databases;
    }

    public boolean databaseExists(String name) throws PulsarAdminException {
        if (this.isExplicitDatabase(name)) {
            return this.pulsarAdminTool.namespaceExists(this.completeExplicitDatabasePath(name));
        }
        return this.pulsarAdminTool.namespaceExists(name);
    }

    public void createDatabase(String name, CatalogDatabase database) throws PulsarAdminException {
        if (!this.isExplicitDatabase(name)) {
            throw new CatalogException("Can't create pulsar tenant/namespace mapped database");
        }
        this.pulsarAdminTool.createNamespace(this.completeExplicitDatabasePath(name));
        Map allProperties = database.getProperties();
        allProperties.put(DATABASE_COMMENT_KEY, database.getComment());
        allProperties.put(DATABASE_DESCRIPTION_KEY, database.getDescription().orElse(""));
        allProperties.put(DATABASE_DETAILED_DESCRIPTION_KEY, database.getDetailedDescription().orElse(""));
        this.pulsarAdminTool.updateNamespaceProperties(this.completeExplicitDatabasePath(name), allProperties);
    }

    public CatalogDatabase getDatabase(String name) throws PulsarAdminException {
        Map<String, String> allProperties = this.pulsarAdminTool.getNamespaceProperties(this.completeExplicitDatabasePath(name));
        String comment = allProperties.getOrDefault(DATABASE_COMMENT_KEY, "");
        allProperties.remove(DATABASE_COMMENT_KEY);
        return new CatalogDatabaseImpl(allProperties, comment);
    }

    public void dropDatabase(String name) throws PulsarAdminException {
        if (!this.isExplicitDatabase(name)) {
            throw new CatalogException("Can't drop pulsar tenant/namespace mapped database");
        }
        this.pulsarAdminTool.deleteNamespace(this.completeExplicitDatabasePath(name));
    }

    public List<String> listTables(String name) throws PulsarAdminException {
        if (this.isExplicitDatabase(name)) {
            ArrayList<String> tables = new ArrayList<String>();
            List<String> topics = this.pulsarAdminTool.getTopics(this.completeExplicitDatabasePath(name));
            for (String topic : topics) {
                tables.add(topic.substring(TABLE_PREFIX.length()));
            }
            return tables;
        }
        return this.pulsarAdminTool.getTopics(name);
    }

    public boolean tableExists(ObjectPath tablePath) throws PulsarAdminException {
        if (this.isExplicitDatabase(tablePath.getDatabaseName())) {
            return this.pulsarAdminTool.topicExists(this.findExplicitTablePlaceholderTopic(tablePath));
        }
        return this.pulsarAdminTool.topicExists(this.findTopicForNativeTable(tablePath));
    }

    public CatalogTable getTable(ObjectPath tablePath) throws PulsarAdminException {
        if (this.isExplicitDatabase(tablePath.getDatabaseName())) {
            try {
                String mappedTopic = this.findExplicitTablePlaceholderTopic(tablePath);
                SchemaInfo metadataSchema = this.pulsarAdminTool.getPulsarSchema(mappedTopic);
                Map<String, String> tableProperties = TableSchemaHelper.generateTableProperties(metadataSchema);
                CatalogTable table = CatalogTable.fromProperties(tableProperties);
                table.getOptions().put(PulsarTableOptions.EXPLICIT.key(), Boolean.TRUE.toString());
                return CatalogTable.of((Schema)table.getUnresolvedSchema(), (String)table.getComment(), (List)table.getPartitionKeys(), this.fillDefaultOptionsFromCatalogOptions(table.getOptions()));
            }
            catch (Exception e) {
                e.printStackTrace();
                throw new CatalogException("Failed to fetch metadata for explict table: " + tablePath.getObjectName());
            }
        }
        String existingTopic = this.findTopicForNativeTable(tablePath);
        SchemaInfo pulsarSchema = this.pulsarAdminTool.getPulsarSchema(existingTopic);
        return this.schemaToCatalogTable(pulsarSchema, existingTopic);
    }

    public void dropTable(ObjectPath tablePath) throws PulsarAdminException {
        if (!this.isExplicitDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException("Can't delete native topic");
        }
        String mappedTopic = this.findExplicitTablePlaceholderTopic(tablePath);
        this.pulsarAdminTool.deleteSchema(mappedTopic);
        this.pulsarAdminTool.deleteTopic(mappedTopic);
    }

    public void createTable(ObjectPath tablePath, ResolvedCatalogTable table) throws PulsarAdminException {
        if (!this.isExplicitDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException(String.format("Can't create explict table under pulsar tenant/namespace: %s because it's a native database", tablePath.getDatabaseName()));
        }
        String mappedTopic = this.findExplicitTablePlaceholderTopic(tablePath);
        this.pulsarAdminTool.createTopic(mappedTopic, 1);
        try {
            SchemaInfo schemaInfo = TableSchemaHelper.generateSchemaInfo(table.toProperties());
            this.pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
        }
        catch (Exception e) {
            try {
                this.pulsarAdminTool.deleteTopic(mappedTopic);
            }
            catch (PulsarAdminException pulsarAdminException) {
                // empty catch block
            }
            e.printStackTrace();
            throw new CatalogException("Can't store table metadata");
        }
    }

    private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema, String topicName) {
        Schema schema = this.schemaTranslator.pulsarSchemaToFlinkSchema(pulsarSchema);
        HashMap<String, String> initialTableOptions = new HashMap<String, String>();
        initialTableOptions.put(PulsarTableOptions.TOPICS.key(), topicName);
        initialTableOptions.put(FactoryUtil.FORMAT.key(), this.schemaTranslator.decideDefaultFlinkFormat(pulsarSchema));
        Map<String, String> enrichedTableOptions = this.fillDefaultOptionsFromCatalogOptions(initialTableOptions);
        return CatalogTable.of((Schema)schema, (String)"", Collections.emptyList(), enrichedTableOptions);
    }

    private Map<String, String> fillDefaultOptionsFromCatalogOptions(Map<String, String> tableOptions) {
        String authParams;
        HashMap<String, String> enrichedTableOptions = new HashMap<String, String>();
        enrichedTableOptions.put(FactoryUtil.CONNECTOR.key(), "pulsar");
        enrichedTableOptions.put(PulsarTableOptions.ADMIN_URL.key(), (String)this.catalogConfiguration.get(PulsarOptions.PULSAR_ADMIN_URL));
        enrichedTableOptions.put(PulsarTableOptions.SERVICE_URL.key(), (String)this.catalogConfiguration.get(PulsarOptions.PULSAR_SERVICE_URL));
        String authPlugin = (String)this.catalogConfiguration.get(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME);
        if (authPlugin != null && !authPlugin.isEmpty()) {
            enrichedTableOptions.put(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME.key(), authPlugin);
        }
        if ((authParams = (String)this.catalogConfiguration.get(PulsarOptions.PULSAR_AUTH_PARAMS)) != null && !authParams.isEmpty()) {
            enrichedTableOptions.put(PulsarOptions.PULSAR_AUTH_PARAMS.key(), authParams);
        }
        if (!enrichedTableOptions.containsKey(FactoryUtil.FORMAT.key())) {
            enrichedTableOptions.put(FactoryUtil.FORMAT.key(), "raw");
        }
        if (tableOptions != null) {
            enrichedTableOptions.putAll(tableOptions);
        }
        return enrichedTableOptions;
    }

    private String findExplicitTablePlaceholderTopic(ObjectPath objectPath) {
        String database = this.flinkCatalogTenant + "/" + objectPath.getDatabaseName();
        String topic = TABLE_PREFIX + objectPath.getObjectName();
        NamespaceName ns = NamespaceName.get(database);
        TopicName fullName = TopicName.get(TopicDomain.persistent.toString(), ns, topic);
        return fullName.toString();
    }

    private String findTopicForNativeTable(ObjectPath objectPath) {
        String database = objectPath.getDatabaseName();
        String topic = objectPath.getObjectName();
        NamespaceName ns = NamespaceName.get(database);
        TopicName fullName = TopicName.get(TopicDomain.persistent.toString(), ns, topic);
        return fullName.toString();
    }

    public void close() {
        if (this.pulsarAdminTool != null) {
            this.pulsarAdminTool.close();
        }
    }
}

