/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.catalog.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.connectors.pulsar.catalog.util.TableSchemaHelper;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaTranslator;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
 * Maps Flink catalog operations (databases and tables) onto Pulsar namespaces and topics,
 * delegating all broker access to a {@link PulsarMetadataReader}.
 *
 * <p>Databases are distinguished by name:
 * <ul>
 *   <li>A name without {@code "/"} is a "generic" database. It is stored as a namespace under the
 *       configured Flink catalog tenant, and its tables are topics named {@code table_<name>}
 *       whose schema carries the serialized Flink table properties.</li>
 *   <li>A name containing {@code "/"} is used as-is as a Pulsar {@code tenant/namespace}; its
 *       tables are the topics of that namespace and cannot be created or dropped through this
 *       class.</li>
 * </ul>
 */
public class PulsarCatalogSupport {
    /** Prefix put in front of a table name to form the topic name of a generic table. */
    private static final String TABLE_PREFIX = "table_";
    private final PulsarMetadataReader pulsarMetadataReader;
    private final String flinkCatalogTenant;
    private final SchemaTranslator schemaTranslator;

    /**
     * Creates the support object with its own {@link PulsarMetadataReader} and makes sure the
     * Flink catalog tenant exists, creating it when it is missing.
     *
     * @param adminUrl         Pulsar admin URL handed to the metadata reader
     * @param clientConf       Pulsar client configuration handed to the metadata reader
     * @param flinkTenant      tenant under which generic databases are stored
     * @param schemaTranslator translator used for tables backed by plain Pulsar topics
     * @throws PulsarClientException if thrown while setting up the metadata reader
     * @throws PulsarAdminException  if thrown while checking or creating the tenant
     */
    public PulsarCatalogSupport(String adminUrl, ClientConfigurationData clientConf, String flinkTenant, SchemaTranslator schemaTranslator) throws PulsarClientException, PulsarAdminException {
        this.pulsarMetadataReader = new PulsarMetadataReader(adminUrl, clientConf, "", new HashMap<String, String>(), -1, -1);
        this.schemaTranslator = schemaTranslator;
        this.flinkCatalogTenant = flinkTenant;
        boolean tenantReady = false;
        try {
            if (!this.pulsarMetadataReader.tenantExists(this.flinkCatalogTenant)) {
                this.pulsarMetadataReader.createTenant(this.flinkCatalogTenant);
            }
            tenantReady = true;
        } finally {
            // The caller never receives this instance when construction fails, so nobody else
            // could close the reader that was just opened.
            if (!tenantReady) {
                this.pulsarMetadataReader.close();
            }
        }
    }

    /**
     * Test-only constructor that uses a supplied metadata reader. Unlike the public constructor
     * it does not check for or create the catalog tenant.
     *
     * @param metadataReader     reader to delegate to
     * @param schemaTranslator   translator used for tables backed by plain Pulsar topics
     * @param flinkCatalogTenant tenant under which generic databases are stored
     */
    @VisibleForTesting
    public PulsarCatalogSupport(PulsarMetadataReader metadataReader, SchemaTranslator schemaTranslator, String flinkCatalogTenant) {
        this.pulsarMetadataReader = metadataReader;
        this.schemaTranslator = schemaTranslator;
        this.flinkCatalogTenant = flinkCatalogTenant;
    }

    /** Returns {@code true} when {@code name} has no {@code "/"}, i.e. denotes a generic database. */
    private boolean isGenericDatabase(String name) {
        return !name.contains("/");
    }

    /** Builds the {@code tenant/namespace} path of a generic database under the catalog tenant. */
    private String completeGenericDatabasePath(String name) {
        return this.flinkCatalogTenant + "/" + name;
    }

