/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.binlog.cdc;

import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.binlog.alone.dump.bean.Dump;
import net.wicp.tams.common.binlog.alone.dump.bean.DumpEvent;
import net.wicp.tams.common.binlog.alone.dump.listener.IBusiSender;
import net.wicp.tams.common.constant.FieldFormart;
import net.wicp.tams.common.constant.ods.AddColName;
import net.wicp.tams.common.flink.common.constant.FlinkTypeEnum;
import net.wicp.tams.common.flink.common.schema.DuckulaDeserializationSchema;
import net.wicp.tams.common.thread.threadlocal.PerthreadManager;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public class FlinkDumpListener
implements IBusiSender<DumpEvent> {
    private SourceFunction.SourceContext<RowData> ctx;
    private List<RowType.RowField> rowTypeFields;

    public void init(Dump dump) {
        this.ctx = (SourceFunction.SourceContext)PerthreadManager.getInstance().createValue((Object)"_source_ctx", SourceFunction.SourceContext.class).get();
        DuckulaDeserializationSchema duckulaDeserializationSchema = (DuckulaDeserializationSchema)PerthreadManager.getInstance().createValue((Object)"_source_deserialization", DuckulaDeserializationSchema.class).get();
        this.rowTypeFields = duckulaDeserializationSchema.getRowTypeFields();
    }

    public void doSend(DumpEvent dumpEvent, Map<AddColName, Serializable> addValues, String newDb, String newTb, boolean isSplit) {
        if (CollectionUtils.isEmpty((Collection)dumpEvent.getDatas())) {
            return;
        }
        for (Map data : dumpEvent.getDatas()) {
            GenericRowData rowData = new GenericRowData(this.rowTypeFields.size());
            rowData.setRowKind(RowKind.INSERT);
            JSONObject others = new JSONObject();
            ArrayList<String> existField = new ArrayList<String>();
            int duckula_other_index = -1;
            for (int j = 0; j < this.rowTypeFields.size(); ++j) {
                RowType.RowField rowField2 = this.rowTypeFields.get(j);
                if ("duckula_other".equals(rowField2.getName())) {
                    duckula_other_index = j;
                    continue;
                }
                if (ArrayUtils.contains((Object[])dumpEvent.getDump().getNeedCols(), (Object)rowField2.getName())) {
                    existField.add(rowField2.getName());
                    FlinkTypeEnum flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)rowField2.getType().getTypeRoot().toString());
                    Object retojb = null;
                    String datastr = (String)data.get(rowField2.getName());
                    if (StringUtil.isNotNull((Object)datastr)) {
                        retojb = flinkTypeEnum == null || "java.lang.String".equals(flinkTypeEnum.getJavaType().getTypeName()) ? new BinaryStringData((String)data.get(rowField2.getName())) : FlinkTypeEnum.getValue((FlinkTypeEnum)flinkTypeEnum, (String)((String)data.get(rowField2.getName())), (LogicalType)rowField2.getType());
                        rowData.setField(j, retojb);
                        continue;
                    }
                    rowData.setField(j, null);
                    continue;
                }
                rowData.setField(j, null);
            }
            if (MapUtils.isNotEmpty(addValues)) {
                List rowFields = this.rowTypeFields.stream().map(rowField -> rowField.getName()).collect(Collectors.toList());
                FieldFormart fieldFormart = (FieldFormart)Conf.getEnum(FieldFormart.class, (String)"common.binlog.alone.global.fieldFormart");
                for (AddColName addColName : addValues.keySet()) {
                    GenericRowData row = rowData;
                    int indexOfRowType = rowFields.indexOf(addColName.getColNameTrue(fieldFormart));
                    if (indexOfRowType < 0) continue;
                    RowType.RowField rowField3 = this.rowTypeFields.get(indexOfRowType);
                    FlinkTypeEnum flinkTypeEnum = FlinkTypeEnum.findByFlinkRowType((String)rowField3.getType().getTypeRoot().toString());
                    Object retojb = null;
                    String valueStr = String.valueOf(addValues.get(addColName));
                    retojb = flinkTypeEnum == null || "java.lang.String".equals(flinkTypeEnum.getJavaType().getTypeName()) ? new BinaryStringData(valueStr) : FlinkTypeEnum.getValue((FlinkTypeEnum)flinkTypeEnum, (String)valueStr, (LogicalType)rowField3.getType());
                    row.setField(indexOfRowType, retojb);
                }
            }
            if (duckula_other_index >= 0) {
                for (String colName : dumpEvent.getDump().getNeedCols()) {
                    if (existField.contains(colName)) continue;
                    others.put(colName, data.get(colName));
                }
                rowData.setField(duckula_other_index, (Object)new BinaryStringData(others.toJSONString()));
            }
            this.ctx.collect((Object)rowData);
        }
    }
}

