/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.binlog.connector;

import com.twitter.chill.protobuf.ProtobufSerializer;
import java.util.Iterator;
import java.util.List;
import net.wicp.tams.common.binlog.alone.BusiAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.binlog.alone.parser.ParseLogOnline;
import net.wicp.tams.common.flink.connector.binlog.connector.FlinkBinlogListener;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogSource
extends RichSourceFunction<ListenerConf.DuckulaEvent>
implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(BinlogSource.class);
    private static final long serialVersionUID = 1L;
    private ListenerConf.ConnConf connConf;
    private ParseLogOnline logFetcher;
    private static final String listener = "net.wicp.tams.common.flink.connector.binlog.connector.FlinkBinlogListener";
    private static final String chk = "net.wicp.tams.common.binlog.alone.checkpoint.CheckPointMemory";
    private List<ListenerConf.ColHis> colsList = null;
    private transient ListState<ListenerConf.CheckPoint> checkpointedState;

    public BinlogSource(ListenerConf.ConnConf.Builder connConfBuilder, ExecutionConfig config) {
        connConfBuilder.setListener(listener);
        connConfBuilder.setChk(chk);
        log.info("====\u8bbe\u7f6echk:{}", (Object)chk);
        this.connConf = connConfBuilder.build();
        if (config != null) {
            config.registerTypeWithKryoSerializer(ListenerConf.DuckulaEvent.class, ProtobufSerializer.class);
        }
    }

    public BinlogSource(String configKey, ExecutionConfig config) {
        ListenerConf.ConnConf.Builder connConfBuilder = BusiAssit.configMap((String)configKey);
        connConfBuilder.setListener(listener);
        connConfBuilder.setChk(chk);
        log.info("====\u8bbe\u7f6echk:{}", (Object)chk);
        this.connConf = connConfBuilder.build();
        if (config != null) {
            config.registerTypeWithKryoSerializer(ListenerConf.DuckulaEvent.class, ProtobufSerializer.class);
        }
    }

    public BinlogSource(ExecutionConfig config) {
        this("_global", config);
    }

    public void setRuntimeContext(RuntimeContext t) {
        super.setRuntimeContext(t);
        t.getExecutionConfig().registerTypeWithKryoSerializer(ListenerConf.CheckPoint.class, ProtobufSerializer.class);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (parameters != null) {
            this.getRuntimeContext().getUserCodeClassLoader().loadClass(listener);
            this.getRuntimeContext().getUserCodeClassLoader().loadClass(chk);
        }
        this.logFetcher = new ParseLogOnline(this.connConf.toBuilder());
    }

    public void run(SourceFunction.SourceContext<ListenerConf.DuckulaEvent> ctx) throws Exception {
        FlinkBinlogListener binlogListener = (FlinkBinlogListener)this.logFetcher.getBuffType().getBinlogListenerProxy().getIBinlogListener(this.connConf.getHost());
        binlogListener.setCtx(ctx);
        this.logFetcher.setColHis(this.colsList);
        this.logFetcher.read();
    }

    public void cancel() {
        if (this.logFetcher != null) {
            log.info("============cancel the logFetcher");
            this.logFetcher.close();
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkpointedState.clear();
        ListenerConf.CheckPoint checkPoint = this.logFetcher.getCheckPointCur();
        this.checkpointedState.add((Object)checkPoint);
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        Iterator iterator;
        ListStateDescriptor descriptor = new ListStateDescriptor("duckula-checkPoint", TypeInformation.of((TypeHint)new TypeHint<ListenerConf.CheckPoint>(){}));
        this.checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored() && (iterator = ((Iterable)this.checkpointedState.get()).iterator()).hasNext()) {
            ListenerConf.ConnConf.Builder newBuilder = this.connConf.toBuilder();
            ListenerConf.CheckPoint checkPoint = (ListenerConf.CheckPoint)iterator.next();
            ListenerConf.Position pos = checkPoint.getPos();
            log.info("the binlog begin from:{}", (Object)pos.getGtids());
            newBuilder.setPos(pos);
            this.connConf = newBuilder.build();
            this.colsList = checkPoint.getColsList();
        }
    }
}

