/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.redis.connector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.CollectionUtil;
import net.wicp.tams.common.flink.common.constant.FlinkTypeEnum;
import net.wicp.tams.common.flink.connector.redis.options.RedisSourceOptions;
import net.wicp.tams.common.redis.RedisAssit;
import net.wicp.tams.common.redis.pool.AbsPool;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

public class RedisRichSinkFunction
extends RichSinkFunction<RowData> {
    private static final Logger log = LoggerFactory.getLogger(RedisRichSinkFunction.class);
    private static final long serialVersionUID = 1L;
    private final int[] keyColIndex;
    private final List<RowType.RowField> fields;
    private final SerializationSchema<RowData> serialization;
    private final Configuration optionsWith;
    private AbsPool standalone;

    public RedisRichSinkFunction(ResolvedSchema schema, SerializationSchema<RowData> serialization, Configuration optionsWith) {
        if (schema.getPrimaryKey().isPresent()) {
            List keys = ((UniqueConstraint)schema.getPrimaryKey().get()).getColumns();
            this.keyColIndex = new int[keys.size()];
            int index = 0;
            for (int i = 0; i < schema.getColumns().size(); ++i) {
                if (!keys.contains(((Column)schema.getColumns().get(i)).getName())) continue;
                this.keyColIndex[index++] = i;
            }
        } else {
            this.keyColIndex = new int[0];
        }
        RowType rowType = (RowType)schema.toSinkRowDataType().getLogicalType();
        this.fields = rowType.getFields();
        this.serialization = serialization;
        this.optionsWith = optionsWith;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RedisSourceOptions.packageOptions((ReadableConfig)this.optionsWith);
        String serverName = (String)this.optionsWith.get(RedisSourceOptions.groupid);
        this.standalone = RedisAssit.standalone((String)serverName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void invoke(RowData value, SinkFunction.Context context) throws Exception {
        RedisSourceOptions.packageOptions((ReadableConfig)this.optionsWith);
        String serverName = (String)this.optionsWith.get(RedisSourceOptions.groupid);
        String streamkey = Conf.get((String)String.format("common.redis.redisserver.%s.streamkey", serverName));
        String groupkey = Conf.get((String)String.format("common.redis.redisserver.%s.groupkey", serverName));
        if (this.standalone == null || !this.standalone.isInit()) {
            this.standalone = RedisAssit.standalone((String)serverName);
        }
        Jedis jedis = null;
        try {
            jedis = this.standalone.getResource();
            ArrayList<String> keyValues = new ArrayList<String>();
            for (int keyIndex : this.keyColIndex) {
                String valueOf = FlinkTypeEnum.getStr((RowType.RowField)this.fields.get(keyIndex), (RowData)value, (int)keyIndex);
                keyValues.add(valueOf);
            }
            String rowKeyValue = CollectionUtil.listJoin(keyValues, (String)"`");
            String key = String.format("%s:%s", this.optionsWith.get(RedisSourceOptions.searchkeyprefix), rowKeyValue);
            HashMap<String, String> data = new HashMap<String, String>();
            for (int i = 0; i < this.fields.size(); ++i) {
                String colValue = FlinkTypeEnum.getStr((RowType.RowField)this.fields.get(i), (RowData)value, (int)i);
                data.put(this.fields.get(i).getName(), colValue);
            }
            jedis.hmset(key, data);
            RowKind rowKind = value.getRowKind();
            switch (rowKind) {
                case INSERT: {
                    jedis.sadd(groupkey, new String[]{key});
                    break;
                }
                case DELETE: {
                    jedis.del(key);
                    jedis.srem(groupkey, new String[]{key});
                    break;
                }
            }
            byte[] serialize = this.serialization.serialize((Object)value);
            HashMap<byte[], byte[]> content = new HashMap<byte[], byte[]>();
            content.put("data".getBytes(), serialize);
            content.put("time".getBytes(), String.valueOf(System.currentTimeMillis()).getBytes());
            this.standalone.putStreamByte(jedis, streamkey, content);
        }
        catch (Exception e) {
            log.error("sink redis\u5931\u8d25", (Throwable)e);
        }
        finally {
            this.standalone.returnResource(jedis);
        }
    }

    public void close() throws Exception {
        if (this.standalone != null || this.standalone.isInit()) {
            this.standalone.destroy();
        }
    }
}