    /**
     * Lists all databases. Namespaces under the catalog tenant are reported by their short name
     * (tenant and slash removed); every other namespace is reported unchanged.
     *
     * @return database names in the order the metadata reader returned the namespaces
     * @throws PulsarAdminException if listing namespaces fails
     */
    public List<String> listDatabases() throws PulsarAdminException {
        // Match on "tenant/" rather than the bare tenant so that a namespace of another tenant
        // that merely shares the leading characters (e.g. "flink-other/ns" for tenant "flink")
        // is not truncated, and a value equal to the tenant itself cannot overrun the string.
        String genericPrefix = this.flinkCatalogTenant + "/";
        List<String> databases = new ArrayList<String>();
        for (String ns : this.pulsarMetadataReader.listNamespaces()) {
            if (ns.startsWith(genericPrefix)) {
                databases.add(ns.substring(genericPrefix.length()));
            } else {
                databases.add(ns);
            }
        }
        return databases;
    }

    /**
     * Checks whether the namespace backing the database exists.
     *
     * @param name generic database name or Pulsar {@code tenant/namespace}
     * @return the result of the namespace existence check
     * @throws PulsarAdminException if the check fails
     */
    public boolean databaseExists(String name) throws PulsarAdminException {
        String namespace = this.isGenericDatabase(name) ? this.completeGenericDatabasePath(name) : name;
        return this.pulsarMetadataReader.namespaceExists(namespace);
    }

    /**
     * Creates a generic database as a namespace under the catalog tenant.
     *
     * @param name generic database name (must not contain {@code "/"})
     * @throws CatalogException     if {@code name} denotes a Pulsar {@code tenant/namespace}
     * @throws PulsarAdminException if creating the namespace fails
     */
    public void createDatabase(String name) throws PulsarAdminException {
        if (!this.isGenericDatabase(name)) {
            throw new CatalogException("Can't create pulsar tenant/namespace mapped database");
        }
        this.pulsarMetadataReader.createNamespace(this.completeGenericDatabasePath(name));
    }

    /**
     * Drops a generic database by deleting its namespace under the catalog tenant.
     *
     * @param name generic database name (must not contain {@code "/"})
     * @throws CatalogException     if {@code name} denotes a Pulsar {@code tenant/namespace}
     * @throws PulsarAdminException if deleting the namespace fails
     */
    public void dropDatabase(String name) throws PulsarAdminException {
        if (!this.isGenericDatabase(name)) {
            throw new CatalogException("Can't drop pulsar tenant/namespace mapped database");
        }
        this.pulsarMetadataReader.deleteNamespace(this.completeGenericDatabasePath(name));
    }

    /**
     * Lists the tables of a database. For a generic database the {@code table_} prefix is removed
     * from each topic name; for a Pulsar namespace the topic list is returned unchanged.
     *
     * @param name generic database name or Pulsar {@code tenant/namespace}
     * @return table names
     * @throws PulsarAdminException if listing topics fails
     */
    public List<String> listTables(String name) throws PulsarAdminException {
        if (!this.isGenericDatabase(name)) {
            return this.pulsarMetadataReader.getTopics(name);
        }
        List<String> tables = new ArrayList<String>();
        for (String topic : this.pulsarMetadataReader.getTopics(this.completeGenericDatabasePath(name))) {
            // Only topics carrying the prefix can be resolved back by objectNameToTopicName.
            // Blindly cutting the first characters would mangle other topic names and throw for
            // names shorter than the prefix.
            // NOTE(review): assumes getTopics returns local topic names - confirm.
            if (topic.startsWith(TABLE_PREFIX)) {
                tables.add(topic.substring(TABLE_PREFIX.length()));
            }
        }
        return tables;
    }

    /**
     * Checks whether the topic backing the given table exists.
     *
     * @param tablePath database and table name
     * @return the result of the topic existence check
     * @throws PulsarAdminException if the check fails
     */
    public boolean tableExists(ObjectPath tablePath) throws PulsarAdminException {
        return this.pulsarMetadataReader.topicExists(this.objectNameToTopicName(tablePath));
    }

