/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.kafka.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import net.wicp.tams.common.constant.Middleware;
import net.wicp.tams.common.flink.common.format.DuckulaDecodingFormat;
import net.wicp.tams.common.flink.connector.kafka.connector.KafkaRichSinkFunction;
import org.apache.commons.collections.MapUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

public class KafkaDynamicTableSink
implements DynamicTableSink,
SupportsPartitioning,
SupportsWritingMetadata {
    private final ResolvedSchema schema;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final Configuration options;
    private final List<String> partitionKeys;
    private Map<String, String> staticPartitions = new LinkedHashMap<String, String>();
    private List<String> metadataKeys = new ArrayList<String>();

    public KafkaDynamicTableSink(ResolvedSchema schema, EncodingFormat<SerializationSchema<RowData>> encodingFormat, Configuration options, List<String> partitionKeys) {
        this.schema = schema;
        this.encodingFormat = encodingFormat;
        this.options = options;
        this.partitionKeys = partitionKeys;
    }

    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        SerializationSchema serializationSchema = (SerializationSchema)this.encodingFormat.createRuntimeEncoder(context, this.schema.toSinkRowDataType());
        KafkaRichSinkFunction kafkaRichSinkFunction = new KafkaRichSinkFunction(this.schema, (SerializationSchema<RowData>)serializationSchema, this.partitionKeys, this.options);
        return SinkFunctionProvider.of((SinkFunction)kafkaRichSinkFunction);
    }

    public DynamicTableSink copy() {
        return new KafkaDynamicTableSink(this.schema, this.encodingFormat, this.options, this.partitionKeys);
    }

    public String asSummaryString() {
        return Middleware.tamskafka.getDesc();
    }

    public void applyStaticPartition(Map<String, String> partition) {
        this.staticPartitions = new LinkedHashMap<String, String>();
        for (String partitionCol : this.partitionKeys) {
            if (!partition.containsKey(partitionCol)) continue;
            this.staticPartitions.put(partitionCol, partition.get(partitionCol));
        }
    }

    public Map<String, DataType> listWritableMetadata() {
        Map formateMeta = DuckulaDecodingFormat.getFormateMeta();
        HashMap<String, DataType> retmap = new HashMap<String, DataType>();
        if (MapUtils.isNotEmpty((Map)formateMeta)) {
            for (String key : formateMeta.keySet()) {
                retmap.put("value." + key, (DataType)formateMeta.get(key));
            }
        }
        return retmap;
    }

    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
        this.metadataKeys = metadataKeys;
    }
}

