/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarDeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.table.DynamicPulsarDeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scan table source that reads rows from Pulsar through a {@link FlinkPulsarSource}.
 *
 * <p>The source supports an optional key format and a mandatory value format, connector and
 * format metadata columns (see {@link ReadableMetadata}), and watermark push-down.
 *
 * <p>{@link #producedDataType}, {@link #metadataKeys} and {@link #watermarkStrategy} are
 * mutable: they are replaced by {@link #applyReadableMetadata(List, DataType)} and
 * {@link #applyWatermark(WatermarkStrategy)}. All other members are fixed at construction.
 */
public class PulsarDynamicTableSource
implements ScanTableSource,
SupportsReadingMetadata,
SupportsWatermarkPushDown {
    private static final Logger log = LoggerFactory.getLogger(PulsarDynamicTableSource.class);

    /** Prefix that distinguishes value-format metadata keys from connector metadata keys. */
    private static final String VALUE_METADATA_PREFIX = "value.";

    // Not referenced anywhere in this class; kept as-is.
    private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;

    // ------------------------------------------------------------------
    // Mutable attributes
    // ------------------------------------------------------------------

    /** Data type of the produced rows; initially the physical type, later possibly extended with metadata. */
    protected DataType producedDataType;

    /** Connector metadata keys requested via {@link #applyReadableMetadata(List, DataType)}. */
    protected List<String> metadataKeys;

    /** Watermark strategy pushed down via {@link #applyWatermark(WatermarkStrategy)}, if any. */
    @Nullable
    protected WatermarkStrategy<RowData> watermarkStrategy;

    // ------------------------------------------------------------------
    // Format attributes
    // ------------------------------------------------------------------

    /** Data type describing the physical columns of the table. */
    protected final DataType physicalDataType;

    /** Optional format for decoding message keys. */
    @Nullable
    protected final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;

    /** Format for decoding message values. */
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;

    /** Indices into the physical row that make up the key. */
    protected final int[] keyProjection;

    /** Indices into the physical row that make up the value. */
    protected final int[] valueProjection;

    /** Optional prefix stripped from key field names before they are handed to the key format. */
    @Nullable
    protected final String keyPrefix;

    // ------------------------------------------------------------------
    // Pulsar-specific attributes
    // ------------------------------------------------------------------

    protected final List<String> topics;
    protected final String topicPattern;
    protected final String serviceUrl;
    protected final String adminUrl;
    protected final Properties properties;
    protected final PulsarTableOptions.StartupOptions startupOptions;

    /** Flag forwarded to the {@link DynamicPulsarDeserializationSchema}. */
    protected final boolean upsertMode;

    /**
     * Creates the table source.
     *
     * <p>Note that {@code properties} is modified in place: the topic configuration is written
     * into it by {@link #setTopicInfo(Properties, List, String)}.
     *
     * @param physicalDataType physical row type, must not be null
     * @param keyDecodingFormat optional key format
     * @param valueDecodingFormat value format, must not be null
     * @param keyProjection key field indices, must not be null
     * @param valueProjection value field indices, must not be null
     * @param keyPrefix optional key field prefix
     * @param topics topic list; exactly one of {@code topics} and {@code topicPattern} must be non-null
     * @param topicPattern topic pattern; exactly one of {@code topics} and {@code topicPattern} must be non-null
     * @param serviceUrl Pulsar service URL
     * @param adminUrl Pulsar admin URL
     * @param properties connector properties, must not be null
     * @param startupOptions startup configuration read by {@link #getScanRuntimeProvider}
     * @param upsertMode flag forwarded to the deserialization schema
     * @throws NullPointerException if a required argument is null
     * @throws IllegalStateException if neither a non-blank pattern nor a non-empty topic list is given
     * @throws IllegalArgumentException if not exactly one of {@code topics} and {@code topicPattern} is non-null
     */
    public PulsarDynamicTableSource(DataType physicalDataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, int[] keyProjection, int[] valueProjection, @Nullable String keyPrefix, List<String> topics, String topicPattern, String serviceUrl, String adminUrl, Properties properties, PulsarTableOptions.StartupOptions startupOptions, boolean upsertMode) {
        this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
        this.keyDecodingFormat = keyDecodingFormat;
        this.valueDecodingFormat = Preconditions.checkNotNull(valueDecodingFormat, "Value decoding format must not be null.");
        this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
        this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
        this.keyPrefix = keyPrefix;

        // Validate the properties before setTopicInfo dereferences them, so a null argument
        // fails with a descriptive message instead of a bare NullPointerException.
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.setTopicInfo(properties, topics, topicPattern);
        boolean exactlyOneTopicSource = (topics != null) != (topicPattern != null);
        Preconditions.checkArgument(exactlyOneTopicSource, "Either Topic or Topic Pattern must be set for source.");
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.serviceUrl = serviceUrl;
        this.adminUrl = adminUrl;
        this.startupOptions = startupOptions;
        this.upsertMode = upsertMode;

        // Mutable state starts out without metadata columns and without a watermark strategy.
        this.producedDataType = physicalDataType;
        this.metadataKeys = new ArrayList<String>();
        this.watermarkStrategy = null;
    }

    /**
     * Writes the topic configuration into {@code properties} under exactly one of the keys
     * {@code topicspattern}, {@code topics} or {@code topic} and removes the other two.
     *
     * <p>A non-blank pattern takes precedence over the topic list. The chosen key is written
     * with {@code putIfAbsent}, so a value already present under that key is kept.
     *
     * @throws IllegalStateException if the pattern is blank and the topic list is null or empty
     */
    private void setTopicInfo(Properties properties, List<String> topics, String topicPattern) {
        if (StringUtils.isNotBlank((CharSequence)topicPattern)) {
            properties.putIfAbsent("topicspattern", topicPattern);
            properties.remove("topic");
            properties.remove("topics");
            return;
        }
        if (topics != null && topics.size() > 1) {
            properties.putIfAbsent("topics", StringUtils.join(topics, ","));
            properties.remove("topicspattern");
            properties.remove("topic");
            return;
        }
        if (topics != null && topics.size() == 1) {
            properties.putIfAbsent("topic", StringUtils.join(topics, ","));
            properties.remove("topicspattern");
            properties.remove("topics");
            return;
        }
        // Reached only when there is neither a usable pattern nor any topic.
        throw new IllegalStateException("Either a non-blank topic pattern or at least one topic must be set for source.");
    }

    /** Returns the changelog mode reported by the value decoding format. */
    @Override
    public ChangelogMode getChangelogMode() {
        return this.valueDecodingFormat.getChangelogMode();
    }

    /**
     * Builds the runtime {@link FlinkPulsarSource}: creates the key/value decoders, applies the
     * pushed-down watermark strategy (if any) and configures the start position according to
     * {@link #startupOptions}.
     */
    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext context) {
        DeserializationSchema<RowData> keyDeserialization = this.createDeserialization(context, this.keyDecodingFormat, this.keyProjection, this.keyPrefix);
        DeserializationSchema<RowData> valueDeserialization = this.createDeserialization(context, this.valueDecodingFormat, this.valueProjection, "");
        TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(this.producedDataType);
        PulsarDeserializationSchema<RowData> deserializationSchema = this.createPulsarDeserialization(keyDeserialization, valueDeserialization, producedTypeInfo);

        ClientConfigurationData clientConfigurationData = PulsarClientUtils.newClientConf(this.serviceUrl, this.properties);
        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<RowData>(this.adminUrl, clientConfigurationData, deserializationSchema, this.properties);
        if (this.watermarkStrategy != null) {
            source.assignTimestampsAndWatermarks(this.watermarkStrategy);
        }

        switch (this.startupOptions.startupMode) {
            case EARLIEST: {
                source.setStartFromEarliest();
                break;
            }
            case LATEST: {
                source.setStartFromLatest();
                break;
            }
            case SPECIFIC_OFFSETS: {
                source.setStartFromSpecificOffsets(this.startupOptions.specificOffsets);
                break;
            }
            case TIMESTAMP: {
                source.setStartFromTimestamp(this.startupOptions.startupTimestampMills);
                break;
            }
            case EXTERNAL_SUBSCRIPTION: {
                // Anything other than the literal "earliest" starts the subscription at latest.
                MessageId subscriptionPosition = "earliest".equals(this.startupOptions.externalSubStartOffset)
                        ? MessageId.earliest
                        : MessageId.latest;
                source.setStartFromSubscription(this.startupOptions.externalSubscriptionName, subscriptionPosition);
                break;
            }
            default: {
                // No explicit start position is configured for any other mode.
                break;
            }
        }
        return SourceFunctionProvider.of(source, false);
    }

    /**
     * Assembles the {@link DynamicPulsarDeserializationSchema} from the key/value decoders and
     * the converters of the requested connector metadata keys.
     *
     * @throws IllegalStateException if a requested metadata key is not a {@link ReadableMetadata} key
     */
    private PulsarDeserializationSchema<RowData> createPulsarDeserialization(DeserializationSchema<RowData> keyDeserialization, DeserializationSchema<RowData> valueDeserialization, TypeInformation<RowData> producedTypeInfo) {
        DynamicPulsarDeserializationSchema.MetadataConverter[] metadataConverters = this.metadataKeys.stream()
                .map(PulsarDynamicTableSource::findReadableMetadata)
                .map(metadata -> metadata.converter)
                .toArray(DynamicPulsarDeserializationSchema.MetadataConverter[]::new);
        boolean hasMetadata = !this.metadataKeys.isEmpty();

        // Fields of the produced row that are not connector metadata.
        int adjustedPhysicalArity = this.producedDataType.getChildren().size() - this.metadataKeys.size();

        // The value projection is extended by every index between the end of the key+value
        // fields and the adjusted arity (presumably the format metadata columns - TODO confirm).
        int physicalFieldCount = this.keyProjection.length + this.valueProjection.length;
        int[] adjustedValueProjection = IntStream.concat(
                IntStream.of(this.valueProjection),
                IntStream.range(physicalFieldCount, adjustedPhysicalArity)).toArray();

        return new DynamicPulsarDeserializationSchema(adjustedPhysicalArity, keyDeserialization, this.keyProjection, valueDeserialization, adjustedValueProjection, hasMetadata, metadataConverters, producedTypeInfo, this.upsertMode);
    }

    /** Looks up the {@link ReadableMetadata} constant with the given key. */
    private static ReadableMetadata findReadableMetadata(String metadataKey) {
        return Stream.of(ReadableMetadata.values())
                .filter(candidate -> candidate.key.equals(metadataKey))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Unknown connector metadata key: " + metadataKey));
    }

    /**
     * Creates a copy carrying over the construction arguments and the mutable state.
     *
     * <p>The copy keeps this instance's {@link #upsertMode}; since {@link #equals(Object)}
     * compares that flag, a copy is equal to its original.
     */
    @Override
    public DynamicTableSource copy() {
        PulsarDynamicTableSource copy = new PulsarDynamicTableSource(this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.topics, this.topicPattern, this.serviceUrl, this.adminUrl, this.properties, this.startupOptions, this.upsertMode);
        copy.producedDataType = this.producedDataType;
        copy.metadataKeys = this.metadataKeys;
        copy.watermarkStrategy = this.watermarkStrategy;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "Pulsar universal table source";
    }

    // Not called anywhere in this class; kept as-is.
    private static ClientConfigurationData newClientConf(String serviceUrl) {
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        return clientConf;
    }

    /** Compares all attributes; the properties are compared by their entries. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PulsarDynamicTableSource)) {
            return false;
        }
        PulsarDynamicTableSource that = (PulsarDynamicTableSource)o;
        return this.upsertMode == that.upsertMode
                && Objects.equals(this.producedDataType, that.producedDataType)
                && Objects.equals(this.metadataKeys, that.metadataKeys)
                && Objects.equals(this.watermarkStrategy, that.watermarkStrategy)
                && Objects.equals(this.physicalDataType, that.physicalDataType)
                && Objects.equals(this.keyDecodingFormat, that.keyDecodingFormat)
                && Objects.equals(this.valueDecodingFormat, that.valueDecodingFormat)
                && Arrays.equals(this.keyProjection, that.keyProjection)
                && Arrays.equals(this.valueProjection, that.valueProjection)
                && Objects.equals(this.keyPrefix, that.keyPrefix)
                && Objects.equals(this.topics, that.topics)
                && Objects.equals(this.topicPattern, that.topicPattern)
                && Objects.equals(this.serviceUrl, that.serviceUrl)
                && Objects.equals(this.adminUrl, that.adminUrl)
                && Objects.equals(new HashMap<Object, Object>(this.properties), new HashMap<Object, Object>(that.properties))
                && Objects.equals(this.startupOptions, that.startupOptions);
    }

    @Override
    public int hashCode() {
        int result = Objects.hash(this.producedDataType, this.metadataKeys, this.watermarkStrategy, this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, this.keyPrefix, this.topics, this.topicPattern, this.serviceUrl, this.adminUrl, this.properties, this.startupOptions, this.upsertMode);
        // Arrays are hashed by content to stay consistent with Arrays.equals in equals().
        result = 31 * result + Arrays.hashCode(this.keyProjection);
        result = 31 * result + Arrays.hashCode(this.valueProjection);
        return result;
    }

    /**
     * Lists the value-format metadata (keys prefixed with {@code "value."}) followed by the
     * connector metadata of {@link ReadableMetadata}. A connector key never overwrites an
     * already listed format key.
     */
    @Override
    public Map<String, DataType> listReadableMetadata() {
        Map<String, DataType> metadataMap = new LinkedHashMap<String, DataType>();
        this.valueDecodingFormat.listReadableMetadata()
                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
        for (ReadableMetadata metadata : ReadableMetadata.values()) {
            metadataMap.putIfAbsent(metadata.key, metadata.dataType);
        }
        return metadataMap;
    }

    /**
     * Splits the requested keys into value-format keys (prefix {@code "value."}) and connector
     * keys. The format keys are forwarded, without the prefix, to the value format when it
     * exposes any metadata; the connector keys and the produced type are stored on this source.
     */
    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        List<String> formatMetadataKeys = metadataKeys.stream()
                .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                .collect(Collectors.toList());
        List<String> connectorMetadataKeys = new ArrayList<String>(metadataKeys);
        connectorMetadataKeys.removeAll(formatMetadataKeys);

        Map<String, DataType> formatMetadata = this.valueDecodingFormat.listReadableMetadata();
        if (formatMetadata.size() > 0) {
            List<String> requestedFormatMetadataKeys = formatMetadataKeys.stream()
                    .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                    .collect(Collectors.toList());
            this.valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
        }

        this.metadataKeys = connectorMetadataKeys;
        this.producedDataType = producedDataType;
    }

    /**
     * Creates the runtime decoder of {@code format} for the projected physical row.
     *
     * @param prefix if non-null, stripped from the projected field names
     * @return the decoder, or {@code null} if {@code format} is {@code null}
     */
    @Nullable
    private DeserializationSchema<RowData> createDeserialization(DynamicTableSource.Context context, @Nullable DecodingFormat<DeserializationSchema<RowData>> format, int[] projection, @Nullable String prefix) {
        if (format == null) {
            return null;
        }
        DataType physicalFormatDataType = DataTypeUtils.projectRow(this.physicalDataType, projection);
        if (prefix != null) {
            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
        }
        return format.createRuntimeDecoder(context, physicalFormatDataType);
    }

    /** Stores the watermark strategy that is later applied in {@link #getScanRuntimeProvider}. */
    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    /**
     * Connector metadata that can be read from a Pulsar {@link Message}: the metadata key, its
     * SQL data type and the converter that extracts the value from a message.
     */
    static enum ReadableMetadata {
        /** Topic name of the message. */
        TOPIC("topic", DataTypes.STRING().notNull(), message -> StringData.fromString(message.getTopicName())),
        /** Message id serialized to bytes. */
        MESSAGE_ID("messageId", DataTypes.BYTES().notNull(), message -> message.getMessageId().toByteArray()),
        /** Sequence id of the message. */
        SEQUENCE_ID("sequenceId", DataTypes.BIGINT().notNull(), Message::getSequenceId),
        /** Publish time, interpreted as epoch milliseconds. */
        PUBLISH_TIME("publishTime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), message -> TimestampData.fromEpochMillis(message.getPublishTime())),
        /** Event time, interpreted as epoch milliseconds. */
        EVENT_TIME("eventTime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), message -> TimestampData.fromEpochMillis(message.getEventTime())),
        /** Message properties as a string-to-string map. */
        PROPERTIES("properties", DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).notNull(), message -> {
            Map<StringData, StringData> map = new HashMap<StringData, StringData>();
            for (Map.Entry<String, String> e : message.getProperties().entrySet()) {
                map.put(StringData.fromString(e.getKey()), StringData.fromString(e.getValue()));
            }
            return new GenericMapData(map);
        });

        final String key;
        final DataType dataType;
        final DynamicPulsarDeserializationSchema.MetadataConverter converter;

        private ReadableMetadata(String key, DataType dataType, DynamicPulsarDeserializationSchema.MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

