/*
 * Decompiled with CFR 0.152.
 */
package cn.gongler.util.resend.db;

import cn.gongler.util.GonglerUtil;
import cn.gongler.util.QueueConsumer;
import cn.gongler.util.db.ConnectionFactory;
import cn.gongler.util.db.DbUtil;
import cn.gongler.util.db.IDbTask;
import cn.gongler.util.function.ExceptionBiFunction;
import cn.gongler.util.function.ExceptionConsumer;
import cn.gongler.util.resend.ISender;
import cn.gongler.util.resend.db.IAckPack;
import cn.gongler.util.resend.db.ISendPack;
import cn.gongler.util.resend.db.SendPackScanner;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.RowId;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

/**
 * Bridges a database send-queue table to a {@link SendPackScanner}, identifying
 * each queued row by its {@link RowId}.
 *
 * <p>On each check, rows whose {@code LOAD_TIME} is NULL are read in
 * {@code ISSUE_TIME} order, stamped as loaded, converted to packs by the supplied
 * factory and registered with the scanner. Send, ack and expiry events reported
 * by the scanner are written back to the same row through queued DB tasks.
 *
 * <p>The SQL uses {@code ROWID}, {@code SYSDATE} and {@code SYSTIMESTAMP};
 * presumably the target database is Oracle (not verifiable from this file).
 *
 * @param <Pack>    type of outgoing pack
 * @param <AckPack> type of acknowledgement pack
 */
public class RowIdDbSendPackScanner<Pack extends ISendPack, AckPack extends IAckPack> {
    private static final long serialVersionUID = 1L;
    private static final String DEFAULT_SEND_TABLE = "EXT_PACK_SEND_QUEUE";

    /**
     * Clears LOAD_TIME on rows that were loaded but never completed (awaiting an
     * ack that has not arrived, or not yet sent when no ack is required) and that
     * have not expired, so they are picked up again by {@link #loadSql}.
     */
    protected String resetLoadTimeSql = "UPDATE EXT_PACK_SEND_QUEUE SET LOAD_TIME=NULL WHERE LOAD_TIME IS NOT NULL AND ((EXT_WAIT_ACK=1 AND ACK_CODE IS NULL) OR (EXT_WAIT_ACK=0 AND SENT_TIME IS NULL)) AND SYSDATE<SEND_EXPIRED_TIME";
    /** Selects every not-yet-loaded row, oldest ISSUE_TIME first. */
    protected String loadSql = "SELECT t.ROWID, t.* FROM EXT_PACK_SEND_QUEUE t WHERE LOAD_TIME IS NULL ORDER BY ISSUE_TIME";
    /** Stamps a row as loaded. */
    protected String loadedSql = "UPDATE EXT_PACK_SEND_QUEUE SET LOAD_TIME=SYSTIMESTAMP WHERE ROWID=?";
    /** Records a send attempt: sets SENT_TIME and increments TRY_COUNT. */
    protected String sendSql = "UPDATE EXT_PACK_SEND_QUEUE SET SENT_TIME=SYSTIMESTAMP, TRY_COUNT=TRY_COUNT+1 WHERE ROWID=?";
    /** Records a received ack: sets ACK_TIME and the ack code. */
    protected String ackSql = "UPDATE EXT_PACK_SEND_QUEUE SET ACK_TIME=SYSTIMESTAMP, ACK_CODE=? WHERE ROWID=?";
    /** Records expiry: sets ACK_TIME and ACK_CODE=-2. */
    protected String expiredSql = "UPDATE EXT_PACK_SEND_QUEUE SET ACK_TIME=SYSTIMESTAMP, ACK_CODE=-2 WHERE ROWID=?";

    /** Startup task handed to the scanner; executes {@link #resetLoadTimeSql}. */
    final Runnable initTask;
    /** Loads pending rows and registers them with the scanner. */
    final ExceptionConsumer<SendPackScanner.ISendPackRegister<Pack, RowId>> checker;
    final ISender<Pack> realSender;
    /** Decides whether an ack belongs to a pack; delegates to {@code ack.isAckOf(pack)}. */
    BiPredicate<Pack, AckPack> ackChecker = (pack, ack) -> ack.isAckOf((ISendPack)pack);
    /** Created by {@link #start()}; null until then. */
    SendPackScanner<Pack, RowId, AckPack> poolingBusResender;
    private final QueueConsumer queueAutoConsumer;
    /** View over {@link #queueAutoConsumer} that runs each task via the connection factory. */
    Consumer<IDbTask> dbTaskQueueAutoConsumerView;
    ConnectionFactory connectionFactory;
    private String sendTableName = DEFAULT_SEND_TABLE;

