/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.redis.connector;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.IntStream;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.DateUtil;
import net.wicp.tams.common.apiext.LoggerUtil;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.apiext.TimeAssist;
import net.wicp.tams.common.binlog.alone.DuckulaAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.constant.DateFormatCase;
import net.wicp.tams.common.constant.FieldFormart;
import net.wicp.tams.common.constant.JvmStatus;
import net.wicp.tams.common.constant.ods.AddColName;
import net.wicp.tams.common.constant.ods.AddColNameType;
import net.wicp.tams.common.exception.ExceptAll;
import net.wicp.tams.common.exception.IExcept;
import net.wicp.tams.common.exception.ProjectExceptionRuntime;
import net.wicp.tams.common.flink.common.constant.FlinkTypeEnum;
import net.wicp.tams.common.flink.connector.redis.options.RedisSourceOptions;
import net.wicp.tams.common.redis.RedisAssit;
import net.wicp.tams.common.redis.pool.AbsPool;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

public class RedisScanFunction
extends RichSourceFunction<RowData>
implements ResultTypeQueryable<RowData> {
    private static final Logger log = LoggerFactory.getLogger(RedisScanFunction.class);
    private static final long serialVersionUID = 1L;
    private final Configuration optionsWith;
    private final List<RowType.RowField> rowTypeFields;
    private final List<String> keys;
    private final String routeColName;
    private final List<String> addColNames = new ArrayList<String>();
    private final boolean append;
    private AbsPool standalone;
    private transient FieldFormart fieldFormart;
    private final long beginTime;

    public RedisScanFunction(Configuration optionsWith, List<RowType.RowField> rowTypeFields, UniqueConstraint primaryKey) {
        this.optionsWith = optionsWith;
        this.rowTypeFields = rowTypeFields;
        this.keys = primaryKey == null ? null : primaryKey.getColumns();
        this.append = (Boolean)optionsWith.get(RedisSourceOptions.append);
        this.routeColName = (String)optionsWith.get(RedisSourceOptions.routeColName);
        this.fieldFormart = FieldFormart.valueOf((String)((String)optionsWith.get(RedisSourceOptions.fieldFormart)));
        this.beginTime = System.currentTimeMillis();
        List allColList = AddColName.getAllColNameTrue((FieldFormart)this.fieldFormart);
        for (RowType.RowField rowField : rowTypeFields) {
            FlinkTypeEnum flinkTypeEnum;
            if (allColList.contains(rowField.getName())) {
                this.addColNames.add(rowField.getName());
            }
            if ((flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)rowField.getType().getTypeRoot().toString())) != null) continue;
            throw new ProjectExceptionRuntime((IExcept)ExceptAll.param_notfit, "\u5217\uff1a\u3010" + rowField.getName() + "\u3011\u4e0d\u652f\u6301\u7684\u7c7b\u578b\u3010" + rowField.getType().getTypeRoot().toString() + "\u3011");
        }
    }

    public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"));
        RedisSourceOptions.packageOptions((ReadableConfig)this.optionsWith);
        String serverName = (String)this.optionsWith.get(RedisSourceOptions.groupid);
        String streamkey = Conf.get((String)String.format("common.redis.redisserver.%s.streamkey", serverName));
        String groupkey = Conf.get((String)String.format("common.redis.redisserver.%s.groupkey", serverName));
        if (this.standalone == null || !this.standalone.isInit()) {
            this.standalone = RedisAssit.standalone((String)serverName);
        }
        Jedis jedis = this.standalone.getResource();
        Set smembers = jedis.smembers(groupkey);
        if (this.fieldFormart == null) {
            this.fieldFormart = FieldFormart.valueOf((String)((String)this.optionsWith.get(RedisSourceOptions.fieldFormart)));
        }
        for (String smember : smembers) {
            String[] keyAry;
            Map allValue = jedis.hgetAll(smember);
            if (StringUtil.isNotNull((Object)this.routeColName) && !allValue.containsKey(this.routeColName)) {
                keyAry = smember.split(":");
                allValue.put(this.routeColName, keyAry[keyAry.length - 2]);
            }
            if (CollectionUtils.isNotEmpty(this.keys) && this.keys.size() == 1 && !allValue.containsKey(this.keys.get(0))) {
                keyAry = smember.split(":");
                allValue.put(this.keys.get(0), keyAry[keyAry.length - 1]);
            }
            if (CollectionUtils.isNotEmpty(this.addColNames)) {
                block11: for (String string : AddColName.values()) {
                    if (allValue.containsKey(string.getColNameTrue(this.fieldFormart))) continue;
                    switch (1.$SwitchMap$net$wicp$tams$common$constant$ods$AddColName[string.ordinal()]) {
                        case 1: {
                            allValue.put(string.getColNameTrue(this.fieldFormart), DateFormatCase.YYYY_MM_DD_hhmmss.getInstanc().format(System.currentTimeMillis()));
                            continue block11;
                        }
                        case 2: {
                            allValue.put(string.getColNameTrue(this.fieldFormart), DateFormatCase.YYYY_MM_DD_hhmmss.getInstanc().format(System.currentTimeMillis()));
                            continue block11;
                        }
                        case 3: {
                            allValue.put(string.getColNameTrue(this.fieldFormart), "0");
                            continue block11;
                        }
                        case 4: {
                            allValue.put(string.getColNameTrue(this.fieldFormart), "insert");
                            continue block11;
                        }
                        case 5: {
                            allValue.put(string.getColNameTrue(this.fieldFormart), "");
                            continue block11;
                        }
                        case 6: {
                            allValue.put(string.getColNameTrue(this.fieldFormart), "");
                            continue block11;
                        }
                    }
                }
            }
            if (!this.append) {
                String colName = this.fieldFormart.getColName((String)this.optionsWith.get(RedisSourceOptions.updateColName));
                Date curTime = DateUtil.objToDate(allValue.get(colName));
                if (curTime == null) {
                    log.warn("\u5bf9\u6bd4\u5b57\u6bb5:[{}]\u5b58\u5728\u7a7a\u503c\uff0c\u6570\u636e\uff1a[{}]\uff0c\u4e0d\u80fd\u4fdd\u8bc1 exactly once\u8bed\u53e5", (Object)colName, (Object)allValue);
                } else if (this.beginTime <= curTime.getTime()) {
                    log.info("\u5931\u6548\uff1a====beginTime=" + this.beginTime + "    curTime=" + curTime.getTime());
                    continue;
                }
            }
            GenericRowData rowData = new GenericRowData(this.rowTypeFields.size());
            rowData.setRowKind(RowKind.INSERT);
            this.packRow(null, allValue, rowData);
            ctx.collect((Object)rowData);
        }
        this.standalone.returnResource(jedis);
        log.info("======================\u5168\u91cf\u5df2\u505a\u5b8c\uff0c\u5f00\u59cb\u505a\u589e\u91cf=======================");
        while (true) {
            try {
                while (true) {
                    this.sendFlink(ctx, serverName, streamkey, this.beginTime);
                    TimeAssist.reDoWaitInit((String)"need-init");
                }
            }
            catch (Throwable e) {
                boolean reDoWait = TimeAssist.reDoWait((String)"need-init", (int)8);
                log.error("readdo error", e);
                if (!reDoWait) continue;
                log.error("\u5df2\u91cd\u8bd58\u6b21\uff0c\u9000\u51fa\u7cfb\u7edf");
                LoggerUtil.exit((JvmStatus)JvmStatus.s15);
                continue;
            }
            break;
        }
    }

    private void sendFlink(SourceFunction.SourceContext<RowData> ctx, String serverName, String streamkey, long minTime) throws InvalidProtocolBufferException {
        Pair msg = this.standalone.getStreamBlockOneDuckulaEvent(streamkey, serverName);
        if (msg == null || (Long)msg.getLeft() < minTime) {
            return;
        }
        ListenerConf.DuckulaEvent duckulaEvent = ListenerConf.DuckulaEvent.parseFrom((byte[])((byte[])msg.getRight()));
        for (ListenerConf.DuckulaEventItem item : duckulaEvent.getItemsList()) {
            HashMap<String, String> addValueMap;
            GenericRowData rowDataafter = new GenericRowData(this.rowTypeFields.size());
            switch (duckulaEvent.getOptType()) {
                case insert: {
                    rowDataafter.setRowKind(RowKind.INSERT);
                    break;
                }
                case update: {
                    if (this.append) {
                        return;
                    }
                    GenericRowData rowDataBefore = new GenericRowData(this.rowTypeFields.size());
                    addValueMap = CollectionUtils.isEmpty(this.addColNames) ? new HashMap() : DuckulaAssit.getAddColValuesStr((ListenerConf.DuckulaEvent)duckulaEvent, (AddColNameType)AddColNameType.all_ori, (FieldFormart)this.fieldFormart);
                    this.packRow(addValueMap, item.getBeforeMap(), rowDataBefore);
                    rowDataBefore.setRowKind(RowKind.UPDATE_BEFORE);
                    ctx.collect((Object)rowDataBefore);
                    rowDataafter.setRowKind(RowKind.UPDATE_AFTER);
                    break;
                }
                case delete: {
                    if (this.append) {
                        return;
                    }
                    rowDataafter.setRowKind(RowKind.DELETE);
                    break;
                }
                default: {
                    throw new ProjectExceptionRuntime((IExcept)ExceptAll.duckula_datanofit, "redis\u4e0d\u652f\u6301\u6b64\u7c7b\u578b" + duckulaEvent.getOptType().name());
                }
            }
            Map itemmap = item.getAfterMap();
            addValueMap = CollectionUtils.isEmpty(this.addColNames) ? new HashMap<String, String>() : DuckulaAssit.getAddColValuesStr((ListenerConf.DuckulaEvent)duckulaEvent, (AddColNameType)AddColNameType.all_ori, (FieldFormart)this.fieldFormart);
            this.packRow(addValueMap, itemmap, rowDataafter);
            ctx.collect((Object)rowDataafter);
        }
    }

    private void packRow(Map<String, String> addValueMap, Map<String, String> allValue, GenericRowData rowData) {
        for (int i = 0; i < this.rowTypeFields.size(); ++i) {
            RowType.RowField rowField = this.rowTypeFields.get(i);
            FlinkTypeEnum flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)rowField.getType().getTypeRoot().toString());
            Object value = null;
            if (MapUtils.isNotEmpty(addValueMap) && addValueMap.containsKey(rowField.getName())) {
                value = FlinkTypeEnum.getValue((FlinkTypeEnum)flinkTypeEnum, (String)addValueMap.get(rowField.getName()), (LogicalType)rowField.getType());
            } else {
                Object object = value = allValue.containsKey(rowField.getName()) ? FlinkTypeEnum.getValue((FlinkTypeEnum)flinkTypeEnum, (String)allValue.get(rowField.getName()), (LogicalType)rowField.getType()) : null;
            }
            if (value == null) continue;
            rowData.setField(i, value);
        }
    }

    public static int[] createValueFormatProjection(DataType physicalDataType) {
        LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument((physicalType.getTypeRoot() == LogicalTypeRoot.ROW ? 1 : 0) != 0, (Object)"Row data type expected.");
        int physicalFieldCount = LogicalTypeChecks.getFieldCount((LogicalType)physicalType);
        IntStream physicalFields = IntStream.range(0, physicalFieldCount);
        return physicalFields.toArray();
    }

    public void cancel() {
        if (this.standalone != null || this.standalone.isInit()) {
            this.standalone.destroy();
        }
    }

    public TypeInformation<RowData> getProducedType() {
        return null;
    }

    public void setRuntimeContext(RuntimeContext t) {
        super.setRuntimeContext(t);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RedisSourceOptions.packageOptions((ReadableConfig)this.optionsWith);
        String serverName = (String)this.optionsWith.get(RedisSourceOptions.groupid);
        this.standalone = RedisAssit.standalone((String)serverName);
    }
}

