package com.github.niupengyu.schedule2.eps.reader.impl;

import com.alibaba.fastjson2.JSONObject;
import com.github.niupengyu.core.util.DateUtil;
import com.github.niupengyu.core.util.ObjectUtil;
import com.github.niupengyu.core.util.StringUtil;
import com.github.niupengyu.jdbc.bean.ColumnBean;
import com.github.niupengyu.jdbc.bean.DataBaseBean;
import com.github.niupengyu.jdbc.dao.JdbcDaoFace;
import com.github.niupengyu.jdbc.dao.callback.ReadCallBack;
import com.github.niupengyu.jdbc.data.DataFactory;
import com.github.niupengyu.jdbc.util.ColumnBeanUtil;
import com.github.niupengyu.schedule2.beans.task.EpsWriterManager;
import com.github.niupengyu.schedule2.beans.task.JobEnvironment;
import com.github.niupengyu.schedule2.beans.task.TableInfo;
import com.github.niupengyu.schedule2.beans.task.TableSqlInfo;
import com.github.niupengyu.schedule2.eps.reader.EPSReader;
import com.github.niupengyu.schedule2.time.TimeProcessor;
import com.github.niupengyu.schedule2.tools.DataCounter;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.chrono.ChronoLocalDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:com/github/niupengyu/schedule2/eps/reader/impl/EPSTimeReaderImpl.class */
/**
 * {@link EPSReader} implementation that reads rows from the origin table whose time column
 * falls inside a start/end range and hands every row to an {@link EpsWriterManager}.
 *
 * <p>The range is resolved per run: the lower bound comes from the job parameter
 * {@code startTime}, from the end time stored on the {@link TableInfo} (timer runs), or from
 * the table's "min" query; the upper bound comes from the job parameter {@code endTime} or
 * from the table's "max" query. When the component parameter {@code timeSplit} is positive,
 * the range is read in consecutive windows of that many minutes.
 *
 * <p>Instances keep per-run state ({@code timeProcessor}, {@code finalStartTime},
 * {@code finalEndTime}) that {@link #end()} reads after {@link #readData(EpsWriterManager)}.
 */
public class EPSTimeReaderImpl implements EPSReader {
    /** Second argument passed to {@code JdbcDaoFace.fetchRead} for every range query. */
    private static final int FETCH_BATCH_SIZE = 20000;

    private final TableInfo tableInfo;
    private final JdbcDaoFace originJdbcDao;
    private final JobEnvironment jobEnvironment;
    private final DataBaseBean dataBaseBean;
    private final JSONObject componentParams;

    /** Created at the start of {@link #readData(EpsWriterManager)}; {@code null} before that. */
    private TimeProcessor timeProcessor;
    /** Upper bound of the last resolved range; string-formatted when formatting is enabled. */
    private Object finalEndTime;
    /** Lower bound of the last resolved range; string-formatted when formatting is enabled. */
    private Object finalStartTime;

    /**
     * @param tableInfo      description of the table being read (SQL info, columns, stored times)
     * @param dataBaseBean   database descriptor; stored but not used by this class
     * @param jdbcDaoFace    DAO used for the count, min/max and data queries
     * @param jobEnvironment job context used for logging, parameters and interruption checks
     * @param jSONObject     component parameters; {@code timeSplit} and {@code minutes} are read
     */
    public EPSTimeReaderImpl(TableInfo tableInfo, DataBaseBean dataBaseBean, JdbcDaoFace jdbcDaoFace, JobEnvironment jobEnvironment, JSONObject jSONObject) {
        this.tableInfo = tableInfo;
        this.originJdbcDao = jdbcDaoFace;
        this.jobEnvironment = jobEnvironment;
        this.dataBaseBean = dataBaseBean;
        this.componentParams = jSONObject;
    }

