/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.connectors.pulsar.PulsarSchemaValidator;
import org.apache.flink.streaming.connectors.pulsar.PulsarTableSink;
import org.apache.flink.streaming.connectors.pulsar.PulsarTableSource;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonSer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCatalogSupport;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.PulsarValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Table factory that creates Pulsar-backed {@link StreamTableSource}s and {@link StreamTableSink}s
 * producing/consuming {@link Row}s from a flat map of string properties.
 *
 * <p>The factory operates in one of two modes, decided at construction time: when it is given a
 * non-empty set of catalog properties, those properties are used as the base connector
 * configuration; otherwise the configuration is read from the {@code connector.properties.*}
 * entries of the table properties.
 *
 * <p>Note that {@link #createTableSource(ObjectPath, CatalogTable)} records on this instance whether
 * the table carried any properties, and {@link #createStreamTableSource(Map)} reads that flag, so an
 * instance carries state between those two calls.
 */
public class PulsarTableSourceSinkFactory
implements StreamTableSourceFactory<Row>,
StreamTableSinkFactory<Row> {
    private static final Logger log = LoggerFactory.getLogger(PulsarTableSourceSinkFactory.class);

    private static final String CONNECTOR_PREFIX = "connector.";
    private static final String CONNECTOR_TOPIC = "connector.topic";
    private static final String CONNECTOR_SERVICE_URL = "connector.service-url";
    private static final String CONNECTOR_ADMIN_URL = "connector.admin-url";
    private static final String CONNECTOR_USE_EXTEND_FIELD = "connector.use-extend-field";
    private static final String CONNECTOR_PROPERTIES = "connector.properties";
    private static final String FORMAT_TYPE = "format.type";
    private static final String SCHEMA = "schema";

    /** Base connector configuration supplied by a catalog; empty when not used from a catalog. */
    private final Properties catalogProperties;
    /** True when {@link #catalogProperties} was non-empty at construction time. */
    private final boolean isInPulsarCatalog;
    /** Set by {@link #createTableSource(ObjectPath, CatalogTable)} when the table has properties. */
    private boolean isInDDL;

    /**
     * Creates a factory bound to the given catalog properties.
     *
     * @param catalogProperties base connector configuration; a non-empty instance switches the
     *     factory into catalog mode. Must not be {@code null}.
     */
    public PulsarTableSourceSinkFactory(Properties catalogProperties) {
        this.catalogProperties = catalogProperties;
        this.isInPulsarCatalog = !catalogProperties.isEmpty();
        this.isInDDL = false;
    }

    /** Creates a factory that is not bound to any catalog. */
    public PulsarTableSourceSinkFactory() {
        this(new Properties());
    }

    /**
     * Builds a {@link PulsarTableSink} from the given table properties.
     *
     * @param properties flat table properties; validated before use
     * @return a sink writing rows to the topic named by {@code connector.topic}
     */
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        DescriptorProperties dp = this.getValidatedProperties(properties);
        TableSchema schema = dp.getTableSchema(SCHEMA);
        String topic = dp.getString(CONNECTOR_TOPIC);
        String serviceUrl = dp.getString(CONNECTOR_SERVICE_URL);
        String adminUrl = dp.getString(CONNECTOR_ADMIN_URL);
        String formatType = this.isInPulsarCatalog
                ? this.catalogProperties.getProperty(FORMAT_TYPE)
                : dp.getString(FORMAT_TYPE);

        // The results are not used by the sink. The calls are retained so that any exception they
        // raise for the supplied time-attribute definitions still surfaces here.
        // NOTE(review): confirm whether these calls are still needed for the sink path.
        SchemaValidator.deriveProctimeAttribute(dp);
        SchemaValidator.deriveRowtimeAttributes(dp);

        Properties sinkProp;
        if (this.isInPulsarCatalog) {
            sinkProp = new Properties();
            sinkProp.putAll(this.catalogProperties);
        } else {
            sinkProp = this.getPulsarProperties(dp);
        }
        sinkProp.put(CONNECTOR_TOPIC, topic);
        // Properties rejects null values; in catalog mode the format type may be absent.
        if (formatType != null) {
            sinkProp.put(FORMAT_TYPE, formatType);
        }

        Properties sinkConfig = PulsarTableSourceSinkFactory.removeConnectorPrefix(sinkProp);
        SerializationSchema<Row> serializationSchema = this.getSerializationSchema(properties);
        log.info("stream table sink use {} to serialize data", serializationSchema);
        return new PulsarTableSink(serviceUrl, adminUrl, schema, topic, sinkConfig, serializationSchema);
    }

    /**
     * Builds a sink for the table in the given context. When the table properties carry no
     * {@code connector.topic}, the topic name is derived from the table's object path.
     *
     * @param context sink creation context providing the table and its identifier
     * @return the sink created by {@link #createStreamTableSink(Map)}
     */
    public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
        Map<String, String> tableProps = new HashMap<>(context.getTable().toProperties());
        if (!tableProps.containsKey(CONNECTOR_TOPIC)) {
            ObjectPath path = context.getObjectIdentifier().toObjectPath();
            tableProps.put(CONNECTOR_TOPIC, PulsarCatalogSupport.objectPath2TopicName(path));
        }
        return this.createStreamTableSink(tableProps);
    }

    /**
     * Builds a {@link PulsarTableSource} from the given table properties.
     *
     * <p>A table schema is only passed to the source when the factory is in catalog mode or the
     * last {@link #createTableSource(ObjectPath, CatalogTable)} call saw table properties;
     * otherwise the source receives an empty schema {@link Optional}.
     *
     * @param properties flat table properties; validated before use
     * @return a source reading rows from the topic named by {@code connector.topic}
     */
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = this.getValidatedProperties(properties);
        String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
        String serviceUrl = descriptorProperties.getString(CONNECTOR_SERVICE_URL);
        String adminUrl = descriptorProperties.getString(CONNECTOR_ADMIN_URL);
        StartupOptions startupOptions = this.getStartupOptions(descriptorProperties);

        Optional<TableSchema> schema = Optional.empty();
        if (this.isInPulsarCatalog || this.isInDDL) {
            schema = Optional.of(descriptorProperties.getTableSchema(SCHEMA));
        }

        Properties sourceProp;
        if (this.isInPulsarCatalog) {
            sourceProp = new Properties();
            sourceProp.putAll(this.catalogProperties);
        } else {
            sourceProp = this.getPulsarProperties(descriptorProperties);
        }
        boolean useExtendField = descriptorProperties.getOptionalBoolean(CONNECTOR_USE_EXTEND_FIELD).orElse(false);
        sourceProp.put(CONNECTOR_USE_EXTEND_FIELD, Boolean.toString(useExtendField));
        sourceProp.put(CONNECTOR_TOPIC, topic);

        Properties sourceConfig = PulsarTableSourceSinkFactory.removeConnectorPrefix(sourceProp);
        // May be null when no format factory matches; the field mapping is then left empty.
        DeserializationSchema<Row> deserializationSchema = this.getDeserializationSchema(properties);
        Optional<Map<String, String>> fieldMapping = Optional.ofNullable(deserializationSchema)
                .map(ResultTypeQueryable::getProducedType)
                .map(type -> SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(type)));
        log.info("stream table source use {} to deserialize data", deserializationSchema);
        return new PulsarTableSource(
                schema,
                SchemaValidator.deriveProctimeAttribute(descriptorProperties),
                SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
                fieldMapping,
                serviceUrl,
                adminUrl,
                sourceConfig,
                deserializationSchema,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.externalSubscriptionName);
    }

    /**
     * Builds a source for the given catalog table. Records on this instance whether the table has
     * any properties, and derives {@code connector.topic} from {@code tablePath} when the table
     * does not specify one.
     *
     * @param tablePath path of the table, used to derive the topic name when none is configured
     * @param table catalog table whose properties configure the source
     * @return the source created by {@link #createStreamTableSource(Map)}
     */
    public TableSource<Row> createTableSource(ObjectPath tablePath, CatalogTable table) {
        Map<String, String> props = new HashMap<>();
        props.putAll(table.toProperties());
        this.isInDDL = !props.isEmpty();
        if (props.get(CONNECTOR_TOPIC) == null) {
            props.put(CONNECTOR_TOPIC, PulsarCatalogSupport.objectPath2TopicName(tablePath));
        }
        return this.createStreamTableSource(props);
    }

    /**
     * Returns a copy of {@code in} in which every key starting with {@code "connector."} has that
     * prefix stripped; all other entries are copied unchanged.
     *
     * @param in properties whose keys and values are all strings
     * @return a new {@link Properties} instance with the rewritten keys
     */
    private static Properties removeConnectorPrefix(Properties in) {
        Properties out = new Properties();
        for (Map.Entry<Object, Object> entry : in.entrySet()) {
            String key = (String) entry.getKey();
            String value = (String) entry.getValue();
            String strippedKey = key.startsWith(CONNECTOR_PREFIX)
                    ? key.substring(CONNECTOR_PREFIX.length())
                    : key;
            out.put(strippedKey, value);
        }
        return out;
    }

    /**
     * Returns the property values a table definition must have for this factory to match.
     *
     * @return the required context entries
     */
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "pulsar");
        context.put("connector.property-version", "1");
        return context;
    }

    /**
     * Returns the property keys (with {@code #} and {@code *} wildcards) this factory accepts.
     *
     * @return the list of supported property key patterns
     */
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("update-mode");

        // Connector options.
        properties.add("connector.version");
        properties.add(CONNECTOR_TOPIC);
        properties.add(CONNECTOR_SERVICE_URL);
        properties.add(CONNECTOR_ADMIN_URL);
        properties.add("connector.startup-mode");
        properties.add("connector.specific-offsets.#.partition");
        properties.add("connector.specific-offsets.#.offset");
        properties.add("connector.properties.*");
        properties.add("connector.sub-name");
        properties.add(CONNECTOR_USE_EXTEND_FIELD);
        properties.add(CONNECTOR_PROPERTIES);
        properties.add("connector.properties.#.key");
        properties.add("connector.properties.#.value");
        properties.add("connector.sink-extractor");
        properties.add("connector.sink-extractor-class");

        // Schema options ("schema.#.expr" is listed once; the original listed it twice).
        properties.add("schema.#.name");
        properties.add("schema.#.from");
        properties.add("schema.#.data-type");
        properties.add("schema.#.expr");
        properties.add("schema.#.proctime");
        properties.add("schema.#.rowtime.timestamps.type");
        properties.add("schema.#.rowtime.timestamps.from");
        properties.add("schema.#.rowtime.timestamps.class");
        properties.add("schema.#.rowtime.timestamps.serialized");
        properties.add("schema.#.rowtime.watermarks.type");
        properties.add("schema.#.rowtime.watermarks.class");
        properties.add("schema.#.rowtime.watermarks.serialized");
        properties.add("schema.#.rowtime.watermarks.delay");
        properties.add("schema.watermark.#.rowtime");
        properties.add("schema.watermark.#.strategy.expr");
        properties.add("schema.watermark.#.strategy.data-type");

        // Format options are passed through to the format factories.
        properties.add("format.*");
        return properties;
    }

    /**
     * Reads the startup configuration from {@code connector.startup-mode} and its companion keys.
     * When no mode is configured, {@link StartupMode#LATEST} is used.
     *
     * @param descriptorProperties validated table properties
     * @return the startup mode together with any specific offsets / subscription name
     * @throws TableException if the configured mode string is not one of the handled values
     * @throws RuntimeException if a configured offset cannot be decoded into a {@link MessageId}
     */
    private StartupOptions getStartupOptions(DescriptorProperties descriptorProperties) {
        Map<String, MessageId> specificOffsets = new HashMap<>();
        StartupOptions options = new StartupOptions();
        options.specificOffsets = specificOffsets;

        Optional<String> configuredMode = descriptorProperties.getOptionalString("connector.startup-mode");
        if (!configuredMode.isPresent()) {
            options.startupMode = StartupMode.LATEST;
            return options;
        }

        switch (configuredMode.get()) {
            case "earliest":
                options.startupMode = StartupMode.EARLIEST;
                break;
            case "latest":
                options.startupMode = StartupMode.LATEST;
                break;
            case "specific-offsets":
                // Each map holds the full property keys for one indexed entry, not the values.
                List<Map<String, String>> offsetKeys = descriptorProperties.getFixedIndexedProperties(
                        "connector.specific-offsets", Arrays.asList("partition", "offset"));
                for (Map<String, String> keys : offsetKeys) {
                    String partition = descriptorProperties.getString(keys.get("partition"));
                    String offset = descriptorProperties.getString(keys.get("offset"));
                    try {
                        // NOTE(review): getBytes() uses the platform default charset; confirm the
                        // encoding the offset string is expected to be written in.
                        specificOffsets.put(partition, MessageId.fromByteArray(offset.getBytes()));
                    } catch (IOException e) {
                        log.error("Failed to decode message id from properties {}", ExceptionUtils.stringifyException(e));
                        throw new RuntimeException(e);
                    }
                }
                options.startupMode = StartupMode.SPECIFIC_OFFSETS;
                break;
            case "external-subscription":
                options.externalSubscriptionName = descriptorProperties.getString("connector.sub-name");
                options.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
                break;
            default:
                throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }
        return options;
    }

    /**
     * Collects the user-supplied connector properties from the table properties.
     *
     * <p>Two layouts are handled: the indexed form ({@code connector.properties.N.key} /
     * {@code connector.properties.N.value}), detected by the presence of
     * {@code connector.properties.0.key}, and the flat form ({@code connector.properties.<name>}).
     *
     * @param descriptorProperties validated table properties
     * @return the extracted properties, keyed without the {@code connector.properties.} prefix
     */
    private Properties getPulsarProperties(DescriptorProperties descriptorProperties) {
        Properties pulsarProperties = new Properties();
        String indexedMarkerKey = CONNECTOR_PROPERTIES + ".0.key";
        if (descriptorProperties.containsKey(indexedMarkerKey)) {
            List<Map<String, String>> indexedKeys = descriptorProperties.getFixedIndexedProperties(
                    CONNECTOR_PROPERTIES, Arrays.asList("key", "value"));
            for (Map<String, String> keys : indexedKeys) {
                pulsarProperties.put(
                        descriptorProperties.getString(keys.get("key")),
                        descriptorProperties.getString(keys.get("value")));
            }
            return pulsarProperties;
        }

        // Match on the dotted prefix: a bare "connector.properties" key (which is a supported
        // property) is shorter than the prefix and would make substring() throw.
        String flatPrefix = CONNECTOR_PROPERTIES + ".";
        for (String key : descriptorProperties.asMap().keySet()) {
            if (key.startsWith(flatPrefix)) {
                pulsarProperties.put(key.substring(flatPrefix.length()), descriptorProperties.getString(key));
            }
        }
        return pulsarProperties;
    }

    /**
     * Tells whether the derived field mapping differs from an identity mapping over the schema's
     * fields, i.e. it has a different number of entries or maps some name to a different name.
     *
     * @param descriptorProperties validated table properties
     * @param schema table schema to compare the mapping against
     * @return {@code true} if the mapping is not the identity over the schema's field names
     */
    private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
        Map<String, String> fieldMapping =
                SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema.toRowType()));
        if (fieldMapping.size() != schema.getFieldNames().length) {
            return true;
        }
        return !fieldMapping.entrySet().stream()
                .allMatch(mapping -> mapping.getKey().equals(mapping.getValue()));
    }

    /**
     * Wraps the given properties in a {@link DescriptorProperties} and runs the Pulsar schema and
     * connector validators over them.
     *
     * @param properties flat table properties
     * @return the validated descriptor properties
     */
    private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        new PulsarSchemaValidator(true, true, false).validate(descriptorProperties);
        new PulsarValidator().validate(descriptorProperties);
        return descriptorProperties;
    }

    /**
     * Looks up a deserialization schema via the table factory service.
     *
     * @param properties flat table properties used to discover the format factory
     * @return the discovered schema, or {@code null} if discovery or creation failed
     */
    private DeserializationSchema<Row> getDeserializationSchema(Map<String, String> properties) {
        try {
            @SuppressWarnings("unchecked") // the factory is looked up by its raw class
            DeserializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
                    DeserializationSchemaFactory.class, properties, this.getClass().getClassLoader());
            return formatFactory.createDeserializationSchema(properties);
        } catch (Exception e) {
            // Deliberate best-effort: fall back instead of failing, but keep the cause visible.
            log.warn("get deserializer from properties failed. using pulsar inner schema instead.", e);
            return null;
        }
    }

    /**
     * Looks up a serialization schema via the table factory service, falling back to a JSON
     * serializer for {@link Row} when discovery or creation fails.
     *
     * @param properties flat table properties used to discover the format factory
     * @return the discovered schema, or the JSON fallback
     */
    private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
        try {
            @SuppressWarnings("unchecked") // the factory is looked up by its raw class
            SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
                    SerializationSchemaFactory.class, properties, this.getClass().getClassLoader());
            return formatFactory.createSerializationSchema(properties);
        } catch (Exception e) {
            // Deliberate best-effort: fall back instead of failing, but keep the cause visible.
            log.warn("get serializer from properties failed. using json schema instead.", e);
            return JsonSer.of(Row.class);
        }
    }

    /** Mutable holder for the startup configuration read by {@code getStartupOptions}. */
    private static class StartupOptions {
        private StartupMode startupMode;
        private Map<String, MessageId> specificOffsets;
        private String externalSubscriptionName;

        private StartupOptions() {
        }
    }
}

