/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.PulsarRowDeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.SupportsReadingMetadata;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarDeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaTranslator;
import org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarTableSource
implements StreamTableSource<Row>,
DefinedProctimeAttribute,
DefinedRowtimeAttributes,
DefinedFieldMapping,
SupportsReadingMetadata {
    private static final Logger log = LoggerFactory.getLogger(PulsarTableSource.class);
    private final String serviceUrl;
    private final String adminUrl;
    private final StartupMode startupMode;
    private final Map<String, MessageId> specificStartupOffsets;
    private final String externalSubscriptionName;
    private final Map<String, String> caseInsensitiveParams;
    private final Optional<TableSchema> providedSchema;
    private final Optional<String> proctimeAttribute;
    private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    private final Properties properties;
    private final TableSchema schema;
    private final Optional<Map<String, String>> fieldMapping;
    private final DeserializationSchema<Row> deserializationSchema;
    private final SchemaTranslator schemaTranslator;
    protected List<String> metadataKeys;
    protected final boolean useExtendField;
    protected DataType producedDataType;
    protected final DataType physicalDataType;
    protected TypeInformation<Row> producedTypeInfo;

    public PulsarTableSource(Optional<TableSchema> providedSchema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, Optional<Map<String, String>> fieldMapping, String serviceUrl, String adminUrl, Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<String, MessageId> specificStartupOffsets, String externalSubscriptionName) {
        this.providedSchema = providedSchema;
        this.serviceUrl = (String)Preconditions.checkNotNull((Object)serviceUrl);
        this.adminUrl = (String)Preconditions.checkNotNull((Object)adminUrl);
        this.properties = (Properties)Preconditions.checkNotNull((Object)properties);
        this.startupMode = startupMode;
        this.specificStartupOffsets = specificStartupOffsets;
        this.externalSubscriptionName = externalSubscriptionName;
        this.caseInsensitiveParams = SourceSinkUtils.validateStreamSourceOptions((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.schemaTranslator = new SimpleSchemaTranslator();
        this.schema = this.inferTableSchema();
        this.proctimeAttribute = this.validateProctimeAttribute(proctimeAttribute);
        this.rowtimeAttributeDescriptors = this.validateRowtimeAttributeDescriptors(rowtimeAttributeDescriptors);
        this.fieldMapping = fieldMapping;
        this.deserializationSchema = deserializationSchema;
        this.metadataKeys = Collections.emptyList();
        this.physicalDataType = this.schema.toRowDataType();
        this.useExtendField = Boolean.parseBoolean(properties.getProperty("use-extend-field", "false"));
    }

    public PulsarTableSource(String serviceUrl, String adminUrl, Properties properties) {
        this(Optional.empty(), Optional.empty(), Collections.emptyList(), Optional.empty(), serviceUrl, adminUrl, properties, null, StartupMode.LATEST, Collections.emptyMap(), null);
    }

    public String getProctimeAttribute() {
        return this.proctimeAttribute.orElse(null);
    }

    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return this.rowtimeAttributeDescriptors;
    }

    public TypeInformation<Row> getReturnType() {
        if (this.useExtendField) {
            return this.producedTypeInfo;
        }
        return this.deserializationSchema.getProducedType();
    }

    public TableSchema getTableSchema() {
        return this.schema;
    }

    public Map<String, String> getFieldMapping() {
        return this.fieldMapping.orElse(null);
    }

    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        PulsarRowDeserializationSchema.ReadableRowMetadataConverter[] metadataConverters;
        if (this.useExtendField) {
            this.metadataKeys = Arrays.stream(ReadableMetadata.values()).map(x -> x.key).collect(Collectors.toList());
            this.applyReadableMetadata(this.metadataKeys, this.generateProducedDataType());
            this.producedTypeInfo = TypeConversions.fromDataTypeToLegacyInfo((DataType)this.producedDataType);
        }
        PulsarRowDeserializationSchema pulsarDeserializationSchema = new PulsarRowDeserializationSchema(this.deserializationSchema, (metadataConverters = (PulsarRowDeserializationSchema.ReadableRowMetadataConverter[])this.metadataKeys.stream().map(k -> Stream.of(ReadableMetadata.values()).filter(rm -> rm.key.equals(k)).findFirst().orElseThrow(IllegalStateException::new)).map(m -> m.converter).toArray(PulsarRowDeserializationSchema.ReadableRowMetadataConverter[]::new)).length > 0, metadataConverters, this.producedTypeInfo);
        FlinkPulsarSource<Row> source = new FlinkPulsarSource<Row>(this.serviceUrl, this.adminUrl, pulsarDeserializationSchema, this.properties);
        switch (this.startupMode) {
            case EARLIEST: {
                source.setStartFromEarliest();
                break;
            }
            case LATEST: {
                source.setStartFromLatest();
                break;
            }
            case SPECIFIC_OFFSETS: {
                source.setStartFromSpecificOffsets(this.specificStartupOffsets);
                break;
            }
            case EXTERNAL_SUBSCRIPTION: {
                MessageId subscriptionPosition = MessageId.latest;
                if ("earliest".equals(this.properties.get("connector.sub-default-offset"))) {
                    subscriptionPosition = MessageId.earliest;
                }
                source.setStartFromSubscription(this.externalSubscriptionName, subscriptionPosition);
            }
        }
        return execEnv.addSource(source).name(this.explainSource());
    }

    private TableSchema inferTableSchema() {
        if (this.providedSchema.isPresent()) {
            return this.providedSchema.get();
        }
        try {
            PulsarMetadataReader reader = new PulsarMetadataReader(this.adminUrl, new ClientConfigurationData(), "", this.caseInsensitiveParams, -1, -1);
            List<String> topics = reader.getTopics().stream().map(TopicRange::getTopic).collect(Collectors.toList());
            SchemaInfo pulsarSchema = reader.getPulsarSchema(topics);
            return this.schemaTranslator.pulsarSchemaToTableSchema(pulsarSchema);
        }
        catch (IncompatibleSchemaException | PulsarAdminException | PulsarClientException e) {
            log.error("Failed to fetch table schema", (Object)this.adminUrl);
            throw new RuntimeException(e);
        }
    }

    private Optional<String> validateProctimeAttribute(Optional<String> proctimeAttribute) {
        return proctimeAttribute.map(attribute -> {
            Optional tpe = this.schema.getFieldType(attribute);
            if (!tpe.isPresent()) {
                throw new ValidationException("Processing time attribute '" + attribute + "' is not present in TableSchema.");
            }
            if (tpe.get() != Types.SQL_TIMESTAMP()) {
                throw new ValidationException("Processing time attribute '" + attribute + "' is not of type SQL_TIMESTAMP.");
            }
            return attribute;
        });
    }

    private List<RowtimeAttributeDescriptor> validateRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
        Preconditions.checkNotNull(rowtimeAttributeDescriptors, (String)"List of rowtime attributes must not be null.");
        for (RowtimeAttributeDescriptor desc : rowtimeAttributeDescriptors) {
            String rowtimeAttribute = desc.getAttributeName();
            Optional tpe = this.schema.getFieldType(rowtimeAttribute);
            if (!tpe.isPresent()) {
                throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not present in TableSchema.");
            }
            if (tpe.get() == Types.SQL_TIMESTAMP()) continue;
            throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not of type SQL_TIMESTAMP.");
        }
        return rowtimeAttributeDescriptors;
    }

    public PulsarDeserializationSchema<Row> getDeserializationSchema() {
        if (this.deserializationSchema == null) {
            throw new RuntimeException("in table mode, deserializationSchema is needed.");
        }
        return new PulsarDeserializationSchemaWrapper<Row>(this.deserializationSchema);
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        LinkedHashMap<String, DataType> metadataMap = new LinkedHashMap<String, DataType>();
        Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        this.metadataKeys = metadataKeys;
        this.producedDataType = producedDataType;
    }

    private DataType generateProducedDataType() {
        FieldsDataType fieldsDataType;
        ArrayList<DataTypes.Field> mainSchema = new ArrayList<DataTypes.Field>();
        if (this.physicalDataType instanceof FieldsDataType) {
            fieldsDataType = (FieldsDataType)this.physicalDataType;
            RowType rowType = (RowType)fieldsDataType.getLogicalType();
            List fieldNames = rowType.getFieldNames();
            for (int i = 0; i < fieldNames.size(); ++i) {
                LogicalType logicalType = rowType.getTypeAt(i);
                DataTypes.Field field = DataTypes.FIELD((String)((String)fieldNames.get(i)), (DataType)TypeConversions.fromLogicalToDataType((LogicalType)logicalType));
                mainSchema.add(field);
            }
        } else {
            mainSchema.add(DataTypes.FIELD((String)"value", (DataType)this.physicalDataType));
        }
        if (this.useExtendField) {
            mainSchema.addAll(SimpleSchemaTranslator.METADATA_FIELDS);
        }
        fieldsDataType = (FieldsDataType)DataTypes.ROW((DataTypes.Field[])mainSchema.toArray(new DataTypes.Field[0]));
        return fieldsDataType;
    }

    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof PulsarTableSource)) {
            return false;
        }
        PulsarTableSource other = (PulsarTableSource)o;
        if (!other.canEqual(this)) {
            return false;
        }
        if (this.useExtendField != other.useExtendField) {
            return false;
        }
        String this$serviceUrl = this.serviceUrl;
        String other$serviceUrl = other.serviceUrl;
        if (this$serviceUrl == null ? other$serviceUrl != null : !this$serviceUrl.equals(other$serviceUrl)) {
            return false;
        }
        String this$adminUrl = this.adminUrl;
        String other$adminUrl = other.adminUrl;
        if (this$adminUrl == null ? other$adminUrl != null : !this$adminUrl.equals(other$adminUrl)) {
            return false;
        }
        StartupMode this$startupMode = this.startupMode;
        StartupMode other$startupMode = other.startupMode;
        if (this$startupMode == null ? other$startupMode != null : !((Object)((Object)this$startupMode)).equals((Object)other$startupMode)) {
            return false;
        }
        Map<String, MessageId> this$specificStartupOffsets = this.specificStartupOffsets;
        Map<String, MessageId> other$specificStartupOffsets = other.specificStartupOffsets;
        if (this$specificStartupOffsets == null ? other$specificStartupOffsets != null : !((Object)this$specificStartupOffsets).equals(other$specificStartupOffsets)) {
            return false;
        }
        String this$externalSubscriptionName = this.externalSubscriptionName;
        String other$externalSubscriptionName = other.externalSubscriptionName;
        if (this$externalSubscriptionName == null ? other$externalSubscriptionName != null : !this$externalSubscriptionName.equals(other$externalSubscriptionName)) {
            return false;
        }
        Map<String, String> this$caseInsensitiveParams = this.caseInsensitiveParams;
        Map<String, String> other$caseInsensitiveParams = other.caseInsensitiveParams;
        if (this$caseInsensitiveParams == null ? other$caseInsensitiveParams != null : !((Object)this$caseInsensitiveParams).equals(other$caseInsensitiveParams)) {
            return false;
        }
        Optional<TableSchema> this$providedSchema = this.providedSchema;
        Optional<TableSchema> other$providedSchema = other.providedSchema;
        if (this$providedSchema == null ? other$providedSchema != null : !((Object)this$providedSchema).equals(other$providedSchema)) {
            return false;
        }
        String this$proctimeAttribute = this.getProctimeAttribute();
        String other$proctimeAttribute = other.getProctimeAttribute();
        if (this$proctimeAttribute == null ? other$proctimeAttribute != null : !this$proctimeAttribute.equals(other$proctimeAttribute)) {
            return false;
        }
        List<RowtimeAttributeDescriptor> this$rowtimeAttributeDescriptors = this.getRowtimeAttributeDescriptors();
        List<RowtimeAttributeDescriptor> other$rowtimeAttributeDescriptors = other.getRowtimeAttributeDescriptors();
        if (this$rowtimeAttributeDescriptors == null ? other$rowtimeAttributeDescriptors != null : !((Object)this$rowtimeAttributeDescriptors).equals(other$rowtimeAttributeDescriptors)) {
            return false;
        }
        Properties this$properties = this.properties;
        Properties other$properties = other.properties;
        if (this$properties == null ? other$properties != null : !((Object)this$properties).equals(other$properties)) {
            return false;
        }
        TableSchema this$schema = this.schema;
        TableSchema other$schema = other.schema;
        if (this$schema == null ? other$schema != null : !this$schema.equals(other$schema)) {
            return false;
        }
        Map<String, String> this$fieldMapping = this.getFieldMapping();
        Map<String, String> other$fieldMapping = other.getFieldMapping();
        if (this$fieldMapping == null ? other$fieldMapping != null : !((Object)this$fieldMapping).equals(other$fieldMapping)) {
            return false;
        }
        PulsarDeserializationSchema<Row> this$deserializationSchema = this.getDeserializationSchema();
        PulsarDeserializationSchema<Row> other$deserializationSchema = other.getDeserializationSchema();
        if (this$deserializationSchema == null ? other$deserializationSchema != null : !this$deserializationSchema.equals(other$deserializationSchema)) {
            return false;
        }
        SchemaTranslator this$schemaTranslator = this.schemaTranslator;
        SchemaTranslator other$schemaTranslator = other.schemaTranslator;
        if (this$schemaTranslator == null ? other$schemaTranslator != null : !this$schemaTranslator.equals(other$schemaTranslator)) {
            return false;
        }
        List<String> this$metadataKeys = this.metadataKeys;
        List<String> other$metadataKeys = other.metadataKeys;
        if (this$metadataKeys == null ? other$metadataKeys != null : !((Object)this$metadataKeys).equals(other$metadataKeys)) {
            return false;
        }
        DataType this$producedDataType = this.producedDataType;
        DataType other$producedDataType = other.producedDataType;
        if (this$producedDataType == null ? other$producedDataType != null : !this$producedDataType.equals(other$producedDataType)) {
            return false;
        }
        DataType this$physicalDataType = this.physicalDataType;
        DataType other$physicalDataType = other.physicalDataType;
        if (this$physicalDataType == null ? other$physicalDataType != null : !this$physicalDataType.equals(other$physicalDataType)) {
            return false;
        }
        TypeInformation<Row> this$producedTypeInfo = this.producedTypeInfo;
        TypeInformation<Row> other$producedTypeInfo = other.producedTypeInfo;
        return !(this$producedTypeInfo == null ? other$producedTypeInfo != null : !this$producedTypeInfo.equals(other$producedTypeInfo));
    }

    protected boolean canEqual(Object other) {
        return other instanceof PulsarTableSource;
    }

    public int hashCode() {
        int PRIME = 59;
        int result = 1;
        result = result * 59 + (this.useExtendField ? 79 : 97);
        String $serviceUrl = this.serviceUrl;
        result = result * 59 + ($serviceUrl == null ? 43 : $serviceUrl.hashCode());
        String $adminUrl = this.adminUrl;
        result = result * 59 + ($adminUrl == null ? 43 : $adminUrl.hashCode());
        StartupMode $startupMode = this.startupMode;
        result = result * 59 + ($startupMode == null ? 43 : ((Object)((Object)$startupMode)).hashCode());
        Map<String, MessageId> $specificStartupOffsets = this.specificStartupOffsets;
        result = result * 59 + ($specificStartupOffsets == null ? 43 : ((Object)$specificStartupOffsets).hashCode());
        String $externalSubscriptionName = this.externalSubscriptionName;
        result = result * 59 + ($externalSubscriptionName == null ? 43 : $externalSubscriptionName.hashCode());
        Map<String, String> $caseInsensitiveParams = this.caseInsensitiveParams;
        result = result * 59 + ($caseInsensitiveParams == null ? 43 : ((Object)$caseInsensitiveParams).hashCode());
        Optional<TableSchema> $providedSchema = this.providedSchema;
        result = result * 59 + ($providedSchema == null ? 43 : ((Object)$providedSchema).hashCode());
        String $proctimeAttribute = this.getProctimeAttribute();
        result = result * 59 + ($proctimeAttribute == null ? 43 : $proctimeAttribute.hashCode());
        List<RowtimeAttributeDescriptor> $rowtimeAttributeDescriptors = this.getRowtimeAttributeDescriptors();
        result = result * 59 + ($rowtimeAttributeDescriptors == null ? 43 : ((Object)$rowtimeAttributeDescriptors).hashCode());
        Properties $properties = this.properties;
        result = result * 59 + ($properties == null ? 43 : ((Object)$properties).hashCode());
        TableSchema $schema = this.schema;
        result = result * 59 + ($schema == null ? 43 : $schema.hashCode());
        Map<String, String> $fieldMapping = this.getFieldMapping();
        result = result * 59 + ($fieldMapping == null ? 43 : ((Object)$fieldMapping).hashCode());
        PulsarDeserializationSchema<Row> $deserializationSchema = this.getDeserializationSchema();
        result = result * 59 + ($deserializationSchema == null ? 43 : $deserializationSchema.hashCode());
        SchemaTranslator $schemaTranslator = this.schemaTranslator;
        result = result * 59 + ($schemaTranslator == null ? 43 : $schemaTranslator.hashCode());
        List<String> $metadataKeys = this.metadataKeys;
        result = result * 59 + ($metadataKeys == null ? 43 : ((Object)$metadataKeys).hashCode());
        DataType $producedDataType = this.producedDataType;
        result = result * 59 + ($producedDataType == null ? 43 : $producedDataType.hashCode());
        DataType $physicalDataType = this.physicalDataType;
        result = result * 59 + ($physicalDataType == null ? 43 : $physicalDataType.hashCode());
        TypeInformation<Row> $producedTypeInfo = this.producedTypeInfo;
        result = result * 59 + ($producedTypeInfo == null ? 43 : $producedTypeInfo.hashCode());
        return result;
    }

    public static enum ReadableMetadata {
        KEY_ATTRIBUTE("__key", (DataType)DataTypes.BYTES().nullable(), record -> record.getKeyBytes()),
        TOPIC_ATTRIBUTE("__topic", (DataType)DataTypes.STRING().notNull(), record -> record.getTopicName()),
        MESSAGE_ID("__messageId", (DataType)DataTypes.BYTES().nullable(), record -> record.getMessageId().toByteArray()),
        PUBLISH_TIME("__publishTime", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE((int)3).nullable(), record -> LocalDateTime.ofInstant(Instant.ofEpochMilli(record.getPublishTime()), ZoneId.systemDefault())),
        EVENT_TIME("__eventTime", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE((int)3).nullable(), record -> LocalDateTime.ofInstant(Instant.ofEpochMilli(record.getEventTime()), ZoneId.systemDefault()));

        public final String key;
        public final DataType dataType;
        public final PulsarRowDeserializationSchema.ReadableRowMetadataConverter converter;

        private ReadableMetadata(String key, DataType dataType, PulsarRowDeserializationSchema.ReadableRowMetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