    /**
     * Resolves the time range, counts the matching rows and, if there are any, streams them
     * to {@code epsWriterManager}. {@code epsWriterManager.success()} is called only when rows
     * were found and every read completed.
     *
     * @param epsWriterManager sink receiving each row via {@code addData}
     * @throws Exception propagated from the DAO, the time processor or the writer
     */
    @Override // com.github.niupengyu.schedule2.eps.reader.EPSReader
    public void readData(EpsWriterManager epsWriterManager) throws Exception {
        TableSqlInfo tableSqlInfo = this.tableInfo.getTableSqlInfo();
        final String countSql;
        final String dataSql;
        final boolean formatAsString;
        if (this.jobEnvironment.isTimer()) {
            countSql = tableSqlInfo.generateCountSql(this.tableInfo);
            dataSql = tableSqlInfo.generateSelectSql(this.tableInfo);
            formatAsString = this.tableInfo.getTimestampFormat();
        } else {
            countSql = tableSqlInfo.generateCompareCount(this.tableInfo);
            dataSql = tableSqlInfo.generateCompareSelect(this.tableInfo);
            formatAsString = this.tableInfo.getCompareFormat();
        }
        this.jobEnvironment.addLog("countSql", countSql);
        this.jobEnvironment.addLog("dataSql", dataSql);

        String storedStartTime = this.tableInfo.getStartTime();
        Long storedEndTime = this.tableInfo.getEndTime();
        this.timeProcessor = createTimeFormat();
        this.jobEnvironment.addLog("初始时间", storedStartTime, storedEndTime);
        this.jobEnvironment.addLog("EPSTimeReader ", this.jobEnvironment.taskRunningType());

        this.finalStartTime = initStartTime(this.tableInfo, this.timeProcessor);
        this.finalEndTime = initEndTime(this.tableInfo, this.timeProcessor);
        this.jobEnvironment.addLog("数据范围", this.finalStartTime, this.finalEndTime);

        // Timestamps are taken before the optional string conversion below.
        Timestamp rangeStartTs = this.timeProcessor.toTimestamp(this.finalStartTime);
        Timestamp rangeEndTs = this.timeProcessor.toTimestamp(this.finalEndTime);
        this.jobEnvironment.addLog("数据范围", rangeStartTs, rangeEndTs);
        this.jobEnvironment.addLog("数据转换", formatAsString);
        if (formatAsString) {
            this.finalStartTime = this.timeProcessor.dateToString(this.finalStartTime);
            this.finalEndTime = this.timeProcessor.dateToString(this.finalEndTime);
        }

        double totalRows = this.originJdbcDao.getIntValue(countSql, new Object[]{this.finalStartTime, this.finalEndTime}).intValue();
        this.jobEnvironment.addLog("当前数据量 ", totalRows);
        if (totalRows < 1.0d) {
            this.jobEnvironment.addLog("没有新数据", new Object[0]);
            return;
        }

        DataCounter dataCounter = new DataCounter(totalRows);
        String originDatabase = this.tableInfo.getOriginDatabase();
        String originTable = this.tableInfo.getOriginTable();
        List<ColumnBean> columns = this.tableInfo.getColumns();
        ColumnBeanUtil.initJavaType(DataFactory.getInstance(), columns);

        int splitMinutes = this.componentParams.getIntValue("timeSplit");
        if (splitMinutes <= 0) {
            // Zero means "no split". Negative values are treated the same way because a
            // negative step would move the window backwards and never reach the range end.
            readData(this.finalStartTime, this.finalEndTime, dataSql, columns, epsWriterManager, originDatabase, originTable, dataCounter);
        } else {
            readInWindows(rangeStartTs.toLocalDateTime(), rangeEndTs.toLocalDateTime(), splitMinutes, dataSql, columns, epsWriterManager, originDatabase, originTable, dataCounter);
        }
        epsWriterManager.success();
    }

    /**
     * Reads the range {@code [rangeStart, rangeEnd]} as consecutive windows of
     * {@code splitMinutes} minutes; the last window is clamped to {@code rangeEnd}.
     * At least one window is always read.
     */
    private void readInWindows(LocalDateTime rangeStart, LocalDateTime rangeEnd, int splitMinutes, String dataSql, List<ColumnBean> columns, EpsWriterManager epsWriterManager, String originDatabase, String originTable, DataCounter dataCounter) throws Exception {
        String rangeEndText = DateUtil.dateFormat(rangeEnd);
        LocalDateTime windowStart = rangeStart;
        boolean moreWindows = true;
        while (moreWindows) {
            LocalDateTime windowEnd = windowStart.plusMinutes(splitMinutes);
            // Stop as soon as the window reaches the range end; continuing when the two are
            // equal would only issue one more zero-width query for [rangeEnd, rangeEnd].
            if (!windowEnd.isBefore(rangeEnd)) {
                windowEnd = rangeEnd;
                moreWindows = false;
            }
            // NOTE(review): unlike the count query and the unsplit read, these bounds are not
            // passed through dateToString when string formatting is enabled — confirm that
            // toValue(LocalDateTime) already yields the form the data SQL expects.
            Object fromValue = this.timeProcessor.toValue(windowStart);
            Object toValue = this.timeProcessor.toValue(windowEnd);
            this.jobEnvironment.addLog("当前数据范围", this.timeProcessor.toTimestamp(fromValue), this.timeProcessor.toTimestamp(toValue), rangeEndText);
            readData(fromValue, toValue, dataSql, columns, epsWriterManager, originDatabase, originTable, dataCounter);
            windowStart = windowEnd;
        }
    }

