/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.table.DynamicPulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.table.PulsarTableOptions;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.SupportsReadingMetadata;
import org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarDynamicTableSource
implements ScanTableSource,
SupportsReadingMetadata {
    private static final Logger log = LoggerFactory.getLogger(PulsarDynamicTableSource.class);
    protected DataType producedDataType;
    protected final DataType physicalDataType;
    protected List<String> metadataKeys;
    protected final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    protected final List<String> topics;
    protected final String topicPattern;
    protected final String serviceUrl;
    protected final boolean useExtendField;
    protected final String adminUrl;
    protected final Properties properties;
    protected final PulsarTableOptions.StartupOptions startupOptions;
    private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;

    public PulsarDynamicTableSource(DataType physicalDataType, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, List<String> topics, String topicPattern, String serviceUrl, String adminUrl, Properties properties, PulsarTableOptions.StartupOptions startupOptions) {
        this.physicalDataType = physicalDataType;
        this.decodingFormat = decodingFormat;
        this.producedDataType = physicalDataType;
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.serviceUrl = serviceUrl;
        this.adminUrl = adminUrl;
        this.setTopicInfo(properties, topics, topicPattern);
        this.properties = properties;
        this.startupOptions = startupOptions;
        this.metadataKeys = Collections.emptyList();
        this.useExtendField = Boolean.parseBoolean(properties.getProperty("use-extend-field", "false"));
    }

    private void setTopicInfo(Properties properties, List<String> topics, String topicPattern) {
        if (StringUtils.isNotBlank((CharSequence)topicPattern)) {
            properties.putIfAbsent("topicspattern", topicPattern);
            properties.remove("topic");
            properties.remove("topics");
        } else if (topics != null && topics.size() > 1) {
            properties.putIfAbsent("topics", StringUtils.join(topics, (String)","));
            properties.remove("topicspattern");
            properties.remove("topic");
        } else if (topics != null && topics.size() == 1) {
            properties.putIfAbsent("topic", StringUtils.join(topics, (String)","));
            properties.remove("topicspattern");
            properties.remove("topics");
        } else {
            throw new RuntimeException("Use `topics` instead of `topic` for multi topic read");
        }
    }

    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
        DeserializationSchema deserializationSchema = (DeserializationSchema)this.decodingFormat.createRuntimeDecoder((DynamicTableSource.Context)runtimeProviderContext, this.physicalDataType);
        TypeInformation producedTypeInfo = null;
        if (this.useExtendField) {
            this.metadataKeys = Arrays.stream(ReadableMetadata.values()).map(x -> x.key).collect(Collectors.toList());
            this.applyReadableMetadata(this.metadataKeys, this.generateProducedDataType());
            producedTypeInfo = runtimeProviderContext.createTypeInformation(this.producedDataType);
        }
        DynamicPulsarDeserializationSchema.ReadableRowDataMetadataConverter[] metadataConverters = (DynamicPulsarDeserializationSchema.ReadableRowDataMetadataConverter[])this.metadataKeys.stream().map(k -> Stream.of(ReadableMetadata.values()).filter(rm -> rm.key.equals(k)).findFirst().orElseThrow(IllegalStateException::new)).map(m -> m.converter).toArray(DynamicPulsarDeserializationSchema.ReadableRowDataMetadataConverter[]::new);
        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<RowData>(this.adminUrl, PulsarDynamicTableSource.newClientConf(this.serviceUrl), new DynamicPulsarDeserializationSchema((DeserializationSchema<RowData>)deserializationSchema, this.metadataKeys.size() > 0, metadataConverters, (TypeInformation<RowData>)producedTypeInfo), this.properties);
        switch (this.startupOptions.startupMode) {
            case EARLIEST: {
                source.setStartFromEarliest();
                break;
            }
            case LATEST: {
                source.setStartFromLatest();
                break;
            }
            case SPECIFIC_OFFSETS: {
                source.setStartFromSpecificOffsets(this.startupOptions.specificOffsets);
                break;
            }
            case EXTERNAL_SUBSCRIPTION: {
                MessageId subscriptionPosition = MessageId.latest;
                if ("earliest".equals(this.properties.get("connector.sub-default-offset"))) {
                    subscriptionPosition = MessageId.earliest;
                }
                source.setStartFromSubscription(this.startupOptions.externalSubscriptionName, subscriptionPosition);
            }
        }
        return SourceFunctionProvider.of(source, (boolean)false);
    }

    private DataType generateProducedDataType() {
        FieldsDataType fieldsDataType;
        ArrayList<DataTypes.Field> mainSchema = new ArrayList<DataTypes.Field>();
        if (this.physicalDataType instanceof FieldsDataType) {
            fieldsDataType = (FieldsDataType)this.physicalDataType;
            RowType rowType = (RowType)fieldsDataType.getLogicalType();
            List fieldNames = rowType.getFieldNames();
            for (int i = 0; i < fieldNames.size(); ++i) {
                LogicalType logicalType = rowType.getTypeAt(i);
                DataTypes.Field field = DataTypes.FIELD((String)((String)fieldNames.get(i)), (DataType)TypeConversions.fromLogicalToDataType((LogicalType)logicalType));
                mainSchema.add(field);
            }
        } else {
            mainSchema.add(DataTypes.FIELD((String)"value", (DataType)this.physicalDataType));
        }
        if (this.useExtendField) {
            mainSchema.addAll(SimpleSchemaTranslator.METADATA_FIELDS);
        }
        fieldsDataType = (FieldsDataType)DataTypes.ROW((DataTypes.Field[])mainSchema.toArray(new DataTypes.Field[0]));
        return fieldsDataType;
    }

    public DynamicTableSource copy() {
        PulsarDynamicTableSource copy = new PulsarDynamicTableSource(this.producedDataType, this.decodingFormat, this.topics, this.topicPattern, this.serviceUrl, this.adminUrl, this.properties, this.startupOptions);
        copy.producedDataType = this.producedDataType;
        copy.metadataKeys = this.metadataKeys;
        return copy;
    }

    public String asSummaryString() {
        return "Pulsar universal table source";
    }

    private static ClientConfigurationData newClientConf(String serviceUrl) {
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        return clientConf;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        PulsarDynamicTableSource that = (PulsarDynamicTableSource)o;
        return this.useExtendField == that.useExtendField && Objects.equals(this.producedDataType, that.producedDataType) && Objects.equals(this.physicalDataType, that.physicalDataType) && Objects.equals(this.metadataKeys, that.metadataKeys) && Objects.equals(this.decodingFormat, that.decodingFormat) && Objects.equals(this.topics, that.topics) && Objects.equals(this.topicPattern, that.topicPattern) && Objects.equals(this.serviceUrl, that.serviceUrl) && Objects.equals(this.adminUrl, that.adminUrl) && Objects.equals(this.startupOptions, that.startupOptions);
    }

    public int hashCode() {
        return Objects.hash(this.producedDataType, this.physicalDataType, this.metadataKeys, this.decodingFormat, this.topics, this.topicPattern, this.serviceUrl, this.useExtendField, this.adminUrl, this.properties, this.startupOptions);
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        LinkedHashMap<String, DataType> metadataMap = new LinkedHashMap<String, DataType>();
        Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        this.metadataKeys = metadataKeys;
        this.producedDataType = producedDataType;
    }

    public static enum ReadableMetadata {
        KEY_ATTRIBUTE("__key", (DataType)DataTypes.BYTES().nullable(), record -> record.getKeyBytes()),
        TOPIC_ATTRIBUTE("__topic", (DataType)DataTypes.STRING().nullable(), record -> StringData.fromString((String)record.getTopicName())),
        MESSAGE_ID("__messageId", (DataType)DataTypes.BYTES().nullable(), record -> record.getMessageId().toByteArray()),
        EVENT_TIME("__eventTime", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE((int)3).nullable(), record -> TimestampData.fromEpochMillis((long)record.getEventTime())),
        PUBLISH_TIME("__publishTime", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE((int)3).nullable(), record -> TimestampData.fromEpochMillis((long)record.getPublishTime()));

        public final String key;
        public final DataType dataType;
        public final DynamicPulsarDeserializationSchema.ReadableRowDataMetadataConverter converter;

        private ReadableMetadata(String key, DataType dataType, DynamicPulsarDeserializationSchema.ReadableRowDataMetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

