package org.apache.rocketmq.streams.connectors.reader;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.ThreadUtil;
import org.apache.rocketmq.streams.connectors.IBoundedSourceReader;
import org.apache.rocketmq.streams.connectors.model.PullMessage;
import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/connectors/reader/DBScanReader.class */
/**
 * Split reader that scans a database table in ascending {@code id} windows.
 *
 * <p>{@link #open(ISplit)} looks up the table's minimum and maximum {@code id}; every call to
 * {@link #next()} then reads the rows whose id lies in {@code [offsetStart, offsetStart + batchSize)}
 * and exposes them through {@link #getMessage()}. Once the window start has moved past the
 * maximum id captured by {@code open}, the reader reports itself finished, records a
 * {@link ReaderStatus} and notifies the owning {@link ISource}.
 *
 * <p>A {@link JDBCDriver} is created lazily per calling thread through a {@link ThreadLocal}.
 */
public class DBScanReader implements ISplitReader, IBoundedSourceReader, Serializable {
    private static final long serialVersionUID = 8172403250050893288L;
    private static final Log logger = LogFactory.getLog(DBScanReader.class);
    static final String sqlTemplate = "select * from %s where id >= %d and id < %d";

    /**
     * Window width used when {@link #batchSize} was never configured (or is not positive).
     * Without a positive step the scan window would be empty and would never move forward.
     */
    private static final int DEFAULT_BATCH_SIZE = 1000;

    ISource iSource;
    String url;
    String userName;
    String password;
    String tableName;
    int batchSize;
    /** Largest id handed out so far; reported by {@link #getProgress()}. */
    long offset;
    /** Inclusive lower bound of the next id window to query. */
    long offsetStart;
    /** Exclusive upper bound of the most recently queried id window. */
    long offsetEnd;
    /** Largest id in the table at the time {@link #open(ISplit)} ran. */
    long maxOffset;
    /** Smallest id in the table at the time {@link #open(ISplit)} ran. */
    long minOffset;
    ISplit iSplit;
    /** Rows fetched by the latest {@link #next()}; null after {@link #close()} or {@link #finish()}. */
    transient List<PullMessage> pullMessages;
    volatile transient boolean isFinishedCall = false;
    volatile boolean interrupt = false;
    volatile boolean isClosed = false;
    /**
     * Per-thread driver holder. The field is transient, so it is null on an instance that was
     * restored by deserialization; always read it through {@link #driverHolder()}.
     */
    transient ThreadLocal<JDBCDriver> threadLocal = newDriverHolder();

    /** Builds the holder that creates one MySQL driver per calling thread on first use. */
    private ThreadLocal<JDBCDriver> newDriverHolder() {
        return new ThreadLocal<JDBCDriver>() {
            @Override
            protected JDBCDriver initialValue() {
                DBScanReader.logger.info(String.format("%s initial jdbcDriver. ", Thread.currentThread().getName()));
                return DriverBuilder.createDriver("com.mysql.jdbc.Driver", DBScanReader.this.url, DBScanReader.this.userName, DBScanReader.this.password);
            }
        };
    }