    /**
     * Runs {@code str} with the two bounds as parameters and forwards every returned row to
     * {@code epsWriterManager}, updating {@code dataCounter} and logging progress per batch.
     *
     * @param obj  lower bound bound to the first SQL parameter
     * @param obj2 upper bound bound to the second SQL parameter
     * @param str  data query
     * @param list column descriptors handed to the DAO
     * @param str2 origin database name (used for logging only)
     * @param str3 origin table name (used for logging only)
     */
    private void readData(Object obj, Object obj2, String str, List<ColumnBean> list, final EpsWriterManager epsWriterManager, final String str2, final String str3, final DataCounter dataCounter) throws Exception {
        this.originJdbcDao.fetchRead(str, FETCH_BATCH_SIZE, list, new ReadCallBack<Map<String, Object>>() {
            @Override
            public void readData(int i, int i2, List<Map<String, Object>> rows) throws Exception {
                // Checked once per batch; presumably aborts the read when the job was stopped.
                EPSTimeReaderImpl.this.jobEnvironment.interrupt();
                EPSTimeReaderImpl.this.jobEnvironment.addLog("当前批次", rows.size());
                for (Map<String, Object> row : rows) {
                    epsWriterManager.addData(row);
                    dataCounter.add(1);
                }
                // Progress is "<count>/<total> --> <percent>". The previous text concatenated
                // the JobEnvironment object itself and dropped the percent value.
                EPSTimeReaderImpl.this.jobEnvironment.addLog("当前进度", str2, str3, dataCounter.getCount() + "/" + dataCounter.getTotal() + " --> " + dataCounter.percent());
            }
        }, new Object[]{obj, obj2});
    }

    /**
     * Creates the {@link TimeProcessor} for this run: the table's compare type when the task
     * running type is {@code "THREAD"}, otherwise its {@code sjcType}.
     */
    public TimeProcessor createTimeFormat() {
        boolean threadRun = "THREAD".equals(this.jobEnvironment.taskRunningType());
        String timeType = threadRun ? this.tableInfo.getCompareType() : this.tableInfo.getSjcType();
        return TimeProcessor.createTimeProcessor(timeType, "");
    }

    /**
     * Resolves the lower bound of the range.
     *
     * <p>For a {@code "TIMER"} run the stored end time of {@code tableInfo} is used, combined
     * with the {@code minutes} component parameter (default 0) by the time processor. For any
     * other run the job parameter {@code startTime} is used. When neither yields a value the
     * result of the table's "min" query is returned.
     */
    private Object initStartTime(TableInfo tableInfo, TimeProcessor timeProcessor) throws Exception {
        Object start = null;
        if (!"TIMER".equals(this.jobEnvironment.taskRunningType())) {
            String param = this.jobEnvironment.getParam("startTime");
            if (StringUtil.notNull(param)) {
                start = timeProcessor.toValue(param);
            }
        } else if (tableInfo != null) {
            Long lastEndTime = tableInfo.getEndTime();
            if (StringUtil.notNull(lastEndTime)) {
                long minutes = ObjectUtil.toLong(this.componentParams.getLong("minutes"), 0L);
                this.jobEnvironment.addLog("上次结束时间", new Timestamp(lastEndTime.longValue()), minutes);
                start = timeProcessor.toValue(lastEndTime.longValue(), minutes);
            }
        }
        if (start == null) {
            start = this.originJdbcDao.getObject(tableInfo.getTableSqlInfo().getMinSql(tableInfo));
        }
        return start;
    }

    /**
     * Resolves the upper bound of the range: the job parameter {@code endTime} when this is
     * not a timer run and the parameter is set, otherwise the result of the table's "max" query.
     */
    private Object initEndTime(TableInfo tableInfo, TimeProcessor timeProcessor) throws Exception {
        String param = this.jobEnvironment.getParam("endTime");
        if (this.jobEnvironment.isTimer() || StringUtil.isNull(param)) {
            return this.originJdbcDao.getObject(tableInfo.getTableSqlInfo().generateMaxSql(tableInfo));
        }
        return timeProcessor.toValue(param);
    }

    /**
     * For timer runs, stores the resolved end time on the {@link TableInfo} (moving the
     * previous end time to {@code lastTime}) and persists it through the job environment.
     * Does nothing for non-timer runs, or when no end time was resolved.
     *
     * @throws SQLException propagated from {@code updateEPSTime}
     */
    @Override // com.github.niupengyu.schedule2.eps.reader.EPSReader
    public void end() throws SQLException {
        boolean timerRun = this.jobEnvironment.isTimer();
        this.jobEnvironment.addLog("update eps ", timerRun);
        if (!timerRun) {
            return;
        }
        if (this.timeProcessor == null || this.finalEndTime == null) {
            // readData never got as far as resolving a range; keep the stored times untouched
            // instead of failing with a NullPointerException or persisting a bogus value.
            this.jobEnvironment.addLog("update eps skipped", "end time not resolved");
            return;
        }
        Timestamp nextStart = this.timeProcessor.toTimestamp(this.finalEndTime);
        this.jobEnvironment.addLog("下次开始时间 ", this.tableInfo.getOriginDatabase(), this.tableInfo.getOriginTable(), nextStart);
        this.tableInfo.setLastTime(this.tableInfo.getEndTime());
        this.tableInfo.setEndTime(this.timeProcessor.toLong(this.finalEndTime));
        this.tableInfo.setEndTimeFormat(DateUtil.dateFormat(nextStart));
        this.jobEnvironment.updateEPSTime(this.tableInfo);
    }
}