    /**
     * Loads a table definition. For a generic database the table is rebuilt from the properties
     * stored in the topic's schema and marked with option {@code generic=true}; otherwise the
     * Pulsar schema of the topic is translated into a Flink schema.
     *
     * @param tablePath database and table name
     * @return the catalog table with connector options filled in
     * @throws CatalogException     if the metadata of a generic table cannot be read or decoded
     * @throws PulsarAdminException if reading the schema of a plain Pulsar topic fails
     */
    public CatalogTable getTable(ObjectPath tablePath) throws PulsarAdminException {
        String topicName = this.objectNameToTopicName(tablePath);
        if (!this.isGenericDatabase(tablePath.getDatabaseName())) {
            return this.schemaToCatalogTable(this.pulsarMetadataReader.getPulsarSchema(topicName));
        }
        try {
            SchemaInfo metadataSchema = this.pulsarMetadataReader.getPulsarSchema(topicName);
            Map<String, String> tableProperties = TableSchemaHelper.generateTableProperties(metadataSchema);
            CatalogTable table = CatalogTable.fromProperties(tableProperties);
            // Work on a copy so this does not depend on getOptions() returning a mutable map.
            Map<String, String> options = new HashMap<String, String>(table.getOptions());
            options.put("generic", Boolean.TRUE.toString());
            return CatalogTable.of(table.getUnresolvedSchema(), table.getComment(), table.getPartitionKeys(), this.enrichTableOptions(options));
        }
        catch (Exception e) {
            // Keep the cause attached instead of printing it, so callers can see why it failed.
            throw new CatalogException("Failed to fetch metadata for generic table: " + tablePath.getObjectName(), e);
        }
    }

    /**
     * Drops a generic table by deleting the schema and then the topic that back it.
     *
     * @param tablePath         database and table name; the database must be generic
     * @param ignoreIfNotExists when {@code true}, a "not found" answer from Pulsar is treated as
     *                          success; when {@code false} it is propagated to the caller
     * @throws CatalogException     if the database is a Pulsar {@code tenant/namespace}
     * @throws PulsarAdminException if deleting the schema or the topic fails
     */
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws PulsarAdminException {
        if (!this.isGenericDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException("Can't delete normal pulsar topic");
        }
        String topicName = this.objectNameToTopicName(tablePath);
        try {
            this.pulsarMetadataReader.deleteSchema(topicName);
            this.pulsarMetadataReader.deleteTopic(topicName);
        }
        catch (PulsarAdminException.NotFoundException e) {
            if (!ignoreIfNotExists) {
                throw e;
            }
            // Already gone and the caller asked for that to be tolerated.
        }
    }