    /**
     * Returns the per-thread driver holder, rebuilding it when the transient field is null
     * (which is the case after deserialization, because field initializers are not re-run).
     */
    private synchronized ThreadLocal<JDBCDriver> driverHolder() {
        if (this.threadLocal == null) {
            this.threadLocal = newDriverHolder();
        }
        return this.threadLocal;
    }

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String str) {
        this.url = str;
    }

    public String getUserName() {
        return this.userName;
    }

    public void setUserName(String str) {
        this.userName = str;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String str) {
        this.password = str;
    }

    public String getTableName() {
        return this.tableName;
    }

    public void setTableName(String str) {
        this.tableName = str;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    public ISplit getISplit() {
        return this.iSplit;
    }

    public void setISplit(ISplit iSplit) {
        this.iSplit = iSplit;
    }

    /**
     * Binds the reader to {@code iSplit} and captures the id range of the table.
     *
     * <p>When the table has no rows, {@code min(id)} and {@code max(id)} are SQL NULL. In that
     * case the range is set to the empty interval (min 0, max -1) so that {@link #isFinished()}
     * is true immediately instead of failing while parsing the null value.
     *
     * @param iSplit the split this reader serves
     */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public void open(ISplit iSplit) {
        this.iSplit = iSplit;
        Map<?, ?> range = driverHolder().get().queryOneRow("select min(id) as min_id, max(id) as max_id from " + this.tableName);
        Object minId = range == null ? null : range.get("min_id");
        Object maxId = range == null ? null : range.get("max_id");
        if (minId == null || maxId == null) {
            // Empty table: nothing to scan.
            logger.warn(String.format("table %s has no rows, nothing to scan.", this.tableName));
            this.minOffset = 0L;
            this.maxOffset = -1L;
        } else {
            this.minOffset = Long.parseLong(String.valueOf(minId));
            this.maxOffset = Long.parseLong(String.valueOf(maxId));
        }
        this.offsetStart = this.minOffset;
        this.offset = this.minOffset;
        logger.info(String.format("table %s min id [ %d ],  max id [ %d ]", this.tableName, this.minOffset, this.maxOffset));
        this.pullMessages = new ArrayList<>();
    }

    /**
     * Fetches the next id window into a fresh message list.
     *
     * <p>If the reader is interrupted nothing is read. If the scan is complete, {@link #finish()}
     * is invoked, the calling thread sleeps for ten seconds and {@code false} is returned.
     *
     * @return {@code true} when a window was queried (its rows, possibly none, are available from
     *         {@link #getMessage()}); {@code false} when interrupted or finished
     */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public boolean next() {
        if (this.interrupt) {
            return false;
        }
        if (isFinished()) {
            finish();
            ThreadUtil.sleep(10000L);
            return false;
        }
        JDBCDriver driver = driverHolder().get();
        long batchStart = this.offsetStart;
        // A non-positive batch size would yield an empty window that never advances.
        int step = this.batchSize > 0 ? this.batchSize : DEFAULT_BATCH_SIZE;
        this.offsetEnd = batchStart + step;
        String sql = String.format(sqlTemplate, this.tableName, batchStart, this.offsetEnd);
        logger.debug(String.format("execute sql : %s", sql));
        // Rows are only serialized to JSON below, so their static element type is irrelevant here.
        Iterable<?> rows = driver.queryForList(sql);
        this.offsetStart = this.offsetEnd;
        // Publish a new list per batch: the previous one may still be held by a caller, and the
        // field is null after close(), finish() or deserialization.
        List<PullMessage> batch = new ArrayList<>();
        this.pullMessages = batch;
        for (Object row : rows) {
            JSONObject json = JSONObject.parseObject(JSON.toJSONString(row));
            PullMessage message = new PullMessage();
            message.setMessage(json);
            // Progress is the running maximum id, regardless of the order rows come back in.
            this.offset = Math.max(this.offset, Long.parseLong(json.getString("id")));
            message.setMessageOffset(new MessageOffset(String.valueOf(this.offset), true));
            batch.add(message);
        }
        return batchStart <= this.maxOffset;
    }

    /**
     * Returns the rows read by the latest {@link #next()}.
     *
     * @return the current batch; may be null after {@link #close()} or {@link #finish()}
     */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public List<PullMessage> getMessage() {
        return this.pullMessages;
    }

    /**
     * Marks the reader closed, drops the calling thread's driver reference and the current batch.
     *
     * <p>NOTE(review): {@link ThreadLocal#remove()} only clears the value of the calling thread,
     * and the driver itself is not explicitly released here - confirm whether it needs to be.
     *
     * @return a close future for this reader and its split
     */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public SplitCloseFuture close() {
        this.isClosed = true;
        ThreadLocal<JDBCDriver> holder = this.threadLocal;
        if (holder != null) {
            // May be null on a deserialized reader that never touched the database.
            holder.remove();
        }
        this.pullMessages = null;
        return new SplitCloseFuture(this, this.iSplit);
    }

    /**
     * Repositions the scan. A null or blank position is treated as 0, and positions below the
     * table's minimum id are raised to that minimum.
     *
     * @param str the id to resume from, as a decimal string (surrounding whitespace is ignored)
     * @throws NumberFormatException if {@code str} is neither blank nor a valid long
     */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public void seek(String str) {
        String position = str == null ? "" : str.trim();
        long requested = position.isEmpty() ? 0L : Long.parseLong(position);
        this.offset = Math.max(requested, this.minOffset);
        this.offsetStart = this.offset;
        logger.info(String.format("split %s seek %d.", this.iSplit.getQueueId(), this.offset));
    }

    /** Returns the largest id handed out so far, as a decimal string. */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public String getProgress() {
        return String.valueOf(this.offset);
    }

    /** Returns the distance between the maximum id captured at open time and the current progress. */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public long getDelay() {
        return this.maxOffset - this.offset;
    }

    /** Always returns 0. */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public long getFetchedDelay() {
        return 0L;
    }

    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public boolean isClose() {
        return this.isClosed;
    }

    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public ISplit getSplit() {
        return this.iSplit;
    }

    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public boolean isInterrupt() {
        return this.interrupt;
    }

    /**
     * Requests that subsequent {@link #next()} calls stop reading.
     *
     * @return always {@code true}
     */
    @Override // org.apache.rocketmq.streams.connectors.reader.ISplitReader
    public boolean interrupt() {
        this.interrupt = true;
        return true;
    }

    /** Returns whether the next window would start beyond the maximum id captured at open time. */
    public boolean isFinished() {
        return this.offsetStart > this.maxOffset;
    }

    /**
     * Performs the end-of-scan actions once: drops the current batch, persists the reader status
     * and notifies the source. Later calls are no-ops.
     */
    public void finish() {
        if (this.isFinishedCall) {
            return;
        }
        this.pullMessages = null;
        updateReaderStatus();
        this.iSource.boundedFinishedCallBack(this.iSplit);
        // Set last, so the actions are retried on the next call if any of them threw.
        this.isFinishedCall = true;
    }

    public ISource getISource() {
        return this.iSource;
    }

    public void setISource(ISource iSource) {
        this.iSource = iSource;
    }

    /**
     * Persists a {@link ReaderStatus} row for this split.
     *
     * <p>Requires the source to be a {@link CycleDynamicMultipleDBScanSource}; any other source
     * type fails the cast below.
     */
    private final void updateReaderStatus() {
        // NOTE(review): the literal 1 presumably flags the split as finished - confirm against ReaderStatus.create.
        ReaderStatus status = ReaderStatus.create(CycleDynamicMultipleDBScanSource.createKey(getISource()), this.iSplit.getQueueId(), 1, ((CycleDynamicMultipleDBScanSource) this.iSource).getTotalReader());
        logger.info(String.format("create reader status %s.", status));
        ORMUtil.batchReplaceInto(new Object[]{status});
    }
}
