/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.binlog.connector;

import com.twitter.chill.protobuf.ProtobufSerializer;
import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.binlog.alone.BusiAssit;
import net.wicp.tams.common.binlog.alone.DuckulaAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.binlog.alone.binlog.bean.Rule;
import net.wicp.tams.common.binlog.alone.binlog.bean.RuleItem;
import net.wicp.tams.common.binlog.alone.binlog.bean.RuleManager;
import net.wicp.tams.common.binlog.alone.parser.ParseLogOnline;
import net.wicp.tams.common.binlog.dump.MainDump;
import net.wicp.tams.common.flink.common.schema.DuckulaDeserializationSchema;
import net.wicp.tams.common.flink.connector.binlog.DuckulaOptions;
import net.wicp.tams.common.flink.connector.binlog.connector.FlinkBinlogTableListener;
import net.wicp.tams.common.jdbc.DruidAssit;
import net.wicp.tams.common.jdbc.MySqlAssitExt;
import net.wicp.tams.common.thread.threadlocal.PerthreadManager;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogSourceFunction
extends RichSourceFunction<RowData>
implements ResultTypeQueryable<RowData>,
CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(BinlogSourceFunction.class);
    private static final long serialVersionUID = 1L;
    private final boolean cdc;
    private static final String listener = "net.wicp.tams.common.flink.connector.binlog.connector.FlinkBinlogTableListener";
    private static final String chk = "net.wicp.tams.common.binlog.alone.checkpoint.CheckPointMemory";
    private final DuckulaDeserializationSchema duckulaDeserializationSchema;
    private ListenerConf.ConnConf connConf;
    private ParseLogOnline logFetcher;
    private Configuration optionsWith;
    private transient ListState<ListenerConf.CheckPoint> checkpointedState;
    private List<ListenerConf.ColHis> colsList = null;

    public BinlogSourceFunction(String configKey, DuckulaDeserializationSchema duckulaDeserializationSchema, boolean cdc, Configuration optionsWith) {
        this.duckulaDeserializationSchema = duckulaDeserializationSchema;
        ListenerConf.ConnConf.Builder connConfBuilder = BusiAssit.configMap((String)configKey);
        connConfBuilder.setListener(listener);
        connConfBuilder.setChk(chk);
        log.info("====\u8bbe\u7f6echk:{}", (Object)chk);
        this.connConf = connConfBuilder.build();
        this.cdc = cdc;
        this.optionsWith = optionsWith;
    }

    public BinlogSourceFunction(DuckulaDeserializationSchema duckulaDeserializationSchema, boolean cdc, Configuration optionsWith) {
        this(optionsWith.getString(DuckulaOptions.name), duckulaDeserializationSchema, cdc, optionsWith);
    }

    public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
        DuckulaOptions.packageParams((ReadableConfig)this.optionsWith);
        if (this.cdc) {
            log.info("config========================" + this.connConf);
            Connection conn = DruidAssit.getConnection((String)this.connConf.getConfName());
            ListenerConf.Position position = DuckulaAssit.getMastStatus((Connection)conn).build();
            ListenerConf.ConnConf.Builder builder = this.connConf.toBuilder();
            builder.setPos(position);
            this.connConf = builder.build();
            this.packgeDumpWhereSql(position);
            PerthreadManager.getInstance().createValue((Object)"_source_ctx", SourceFunction.SourceContext.class).set(ctx);
            PerthreadManager.getInstance().createValue((Object)"_source_deserialization", DuckulaDeserializationSchema.class).set((Object)this.duckulaDeserializationSchema);
            MainDump main = new MainDump(this.connConf.getConfName());
            main.dump(null);
            PerthreadManager.getInstance().cleanValue((Object)"_source_ctx");
            PerthreadManager.getInstance().cleanValue((Object)"_source_deserialization");
            while (!Conf.getBoolean((String)"common.binlog.alone.dump.global.isOver").booleanValue()) {
                log.info("========The dump is not complete =======");
                Thread.sleep(6000L);
            }
        }
        this.logFetcher = new ParseLogOnline(this.connConf.toBuilder());
        FlinkBinlogTableListener binlogListener = (FlinkBinlogTableListener)this.logFetcher.getBuffType().getBinlogListenerProxy().getIBinlogListener(this.connConf.getHost());
        binlogListener.setCtx(ctx);
        binlogListener.setDuckulaDeserializationSchema(this.duckulaDeserializationSchema);
        this.logFetcher.setColHis(this.colsList);
        this.logFetcher.read();
    }

    private void packgeDumpWhereSql(ListenerConf.Position position) {
        Map dumpConfs = Conf.getPreGroup((String)String.format("common.binlog.alone.dump.%s.ori", this.connConf.getConfName()), (String[])new String[0]);
        String updateColName = (String)this.optionsWith.get(DuckulaOptions.updateColName);
        Properties props = new Properties();
        for (String dumpId : dumpConfs.keySet()) {
            Map dumpconf = (Map)dumpConfs.get(dumpId);
            if (StringUtil.isNull(dumpconf.get("rule"))) continue;
            RuleManager ruleManager = new RuleManager((String)dumpconf.get("rule"));
            List rules = ruleManager.getRules();
            for (Rule rule : rules) {
                if (!rule.containsItem(RuleItem.wheresql)) {
                    rule.putRuleItem(RuleItem.wheresql, String.format("where %s<=|%s|", updateColName, position.getTimeStr()));
                    continue;
                }
                String sqlstr = rule.getRuleItem(RuleItem.wheresql).toLowerCase();
                boolean index = sqlstr.contains(updateColName);
                sqlstr = !index ? sqlstr + String.format(" and %s<=|%s|", updateColName, position.getTimeStr()) : MySqlAssitExt.minSelectSql((String)sqlstr.replace("|", "'"), (String)String.format(" and %s<='%s'", updateColName, position.getTimeStr()));
                rule.putRuleItem(RuleItem.wheresql, sqlstr.replace("'", "|"));
            }
            String ruleStrTrue = ruleManager.toString();
            props.put(String.format("common.binlog.alone.dump.%s.ori.%s.rule", this.connConf.getConfName(), dumpId), ruleStrTrue);
        }
        Conf.overProp((Properties)props);
    }

    public void cancel() {
        if (this.logFetcher != null) {
            log.info("============cancel the logFetcher");
            this.logFetcher.close();
        }
    }

    public TypeInformation<RowData> getProducedType() {
        return this.duckulaDeserializationSchema.getProducedType();
    }

    public void setRuntimeContext(RuntimeContext t) {
        super.setRuntimeContext(t);
        t.getExecutionConfig().registerTypeWithKryoSerializer(ListenerConf.CheckPoint.class, ProtobufSerializer.class);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (parameters != null) {
            this.getRuntimeContext().getUserCodeClassLoader().loadClass(listener);
            this.getRuntimeContext().getUserCodeClassLoader().loadClass(chk);
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (this.logFetcher != null) {
            this.checkpointedState.clear();
            ListenerConf.CheckPoint checkPoint = this.logFetcher.getCheckPointCur();
            this.checkpointedState.add((Object)checkPoint);
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        Iterator iterator;
        ListStateDescriptor descriptor = new ListStateDescriptor("duckula-checkPoint", TypeInformation.of((TypeHint)new TypeHint<ListenerConf.CheckPoint>(){}));
        this.checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored() && (iterator = ((Iterable)this.checkpointedState.get()).iterator()).hasNext()) {
            ListenerConf.ConnConf.Builder newBuilder = this.connConf.toBuilder();
            ListenerConf.CheckPoint checkPoint = (ListenerConf.CheckPoint)iterator.next();
            ListenerConf.Position pos = checkPoint.getPos();
            log.info("the binlog begin from:{}", (Object)pos.getGtids());
            newBuilder.setPos(pos);
            this.connConf = newBuilder.build();
            this.colsList = checkPoint.getColsList();
        }
    }
}