    /**
     * Creates the scanner. Nothing runs until {@link #start()} is called.
     *
     * @param connectionFactory source of database connections; must not be null
     * @param packFactory       builds a pack from the row's {@code PACK_BODY} string and the
     *                          current {@link ResultSet} row; a null result skips the row;
     *                          must not be null
     * @param realSender        the sender handed to the underlying scanner; must not be null
     * @throws NullPointerException if any argument is null
     */
    public RowIdDbSendPackScanner(ConnectionFactory connectionFactory, ExceptionBiFunction<String, ResultSet, Pack> packFactory, ISender<Pack> realSender) {
        Objects.requireNonNull(connectionFactory);
        Objects.requireNonNull(packFactory);
        Objects.requireNonNull(realSender);
        this.connectionFactory = connectionFactory;
        this.queueAutoConsumer = QueueConsumer.of("DbSender.queueAutoConsumer");
        this.dbTaskQueueAutoConsumerView = this.queueAutoConsumer.toView(connectionFactory::acceptWithCatchAny);
        this.realSender = realSender;
        this.initTask = () -> {
            System.out.println("sendTable reset status//" + this.resetLoadTimeSql);
            connectionFactory.acceptWithCatchAny(conn -> {
                DbUtil.ExecuteUpdate(conn, this.resetLoadTimeSql, new Object[0]);
                GonglerUtil.Commit(conn);
            });
        };
        this.checker = lis -> {
            // The SQL fields are read when the checker runs, so sendTable(...) calls
            // made after construction are honoured.
            try (Connection conn = connectionFactory.getConnection();
                 PreparedStatement statement = conn.prepareStatement(this.loadSql);
                 PreparedStatement updateLoadTimePstm = conn.prepareStatement(this.loadedSql);
                 ResultSet rs = statement.executeQuery()) {
                while (rs.next()) {
                    RowId rowId = rs.getRowId("ROWID");
                    // Mark the row as loaded and commit before handing it on, so a
                    // later scan does not select the same row again.
                    DbUtil.ExecuteUpdate(conn, updateLoadTimePstm, rowId);
                    GonglerUtil.Commit(conn);
                    long busId = rs.getLong("TARGET_NO");
                    boolean waitAck = rs.getInt("EXT_WAIT_ACK") != 0;
                    // NOTE(review): a NULL SEND_EXPIRED_TIME would throw a
                    // NullPointerException here; confirm the column is NOT NULL.
                    LocalDateTime expiredTime = rs.getTimestamp("SEND_EXPIRED_TIME").toLocalDateTime();
                    String packBody = rs.getString("PACK_BODY");
                    ISendPack sendPack = (ISendPack)packFactory.applyWithCatchAny(packBody, rs);
                    if (sendPack == null) {
                        // The factory produced nothing; the row stays marked as loaded.
                        continue;
                    }
                    lis.add(busId, sendPack, expiredTime, waitAck, rowId);
                }
            }
        };
    }

    /**
     * Switches every SQL statement to a different queue table by replacing the
     * current table name inside each statement's text.
     *
     * <p>NOTE(review): the name is spliced into SQL text unescaped; it should come
     * from trusted configuration only.
     *
     * @param sendTable the new table name; must not be null
     * @return this instance, for chaining
     * @throws NullPointerException if {@code sendTable} is null
     */
    public RowIdDbSendPackScanner<Pack, AckPack> sendTable(String sendTable) {
        Objects.requireNonNull(sendTable);
        String oldSendTable = this.sendTableName;
        this.sendTableName = sendTable;
        this.resetLoadTimeSql = this.resetLoadTimeSql.replace(oldSendTable, sendTable);
        this.loadSql = this.loadSql.replace(oldSendTable, sendTable);
        this.loadedSql = this.loadedSql.replace(oldSendTable, sendTable);
        this.sendSql = this.sendSql.replace(oldSendTable, sendTable);
        this.ackSql = this.ackSql.replace(oldSendTable, sendTable);
        this.expiredSql = this.expiredSql.replace(oldSendTable, sendTable);
        return this;
    }

    /**
     * Creates the underlying {@link SendPackScanner} and registers listeners that
     * persist send, ack and expiry events to the queue table. Each write is pushed
     * as a DB task keyed by the row's {@link RowId} ({@code params.param()}).
     */
    public void start() {
        this.poolingBusResender = new SendPackScanner(this.initTask, this.checker, this.ackChecker, this.realSender);
        this.poolingBusResender.busResender()
                .addSendEventListener((params, sendTimes) ->
                        this.pushDbTask(conn -> DbUtil.ExecuteUpdate(conn, this.sendSql, params.param())))
                .addSendAckEventListener((params, ack, sendContext) ->
                        this.pushDbTask(conn -> DbUtil.ExecuteUpdate(conn, this.ackSql, ack.ackCode(), params.param())))
                .addSendExpiredEventListener((params, sendTimes, sendContext) ->
                        this.pushDbTask(conn -> DbUtil.ExecuteUpdate(conn, this.expiredSql, params.param())));
    }

    /**
     * Forwards a received acknowledgement to the scanner, keyed by the ack's bus id.
     * Fails with a {@link NullPointerException} if called before {@link #start()}.
     *
     * @param busPackAck the acknowledgement received
     */
    public void acceptAck(AckPack busPackAck) {
        this.poolingBusResender.ack(busPackAck.busId(), busPackAck);
    }

    /**
     * Asks the scanner to re-run its check. Fails with a
     * {@link NullPointerException} if called before {@link #start()}.
     */
    public void notifyRecheck() {
        this.poolingBusResender.notifyRecheck();
    }

    /** Hands a DB task to the queue-backed consumer view. */
    private void pushDbTask(IDbTask dataHandler) {
        this.dbTaskQueueAutoConsumerView.accept(dataHandler);
    }
}

