/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.pulsar.table.PulsarDynamicTableSink;
import org.apache.flink.connector.pulsar.table.PulsarDynamicTableSource;
import org.apache.flink.connector.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;

public class PulsarDynamicTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    public static final String IDENTIFIER = "pulsar";

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        ReadableConfig tableOptions = helper.getOptions();
        List topics = (List)tableOptions.get(PulsarTableOptions.TOPIC);
        String adminUrl = (String)tableOptions.get(PulsarTableOptions.ADMIN_URL);
        String serverUrl = (String)tableOptions.get(PulsarTableOptions.SERVICE_URL);
        EncodingFormat encodingFormat = helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
        helper.validateExcept(new String[]{"properties."});
        PulsarTableOptions.validateTableSinkOptions(tableOptions);
        Properties properties = PulsarDynamicTableFactory.removeConnectorPrefix(context.getCatalogTable().toProperties());
        DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        return new PulsarDynamicTableSink(serverUrl, adminUrl, (String)topics.get(0), physicalDataType, properties, (EncodingFormat<SerializationSchema<RowData>>)encodingFormat);
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        ReadableConfig tableOptions = helper.getOptions();
        List topics = (List)tableOptions.get(PulsarTableOptions.TOPIC);
        String topicPattern = (String)tableOptions.get(PulsarTableOptions.TOPIC_PATTERN);
        String adminUrl = (String)tableOptions.get(PulsarTableOptions.ADMIN_URL);
        String serviceUrl = (String)tableOptions.get(PulsarTableOptions.SERVICE_URL);
        DecodingFormat decodingFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);
        helper.validateExcept(new String[]{"properties."});
        PulsarTableOptions.validateTableSourceOptions(tableOptions);
        Properties properties = PulsarDynamicTableFactory.removeConnectorPrefix(context.getCatalogTable().toProperties());
        DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        PulsarTableOptions.StartupOptions startupOptions = PulsarTableOptions.getStartupOptions(tableOptions, topics);
        return new PulsarDynamicTableSource(physicalDataType, (DecodingFormat<DeserializationSchema<RowData>>)decodingFormat, topics, topicPattern, serviceUrl, adminUrl, properties, startupOptions);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(FactoryUtil.FORMAT);
        options.add(PulsarTableOptions.SERVICE_URL);
        options.add(PulsarTableOptions.ADMIN_URL);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet();
        options.add(PulsarTableOptions.TOPIC);
        options.add(PulsarTableOptions.TOPIC_PATTERN);
        options.add(PulsarTableOptions.SCAN_STARTUP_MODE);
        options.add(PulsarTableOptions.SCAN_STARTUP_SPECIFIC_OFFSETS);
        options.add(PulsarTableOptions.SCAN_STARTUP_SUB_NAME);
        options.add(PulsarTableOptions.PULSAR_READER_READER_NAME);
        options.add(PulsarTableOptions.PULSAR_READER_SUBSCRIPTION_ROLE_PREFIX);
        options.add(PulsarTableOptions.PULSAR_READER_RECEIVER_QUEUE_SIZE);
        options.add(PulsarTableOptions.PARTITION_DISCOVERY_INTERVAL_MILLIS);
        return options;
    }

    private static Properties removeConnectorPrefix(Map<String, String> in) {
        String connectorPrefix = "connector.";
        Properties out = new Properties();
        for (Map.Entry<String, String> kv : in.entrySet()) {
            String k = kv.getKey();
            String v = kv.getValue();
            if (k.startsWith(connectorPrefix)) {
                out.put(k.substring(connectorPrefix.length()), v);
                continue;
            }
            out.put(k, v);
        }
        return out;
    }
}