    /**
     * Creates a generic table: creates the backing topic and stores the serialized table
     * properties as the topic's schema. If storing the schema fails, the topic is deleted again.
     *
     * @param tablePath database and table name; the database must be generic
     * @param table     resolved table whose properties are persisted
     * @throws CatalogException     if the database is a Pulsar {@code tenant/namespace}, or if the
     *                              table metadata cannot be generated or uploaded
     * @throws PulsarAdminException if creating the topic fails
     */
    public void createTable(ObjectPath tablePath, ResolvedCatalogTable table) throws PulsarAdminException {
        if (!this.isGenericDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException(String.format("Can't create generic table under pulsar tenant/namespace: %s", tablePath.getDatabaseName()));
        }
        String topicName = this.objectNameToTopicName(tablePath);
        // NOTE(review): the second argument is presumably the partition count - confirm.
        this.pulsarMetadataReader.createTopic(topicName, 0);
        try {
            SchemaInfo schemaInfo = TableSchemaHelper.generateSchemaInfo(table.toProperties());
            this.pulsarMetadataReader.uploadSchema(topicName, schemaInfo);
        }
        catch (Exception e) {
            CatalogException failure = new CatalogException("Can't store table metadata", e);
            try {
                // Best-effort rollback so a topic without table metadata is not left behind.
                this.pulsarMetadataReader.deleteTopic(topicName);
            }
            catch (PulsarAdminException cleanupFailure) {
                // Report the failed rollback alongside the primary error instead of hiding it.
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
    }

    /**
     * Builds a catalog table for a plain Pulsar topic from its schema, with an empty comment, no
     * partition keys and only the default connector options.
     */
    private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema) {
        TableSchema tableSchema = this.schemaTranslator.pulsarSchemaToTableSchema(pulsarSchema);
        Schema schema = Schema.newBuilder().fromRowDataType(tableSchema.toRowDataType()).build();
        return CatalogTable.of(schema, "", Collections.<String>emptyList(), this.enrichTableOptions(null));
    }

    /**
     * Returns a new option map holding the connector identifier, admin URL, service URL and, when
     * configured, the auth plugin class name and auth params. The given {@code tableOptions} are
     * applied afterwards, so they override those defaults. If neither a value format nor a format
     * option is present in the result, the format is set to {@code raw}.
     *
     * @param tableOptions options stored with the table; may be {@code null}
     * @return a new mutable map; {@code tableOptions} itself is not modified
     */
    private Map<String, String> enrichTableOptions(Map<String, String> tableOptions) {
        Map<String, String> enrichedTableOptions = new HashMap<String, String>();
        enrichedTableOptions.put(FactoryUtil.CONNECTOR.key(), "pulsar");
        enrichedTableOptions.put("admin-url", this.pulsarMetadataReader.getAdminUrl());
        enrichedTableOptions.put("service-url", this.pulsarMetadataReader.getClientConf().getServiceUrl());
        String authPlugin = this.pulsarMetadataReader.getClientConf().getAuthPluginClassName();
        if (authPlugin != null && !authPlugin.isEmpty()) {
            enrichedTableOptions.put("properties.auth-plugin-classname", authPlugin);
        }
        String authParams = this.pulsarMetadataReader.getClientConf().getAuthParams();
        if (authParams != null && !authParams.isEmpty()) {
            enrichedTableOptions.put("properties.auth-params", authParams);
        }
        if (tableOptions != null) {
            enrichedTableOptions.putAll(tableOptions);
        }
        if (!enrichedTableOptions.containsKey(PulsarTableOptions.VALUE_FORMAT.key()) && !enrichedTableOptions.containsKey(FactoryUtil.FORMAT.key())) {
            enrichedTableOptions.put(FactoryUtil.FORMAT.key(), "raw");
        }
        return enrichedTableOptions;
    }

    /**
     * Resolves a table path to a fully qualified persistent topic name. Tables of a generic
     * database live under the catalog tenant with the {@code table_} prefix; other tables map
     * directly to the topic of the same name in the given {@code tenant/namespace}.
     */
    private String objectNameToTopicName(ObjectPath objectPath) {
        String database;
        String topic;
        if (this.isGenericDatabase(objectPath.getDatabaseName())) {
            database = this.completeGenericDatabasePath(objectPath.getDatabaseName());
            topic = TABLE_PREFIX + objectPath.getObjectName();
        } else {
            database = objectPath.getDatabaseName();
            topic = objectPath.getObjectName();
        }
        NamespaceName ns = NamespaceName.get(database);
        return TopicName.get(TopicDomain.persistent.toString(), ns, topic).toString();
    }

    /** Closes the underlying metadata reader, if there is one. */
    public void close() {
        if (this.pulsarMetadataReader != null) {
            this.pulsarMetadataReader.close();
        }
    }

    /**
     * Deletes the namespace with exactly the given name. Unlike {@link #dropDatabase(String)},
     * the name is passed through unchanged and is not resolved against the catalog tenant.
     *
     * @param name namespace name handed to the metadata reader
     * @throws PulsarAdminException if deleting the namespace fails
     */
    public void deleteNamespace(String name) throws PulsarAdminException {
        this.pulsarMetadataReader.deleteNamespace(name);
    }
}

