/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.redis.connector;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.wicp.tams.common.flink.common.constant.FlinkTypeEnum;
import net.wicp.tams.common.flink.connector.redis.config.FlinkJedisConfigBase;
import net.wicp.tams.common.flink.connector.redis.container.RedisCommandsContainer;
import net.wicp.tams.common.flink.connector.redis.container.RedisCommandsContainerBuilder;
import net.wicp.tams.common.flink.connector.redis.mapper.LookupRedisMapper;
import net.wicp.tams.common.flink.connector.redis.mapper.RedisCommand;
import net.wicp.tams.common.flink.connector.redis.mapper.RedisCommandDescription;
import net.wicp.tams.common.flink.connector.redis.options.RedisLookupOptions;
import net.wicp.tams.common.flink.connector.redis.options.RedisSourceOptions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class RedisRowDataLookupFunction
extends TableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(RedisRowDataLookupFunction.class);
    private static final long serialVersionUID = 1L;
    private String additionalKey;
    private LookupRedisMapper lookupRedisMapper;
    private RedisCommand redisCommand;
    protected final RedisLookupOptions redisLookupOptions;
    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;
    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private final boolean isBatchMode;
    private final int batchSize;
    private final int batchMinTriggerDelayMs;
    private transient Cache<Object, RowData> cache;
    private transient Consumer<Object[]> evaler;
    private final DataType physicalDataType;
    private List<RowType.RowField> fields = null;
    private final Configuration optionsWith;
    private String[] keyNames;

    public RedisRowDataLookupFunction(FlinkJedisConfigBase flinkJedisConfigBase, LookupRedisMapper lookupRedisMapper, RedisLookupOptions redisLookupOptions, DataType physicalDataType, Configuration optionsWith, String[] keyNames) {
        this.physicalDataType = physicalDataType;
        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.lookupRedisMapper = lookupRedisMapper;
        this.redisLookupOptions = redisLookupOptions;
        RedisCommandDescription redisCommandDescription = lookupRedisMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getRedisCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
        this.cacheMaxSize = this.redisLookupOptions.getCacheMaxSize();
        this.cacheExpireMs = this.redisLookupOptions.getCacheExpireMs();
        this.maxRetryTimes = this.redisLookupOptions.getMaxRetryTimes();
        this.isBatchMode = this.redisLookupOptions.isBatchMode();
        this.batchSize = this.redisLookupOptions.getBatchSize();
        this.batchMinTriggerDelayMs = this.redisLookupOptions.getBatchMinTriggerDelayMs();
        this.optionsWith = optionsWith;
        this.keyNames = keyNames;
    }

    public void eval(Object ... objects) throws IOException {
        for (int retry = 0; retry <= this.maxRetryTimes; ++retry) {
            try {
                this.evaler.accept(objects);
                break;
            }
            catch (Exception e) {
                LOG.error(String.format("redis lookup error, retry times = %d", retry), (Throwable)e);
                if (retry >= this.maxRetryTimes) {
                    throw new RuntimeException("Execution of Redis lookup failed.", e);
                }
                try {
                    Thread.sleep(1000 * retry);
                    continue;
                }
                catch (InterruptedException e1) {
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    public void open(FunctionContext context) {
        LOG.info("start open ...");
        RedisSourceOptions.packageOptions((ReadableConfig)this.optionsWith);
        LOG.info("redislookupconnector\u914d\u7f6e\u4fe1\u606f===>" + this.redisLookupOptions.toString());
        try {
            this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.redisLookupOptions);
            this.redisCommandsContainer.open();
        }
        catch (Exception e) {
            LOG.error("Redis has not been properly initialized: ", (Throwable)e);
            throw new RuntimeException(e);
        }
        Cache<Object, RowData> cache = this.cache = this.cacheMaxSize <= 0L || this.cacheExpireMs <= 0L ? null : CacheBuilder.newBuilder().recordStats().expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(this.cacheMaxSize).build();
        if (this.cache != null) {
            context.getMetricGroup().gauge("lookupCacheHitRate", () -> this.cache.stats().hitRate());
            this.evaler = in -> {
                HashMap<String, String> kv = new HashMap<String, String>();
                String searchkeysuffix = this.handlekeysuffix((Object[])in, (Map<String, String>)kv);
                RowData cacheRowData = (RowData)this.cache.getIfPresent((Object)searchkeysuffix);
                if (cacheRowData != null) {
                    this.collect(cacheRowData);
                } else {
                    switch (this.redisCommand) {
                        case HGET: {
                            this.getAndCache(searchkeysuffix, kv);
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException("Cannot process such data type: " + (Object)((Object)this.redisCommand));
                        }
                    }
                }
            };
        } else {
            this.evaler = in -> {
                HashMap<String, String> kv = new HashMap<String, String>();
                String searchkeysuffix = this.handlekeysuffix((Object[])in, (Map<String, String>)kv);
                switch (this.redisCommand) {
                    case HGET: {
                        this.getAndCache(searchkeysuffix, kv);
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Cannot process such data type: " + (Object)((Object)this.redisCommand));
                    }
                }
            };
        }
        LOG.info("end open.");
    }

    private String handlekeysuffix(Object[] in, Map<String, String> kv) {
        if (this.keyNames.length == 1) {
            kv.put(this.keyNames[0], String.valueOf(in[0]));
            return String.valueOf(in[0]);
        }
        if (this.keyNames.length == 2) {
            kv.put(this.keyNames[0], String.valueOf(in[0]));
            kv.put(this.keyNames[1], String.valueOf(in[1]));
            StringJoiner stringJoiner = new StringJoiner(":");
            if (this.keyNames[0].equals(this.optionsWith.get(RedisSourceOptions.routeColName))) {
                return stringJoiner.add(String.valueOf(in[0])).add(String.valueOf(in[1])).toString();
            }
            if (this.keyNames[1].equals(this.optionsWith.get(RedisSourceOptions.routeColName))) {
                return stringJoiner.add(String.valueOf(in[1])).add(String.valueOf(in[0])).toString();
            }
            throw new RuntimeException("\u8def\u7531\u53c2\u6570\u4e0d\u5728\u8868\u5b57\u6bb5\u5b9a\u4e49\u8303\u56f4\u5185\uff0c\u6216\u5173\u8054\u6761\u4ef6\u4e2d\u4e0d\u5305\u542b\u8def\u7531\u53c2\u6570\uff01");
        }
        throw new RuntimeException("\u8fd0\u884c\u65f6\u63a5\u53d7\u5230\u4e86\u4e0d\u5408\u6cd5\u7684\u53c2\u6570\uff0c\u5f53\u524d\u6700\u591a\u652f\u6301\u5305\u542b\u8def\u7531\u53c2\u6570\u5185\u7684\u4e24\u4e2a\u53c2\u6570\uff01");
    }

    private void getAndCache(String searchkeysuffix, Map<String, String> kv) {
        RowType rowType = (RowType)this.physicalDataType.getLogicalType();
        this.fields = rowType.getFields();
        Map<String, String> allvalue = this.redisCommandsContainer.hgetAll(this.redisLookupOptions.getSearchkeyprefix().concat(":").concat(searchkeysuffix));
        for (String key : kv.keySet()) {
            allvalue.put(key, kv.get(key));
        }
        GenericRowData row = new GenericRowData(this.fields.size());
        row.setRowKind(RowKind.INSERT);
        this.packRow(allvalue, row);
        this.collect(row);
        if (this.cache != null && null != row) {
            this.cache.put((Object)searchkeysuffix, (Object)row);
        }
    }

    private void packRow(Map<String, String> allValue, GenericRowData rowData) {
        for (int i = 0; i < this.fields.size(); ++i) {
            RowType.RowField rowField = this.fields.get(i);
            FlinkTypeEnum flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)rowField.getType().getTypeRoot().toString());
            Object value = allValue.containsKey(rowField.getName()) ? FlinkTypeEnum.getValue((FlinkTypeEnum)flinkTypeEnum, (String)allValue.get(rowField.getName()), (LogicalType)rowField.getType()) : null;
            rowData.setField(i, value);
        }
    }

    public void close() {
        LOG.info("start close ...");
        if (this.redisCommandsContainer != null) {
            try {
                this.redisCommandsContainer.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        LOG.info("end close.");
    }
}

