package org.apache.inlong.audit.service;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.audit.config.ClickHouseConfig;
import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
import org.apache.inlong.audit.protocol.AuditData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/ClickHouseService.class */
/**
 * Buffers audit records in a bounded in-memory queue and writes them to the
 * {@code audit_data} ClickHouse table in JDBC batches.
 *
 * <p>Producers call {@link #insert(AuditData)}; a single scheduler thread started by
 * {@link #start()} periodically drains the queue in {@code processOutput()}. A flush
 * happens when either the number of queued records has reached the configured batch
 * threshold or the configured batch interval has elapsed since the last flush.
 *
 * <p>Rows that were already taken from the queue when a flush fails are not re-queued.
 */
public class ClickHouseService implements InsertData, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseService.class);
    public static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id,\r\n      sdk_ts, packet_id, log_ts,\r\n      inlong_group_id, inlong_stream_id, audit_id,audit_tag,\r\n      count, size, delay, \r\n      update_time)\r\n    values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
    private final ClickHouseConfig chConfig;
    /** Created in {@link #start()}; {@link #insert(AuditData)} must not be called before that. */
    private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
    /** Reassigned by the scheduler thread on reconnect and read by {@link #close()}, hence volatile. */
    private volatile Connection conn;
    private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
    /** Set by producers once the queued-record count reaches the batch threshold. */
    private final AtomicBoolean needBatchOutput = new AtomicBoolean(false);
    /** Incremented per queued record, decremented per record handed to the JDBC batch. */
    private final AtomicInteger batchCounter = new AtomicInteger(0);
    /** Wall-clock time (ms) at which the last flush attempt finished. */
    private final AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis());

    /**
     * @param clickHouseConfig connection settings and batching parameters
     */
    public ClickHouseService(ClickHouseConfig clickHouseConfig) {
        this.chConfig = clickHouseConfig;
    }

    /**
     * Creates the record queue, loads the JDBC driver, opens the first connection and
     * schedules the periodic flush. A failure to load the driver or to connect is only
     * logged; the flush task retries the connection on its next run.
     */
    public void start() {
        this.batchQueue = new LinkedBlockingQueue<>(computeQueueCapacity());
        try {
            Class.forName(this.chConfig.getDriver());
            reconnect();
        } catch (Exception e) {
            LOG.error("ClickHouseService start failure!", e);
        }
        this.timerService.scheduleWithFixedDelay(this::processOutput, this.chConfig.getProcessIntervalMs(),
                this.chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
    }

    /**
     * Computes {@code batchThreshold * batchIntervalMs / processIntervalMs} as the queue capacity.
     * The product is evaluated in {@code long} so it cannot overflow, and the result is clamped
     * to {@code [1, Integer.MAX_VALUE]} because {@link LinkedBlockingQueue} rejects a
     * non-positive capacity.
     */
    private int computeQueueCapacity() {
        long capacity = (long) this.chConfig.getBatchThreshold() * this.chConfig.getBatchIntervalMs()
                / this.chConfig.getProcessIntervalMs();
        return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, capacity));
    }

    /**
     * Scheduler task: flushes the queue when the batch threshold was reached or the batch
     * interval has elapsed. If no connection is available it first tries to reconnect and
     * skips the flush when that fails, leaving the queued records in place.
     */
    private void processOutput() {
        if (!this.needBatchOutput.get()
                && System.currentTimeMillis() - this.lastCheckTime.get() < this.chConfig.getBatchIntervalMs()) {
            return;
        }
        if (this.conn != null || tryReconnect()) {
            try {
                flushQueue(this.conn);
            } catch (Exception e) {
                // Catch broadly: an exception escaping a scheduled task cancels all later runs.
                LOG.error("Execute output to clickhouse failure!", e);
                tryReconnect();
            }
        }
        this.lastCheckTime.set(System.currentTimeMillis());
        this.needBatchOutput.compareAndSet(true, false);
    }

    /**
     * Drains the queue into the given connection, executing and committing a batch every
     * {@code batchThreshold} rows and once more for any remainder.
     *
     * @param connection open connection with auto-commit disabled
     * @throws SQLException if preparing, binding, executing or committing fails
     */
    private void flushQueue(Connection connection) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
            int pending = 0;
            ClickHouseDataPo row;
            while ((row = this.batchQueue.poll()) != null) {
                bindRow(statement, row);
                statement.addBatch();
                this.batchCounter.decrementAndGet();
                pending++;
                if (pending >= this.chConfig.getBatchThreshold()) {
                    statement.executeBatch();
                    connection.commit();
                    pending = 0;
                }
            }
            if (pending > 0) {
                statement.executeBatch();
                connection.commit();
            }
        }
    }

    /** Binds one record to the 14 positional parameters of {@link #INSERT_SQL}. */
    private static void bindRow(PreparedStatement statement, ClickHouseDataPo row) throws SQLException {
        statement.setString(1, row.getIp());
        statement.setString(2, row.getDockerId());
        statement.setString(3, row.getThreadId());
        statement.setTimestamp(4, row.getSdkTs());
        statement.setLong(5, row.getPacketId().longValue());
        statement.setTimestamp(6, row.getLogTs());
        statement.setString(7, row.getInlongGroupId());
        statement.setString(8, row.getInlongStreamId());
        statement.setString(9, row.getAuditId());
        statement.setString(10, row.getAuditTag());
        statement.setLong(11, row.getCount().longValue());
        statement.setLong(12, row.getSize().longValue());
        statement.setLong(13, row.getDelay().longValue());
        statement.setTimestamp(14, row.getUpdateTime());
    }

    /**
     * Calls {@link #reconnect()} and logs instead of propagating a failure.
     *
     * @return {@code true} if a new connection was established
     */
    private boolean tryReconnect() {
        try {
            reconnect();
            return true;
        } catch (SQLException e) {
            LOG.error("Re-connect clickhouse failure!", e);
            return false;
        }
    }

    /**
     * Closes the current connection, if any, and opens a new one with auto-commit disabled.
     * If opening fails, {@code conn} is left {@code null}.
     *
     * @throws SQLException if the new connection cannot be opened or configured
     */
    private void reconnect() throws SQLException {
        Connection stale = this.conn;
        if (stale != null) {
            try {
                stale.close();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
            this.conn = null;
        }
        Connection fresh = DriverManager.getConnection(this.chConfig.getUrl(), this.chConfig.getUsername(),
                this.chConfig.getPassword());
        this.conn = fresh;
        fresh.setAutoCommit(false);
    }

    /**
     * Converts the audit record to a row and queues it for the next flush, blocking while
     * the queue is full. Requests an early flush once the number of queued records reaches
     * the batch threshold.
     *
     * @param auditData record to persist
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *         is interrupted while waiting for queue space; the interrupt flag is restored
     */
    @Override
    public void insert(AuditData auditData) {
        ClickHouseDataPo row = new ClickHouseDataPo();
        row.setIp(auditData.getIp());
        row.setThreadId(auditData.getThreadId());
        row.setDockerId(auditData.getDockerId());
        row.setPacketId(Long.valueOf(auditData.getPacketId()));
        row.setSdkTs(new Timestamp(auditData.getSdkTs()));
        row.setLogTs(new Timestamp(auditData.getLogTs()));
        row.setAuditId(auditData.getAuditId());
        row.setAuditTag(auditData.getAuditTag());
        row.setCount(Long.valueOf(auditData.getCount()));
        row.setDelay(Long.valueOf(auditData.getDelay()));
        row.setInlongGroupId(auditData.getInlongGroupId());
        row.setInlongStreamId(auditData.getInlongStreamId());
        row.setSize(Long.valueOf(auditData.getSize()));
        row.setUpdateTime(new Timestamp(System.currentTimeMillis()));
        try {
            this.batchQueue.put(row);
            if (this.batchCounter.incrementAndGet() >= this.chConfig.getBatchThreshold()) {
                this.needBatchOutput.compareAndSet(false, true);
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Stops the flush scheduler and closes the connection if one is open. The scheduler is
     * shut down first so that it is released even when closing the connection throws.
     * Records still in the queue are not flushed.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        this.timerService.shutdown();
        Connection connection = this.conn;
        if (connection != null) {
            connection.close();
        }
    }
}
