/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.kafka.connector;

import com.alibaba.fastjson.JSONObject;
import java.util.List;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.flink.common.constant.FlinkTypeEnum;
import net.wicp.tams.common.flink.connector.kafka.KafkaOptions;
import net.wicp.tams.common.kafka.KafkaAssitInst;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaRichSinkFunction
extends RichSinkFunction<RowData> {
    private static final long serialVersionUID = 1L;
    private final int[] keyColIndex;
    private final int[] partitionColIndex;
    private final List<RowType.RowField> fields;
    private final SerializationSchema<RowData> serialization;
    private final Configuration optionsWith;
    private final String topic;
    private final int partitions;

    public KafkaRichSinkFunction(ResolvedSchema schema, SerializationSchema<RowData> serialization, List<String> partitionKeys, Configuration optionsWith) {
        if (schema.getPrimaryKey().isPresent()) {
            List keys = ((UniqueConstraint)schema.getPrimaryKey().get()).getColumns();
            this.keyColIndex = new int[keys.size()];
            int index = 0;
            for (int i = 0; i < schema.getColumns().size(); ++i) {
                if (!keys.contains(((Column)schema.getColumns().get(i)).getName())) continue;
                this.keyColIndex[index++] = i;
            }
        } else {
            this.keyColIndex = new int[0];
        }
        if (CollectionUtils.isNotEmpty(partitionKeys)) {
            this.partitionColIndex = new int[partitionKeys.size()];
            int index = 0;
            for (int i = 0; i < schema.getColumns().size(); ++i) {
                if (!partitionKeys.contains(((Column)schema.getColumns().get(i)).getName())) continue;
                this.partitionColIndex[index++] = i;
            }
        } else {
            this.partitionColIndex = new int[0];
        }
        RowType rowType = (RowType)schema.toSinkRowDataType().getLogicalType();
        this.fields = rowType.getFields();
        this.serialization = serialization;
        this.optionsWith = optionsWith;
        this.topic = optionsWith.getString(KafkaOptions.topic);
        KafkaProducer kafkaProducer = KafkaAssitInst.getInst().getKafkaProducer(byte[].class);
        List partiList = kafkaProducer.partitionsFor(this.topic);
        this.partitions = partiList.size();
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        KafkaOptions.packageOptionsSink((ReadableConfig)this.optionsWith);
    }

    public void invoke(RowData value, SinkFunction.Context context) throws Exception {
        RowKind rowKind = value.getRowKind();
        if (rowKind == RowKind.UPDATE_BEFORE) {
            return;
        }
        byte[] serialize = this.serialization.serialize((Object)value);
        JSONObject keyjson = new JSONObject();
        for (int keyIndex : this.keyColIndex) {
            FlinkTypeEnum flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)this.fields.get(keyIndex).getType().getTypeRoot().toString());
            Object rowDataValue = flinkTypeEnum.getRowDataValue(value, keyIndex, this.fields.get(keyIndex).getType());
            keyjson.put(this.fields.get(keyIndex).getName(), rowDataValue);
        }
        KafkaProducer kafkaProducer = KafkaAssitInst.getInst().getKafkaProducer(byte[].class);
        ProducerRecord message = null;
        if (this.partitions == 1 || ArrayUtils.isEmpty((int[])this.partitionColIndex) && ArrayUtils.isEmpty((int[])this.keyColIndex)) {
            message = new ProducerRecord(this.topic, (Object)keyjson.toJSONString(), (Object)serialize);
        } else {
            StringBuffer buff = new StringBuffer();
            if (ArrayUtils.isNotEmpty((int[])this.partitionColIndex)) {
                for (int partitionKeyIndex : this.partitionColIndex) {
                    buff.append(FlinkTypeEnum.getStr((RowType.RowField)this.fields.get(partitionKeyIndex), (RowData)value, (int)partitionKeyIndex) + "`");
                }
            } else {
                for (int keyIndex : this.keyColIndex) {
                    buff.append(FlinkTypeEnum.getStr((RowType.RowField)this.fields.get(keyIndex), (RowData)value, (int)keyIndex) + "`");
                }
            }
            message = new ProducerRecord(this.topic, Integer.valueOf(StringUtil.partition((String)buff.toString(), (int)this.partitions)), (Object)keyjson.toJSONString(), (Object)serialize);
        }
        kafkaProducer.send(message);
    }

    public void close() throws Exception {
    }
}

