/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.pulsar.table.DynamicPulsarSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.SupportsWritingMetadata;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

/**
 * A {@link DynamicTableSink} that writes {@link RowData} records to a Pulsar topic by wrapping a
 * {@link FlinkPulsarSink} around a {@link DynamicPulsarSerializationSchema}.
 *
 * <p>The sink can additionally write the metadata columns enumerated by {@link WritableMetadata}.
 * Metadata keys are either supplied through {@link #applyWritableMetadata(List, DataType)} or, when
 * the {@code use-extend-field} property is {@code true}, all of them are enabled when the runtime
 * provider is created.
 */
public class PulsarDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
    /** Property that, when {@code true}, enables every writable metadata key. */
    private static final String USE_EXTEND_FIELD_KEY = "use-extend-field";

    /** Property naming the value format; it is mapped onto a {@link RecordSchemaType} constant. */
    private static final String FORMAT_KEY = "format";

    /** Metadata keys to be written; empty until metadata is applied. */
    protected List<String> metadataKeys;

    /** Data type of the physical (non-metadata) columns. */
    protected DataType physicalDataType;

    /** Topic the sink writes to. */
    protected final String topic;

    /** Pulsar service URL handed to the {@link FlinkPulsarSink}. */
    protected final String serviceUrl;

    /** Pulsar admin URL handed to the {@link FlinkPulsarSink}. */
    protected final String adminUrl;

    /** Connector properties; also forwarded unchanged to the {@link FlinkPulsarSink}. */
    protected final Properties properties;

    /** Parsed value of the {@code use-extend-field} property (defaults to {@code false}). */
    protected final boolean useExtendField;

    /** Format used to create the value serializer. */
    protected final EncodingFormat<SerializationSchema<RowData>> encodingFormat;

    /**
     * Creates the sink.
     *
     * @param serviceUrl Pulsar service URL, must not be {@code null}
     * @param adminUrl Pulsar admin URL, must not be {@code null}
     * @param topic target topic, must not be {@code null}
     * @param physicalDataType data type of the physical columns, must not be {@code null}
     * @param properties connector properties, must not be {@code null}
     * @param encodingFormat format producing the value serializer, must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    protected PulsarDynamicTableSink(String serviceUrl, String adminUrl, String topic, DataType physicalDataType, Properties properties, EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
        this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "serviceUrl data type must not be null.");
        this.adminUrl = Preconditions.checkNotNull(adminUrl, "adminUrl data type must not be null.");
        this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
        this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Consumed data type must not be null.");
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
        this.metadataKeys = Collections.emptyList();
        this.useExtendField = Boolean.parseBoolean(properties.getProperty(USE_EXTEND_FIELD_KEY, "false"));
    }

    /**
     * Returns the changelog mode of the encoding format; the requested mode is not consulted.
     *
     * @param requestedMode mode requested by the caller (ignored)
     * @return the changelog mode reported by the encoding format
     */
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return this.encodingFormat.getChangelogMode();
    }

    /**
     * Builds the runtime sink function.
     *
     * <p>Note that when {@code use-extend-field} is enabled this method replaces
     * {@link #metadataKeys} with the keys of all {@link WritableMetadata} constants before the
     * sink is created.
     *
     * @param context sink context used to create the runtime encoder
     * @return a provider wrapping the Pulsar sink function
     */
    @Override
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        SerializationSchema<RowData> valueSerialization = this.encodingFormat.createRuntimeEncoder(context, this.physicalDataType);
        if (this.useExtendField) {
            this.metadataKeys = Arrays.stream(WritableMetadata.values()).map(m -> m.key).collect(Collectors.toList());
            // Kept as a call (not just the assignment above) so subclasses overriding the hook are notified.
            this.applyWritableMetadata(this.metadataKeys, null);
        }
        SinkFunction<RowData> pulsarSink = this.createPulsarSink(this.topic, this.properties, valueSerialization);
        return SinkFunctionProvider.of(pulsarSink);
    }

    /**
     * Assembles the {@link FlinkPulsarSink} together with its serialization schema.
     *
     * @param topic target topic
     * @param properties connector properties; must contain the {@code format} entry
     * @param valueSerialization serializer for the physical columns
     * @return the sink function writing rows to Pulsar
     * @throws NullPointerException if the {@code format} property is missing
     * @throws IllegalArgumentException if the {@code format} property does not name a
     *     {@link RecordSchemaType} constant
     */
    private SinkFunction<RowData> createPulsarSink(String topic, Properties properties, SerializationSchema<RowData> valueSerialization) {
        List<LogicalType> physicalChildren = this.physicalDataType.getLogicalType().getChildren();
        int physicalArity = physicalChildren.size();
        RowData.FieldGetter[] physicalFieldGetters = IntStream.range(0, physicalArity)
                .mapToObj(pos -> RowData.createFieldGetter(physicalChildren.get(pos), pos))
                .toArray(RowData.FieldGetter[]::new);
        // One slot per WritableMetadata constant: -1 when the key was not requested, otherwise the
        // index obtained by offsetting the key's position in metadataKeys by the physical arity.
        int[] metadataPositions = Stream.of(WritableMetadata.values()).mapToInt(m -> {
            int pos = this.metadataKeys.indexOf(m.key);
            return pos < 0 ? -1 : physicalArity + pos;
        }).toArray();
        String formatName = Preconditions.checkNotNull(properties.getProperty(FORMAT_KEY), "Property 'format' must be set to determine the record schema type.");
        // Locale.ROOT keeps the mapping independent of the JVM default locale (e.g. Turkish dotted 'I').
        RecordSchemaType recordSchemaType = RecordSchemaType.valueOf(formatName.toUpperCase(Locale.ROOT));
        DynamicPulsarSerializationSchema serializationSchema = new DynamicPulsarSerializationSchema(topic, valueSerialization, !this.metadataKeys.isEmpty(), metadataPositions, physicalFieldGetters, recordSchemaType, this.physicalDataType);
        return new FlinkPulsarSink<RowData>(this.serviceUrl, this.adminUrl, Optional.ofNullable(topic), properties, serializationSchema);
    }

    /**
     * Creates a copy of this sink that carries over the currently applied metadata keys.
     *
     * @return a new sink built from the same configuration
     */
    @Override
    public DynamicTableSink copy() {
        PulsarDynamicTableSink copy = new PulsarDynamicTableSink(this.serviceUrl, this.adminUrl, this.topic, this.physicalDataType, this.properties, this.encodingFormat);
        copy.metadataKeys = this.metadataKeys;
        return copy;
    }

    /**
     * @return a short human-readable description of this sink
     */
    @Override
    public String asSummaryString() {
        return "Pulsar universal table sink";
    }

    /**
     * Compares every field that also takes part in {@link #hashCode()}, including
     * {@link #properties}, so that equal sinks always produce equal hash codes.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        PulsarDynamicTableSink that = (PulsarDynamicTableSink) o;
        return this.useExtendField == that.useExtendField
                && Objects.equals(this.metadataKeys, that.metadataKeys)
                && Objects.equals(this.physicalDataType, that.physicalDataType)
                && Objects.equals(this.topic, that.topic)
                && Objects.equals(this.serviceUrl, that.serviceUrl)
                && Objects.equals(this.adminUrl, that.adminUrl)
                && Objects.equals(this.properties, that.properties)
                && Objects.equals(this.encodingFormat, that.encodingFormat);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.metadataKeys, this.physicalDataType, this.topic, this.serviceUrl, this.adminUrl, this.properties, this.useExtendField, this.encodingFormat);
    }

    /**
     * Lists the metadata columns this sink can write, in declaration order of
     * {@link WritableMetadata}.
     *
     * @return an insertion-ordered map from metadata key to its data type
     */
    @Override
    public Map<String, DataType> listWritableMetadata() {
        Map<String, DataType> metadataMap = new LinkedHashMap<>();
        for (WritableMetadata metadata : WritableMetadata.values()) {
            metadataMap.put(metadata.key, metadata.dataType);
        }
        return metadataMap;
    }

    /**
     * Records the metadata keys to be written.
     *
     * @param metadataKeys keys of the metadata columns to write
     * @param consumedDataType final consumed data type (not used by this implementation)
     */
    @Override
    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
        this.metadataKeys = metadataKeys;
    }

    /** Metadata columns that can be written by this sink. */
    enum WritableMetadata {
        /** Nullable timestamp read at precision 3 and converted via {@code getMillisecond()}. */
        TIMESTAMP("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), (row, pos) -> {
            if (row.isNullAt(pos)) {
                return null;
            }
            return row.getTimestamp(pos, 3).getMillisecond();
        });

        /** Key under which the metadata column is exposed. */
        public final String key;

        /** Data type of the metadata column. */
        public final DataType dataType;

        /** Reads the metadata value out of a row at a given position. */
        public final DynamicPulsarSerializationSchema.WritableRowDataMetadataConverter converter;

        WritableMetadata(String key, DataType dataType, DynamicPulsarSerializationSchema.WritableRowDataMetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

