package org.apache.inlong.audit.service;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.audit.config.JdbcConfig;
import org.apache.inlong.audit.db.entities.JdbcDataPo;
import org.apache.inlong.audit.protocol.AuditData;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/JdbcService.class */
/**
 * Buffers audit records in a bounded queue and writes them to the {@code audit_data} table in
 * JDBC batches from a single scheduled worker thread. After a batch has been committed, the
 * Pulsar message attached to each record (if any) is acknowledged.
 *
 * <p>{@link #start()} must be called before {@code insert(AuditData, Consumer, MessageId)};
 * the receive queue is only created there.
 */
public class JdbcService implements InsertData, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcService.class);
    private static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id, \r\n      sdk_ts, packet_id, log_ts, \r\n      inlong_group_id, inlong_stream_id, audit_id, audit_tag, audit_version, \r\n      count, size, delay, \r\n      update_time)\r\n    values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
    private final JdbcConfig jdbcConfig;
    /** Records handed over by {@code insert}; drained by the timer thread in {@link #process()}. */
    private LinkedBlockingQueue<JdbcDataPo> receiveQueue;
    private Connection connection;
    private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
    private long lastCheckTime = System.currentTimeMillis();
    /** Records taken from the queue but not yet committed; kept across runs when a write fails. */
    private final List<JdbcDataPo> writeDataList = new LinkedList<>();

    /**
     * @param jdbcConfig driver, connection and batching settings used by this service
     */
    public JdbcService(JdbcConfig jdbcConfig) {
        this.jdbcConfig = jdbcConfig;
    }

    /**
     * Creates the receive queue, loads the JDBC driver, opens the connection and schedules the
     * periodic batch writer. A driver or connection failure is logged rather than thrown; the
     * writer is scheduled regardless and re-attempts the connection when a batch fails.
     */
    public void start() {
        this.receiveQueue = new LinkedBlockingQueue<>(this.jdbcConfig.getDataQueueSize());
        try {
            Class.forName(this.jdbcConfig.getDriver());
            reconnect();
        } catch (Exception e) {
            LOG.error("Start failure!", e);
        }
        long intervalMs = this.jdbcConfig.getProcessIntervalMs();
        this.timerService.scheduleWithFixedDelay(this::process, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Scheduled entry point. Any exception is caught and logged here because
     * {@code scheduleWithFixedDelay} suppresses all subsequent executions once a run throws.
     */
    private void process() {
        try {
            drainAndWrite();
        } catch (Exception e) {
            LOG.error("Process has exception!", e);
        }
    }

    /**
     * Writes pending and queued records once either the queue has reached the batch threshold
     * or the batch interval has elapsed since the last check. Stops early when a batch fails,
     * leaving the unwritten records in {@code writeDataList} for the next run.
     */
    private void drainAndWrite() {
        int threshold = this.jdbcConfig.getBatchThreshold();
        boolean queueBelowThreshold = this.receiveQueue.size() < threshold;
        boolean intervalNotElapsed =
                System.currentTimeMillis() - this.lastCheckTime < this.jdbcConfig.getBatchIntervalMs();
        if (queueBelowThreshold && intervalNotElapsed) {
            return;
        }
        this.lastCheckTime = System.currentTimeMillis();

        // Records left over from a previous run go out first.
        if (!this.writeDataList.isEmpty() && !flushPending()) {
            return;
        }

        JdbcDataPo data;
        while ((data = this.receiveQueue.poll()) != null) {
            this.writeDataList.add(data);
            if (this.writeDataList.size() > threshold && !flushPending()) {
                return;
            }
        }
    }

    /**
     * Writes {@code writeDataList} as one batch and, on success, acknowledges and clears it.
     *
     * @return {@code true} if the batch was committed, {@code false} if the records were kept
     */
    private boolean flushPending() {
        if (!executeBatch(this.writeDataList)) {
            return false;
        }
        acknowledge(this.writeDataList);
        this.writeDataList.clear();
        return true;
    }

    /**
     * Inserts the given records in a single JDBC batch and commits.
     *
     * <p>On any exception the failure is logged and the connection is re-established. This also
     * covers the case where no connection exists yet because the initial connect failed.
     *
     * @param list records to insert
     * @return {@code true} if the batch was committed
     */
    private boolean executeBatch(List<JdbcDataPo> list) {
        // Set only after commit so that a failure while closing the statement does not make the
        // caller retry (and duplicate) an already committed batch.
        boolean committed = false;
        try (PreparedStatement statement = this.connection.prepareStatement(INSERT_SQL)) {
            for (JdbcDataPo data : list) {
                statement.setString(1, data.getIp());
                statement.setString(2, data.getDockerId());
                statement.setString(3, data.getThreadId());
                statement.setTimestamp(4, data.getSdkTs());
                statement.setLong(5, data.getPacketId().longValue());
                statement.setTimestamp(6, data.getLogTs());
                statement.setString(7, data.getInLongGroupId());
                statement.setString(8, data.getInLongStreamId());
                statement.setString(9, data.getAuditId());
                statement.setString(10, data.getAuditTag());
                statement.setLong(11, data.getAuditVersion());
                statement.setLong(12, data.getCount().longValue());
                statement.setLong(13, data.getSize().longValue());
                statement.setLong(14, data.getDelay().longValue());
                statement.setTimestamp(15, data.getUpdateTime());
                statement.addBatch();
            }
            statement.executeBatch();
            this.connection.commit();
            committed = true;
        } catch (Exception e) {
            LOG.error("Execute batch has failure!", e);
            try {
                reconnect();
            } catch (SQLException e2) {
                LOG.error("Re-connect has  failure!", e2);
            }
        }
        return committed;
    }

    /**
     * Closes the current connection, if any, and opens a new one with auto-commit disabled.
     *
     * @throws SQLException if the new connection cannot be opened or configured
     */
    private void reconnect() throws SQLException {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception e) {
                LOG.error("Reconnect has exception!", e);
            }
            this.connection = null;
        }
        this.connection = DriverManager.getConnection(
                this.jdbcConfig.getUrl(), this.jdbcConfig.getUserName(), this.jdbcConfig.getPassword());
        this.connection.setAutoCommit(false);
    }

    /** No-op: records without a consumer and message id are not stored by this implementation. */
    @Override // org.apache.inlong.audit.service.InsertData
    public void insert(AuditData auditData) {
    }

    /**
     * Converts the audit record to a {@link JdbcDataPo} and enqueues it for the batch writer,
     * blocking while the queue is full. If the calling thread is interrupted while waiting, the
     * record is not enqueued, the error is logged and the interrupt status is restored.
     *
     * @param auditData audit record to store
     * @param consumer  consumer used to acknowledge the message after the record is committed
     * @param messageId id of the message carrying this record
     */
    @Override // org.apache.inlong.audit.service.InsertData
    public void insert(AuditData auditData, Consumer<byte[]> consumer, MessageId messageId) {
        JdbcDataPo data = new JdbcDataPo();
        data.setConsumer(consumer);
        data.setMessageId(messageId);
        data.setIp(auditData.getIp());
        data.setThreadId(auditData.getThreadId());
        data.setDockerId(auditData.getDockerId());
        data.setPacketId(Long.valueOf(auditData.getPacketId()));
        data.setSdkTs(new Timestamp(auditData.getSdkTs()));
        data.setLogTs(new Timestamp(auditData.getLogTs()));
        data.setAuditId(auditData.getAuditId());
        data.setAuditTag(auditData.getAuditTag());
        data.setAuditVersion(auditData.getAuditVersion());
        data.setCount(Long.valueOf(auditData.getCount()));
        data.setDelay(Long.valueOf(auditData.getDelay()));
        data.setInLongGroupId(auditData.getInlongGroupId());
        data.setInLongStreamId(auditData.getInlongStreamId());
        data.setSize(Long.valueOf(auditData.getSize()));
        data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
        try {
            this.receiveQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("Insert data has InterruptedException ", e);
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops scheduling the batch writer and closes the JDBC connection if one is open.
     *
     * @throws Exception if closing the connection fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        // Shut the timer down first so it is released even when closing the connection throws.
        this.timerService.shutdown();
        if (this.connection != null) {
            this.connection.close();
        }
    }

    /**
     * Acknowledges the message of every record that carries both a consumer and a message id.
     * A failed acknowledgement is logged and does not stop the remaining ones.
     *
     * @param list records whose messages should be acknowledged
     */
    private void acknowledge(List<JdbcDataPo> list) {
        for (JdbcDataPo data : list) {
            try {
                if (data.getConsumer() != null && data.getMessageId() != null) {
                    data.getConsumer().acknowledge(data.getMessageId());
                }
            } catch (Exception e) {
                LOG.error("Acknowledge has exception!", e);
            }
        }
    }
}
