package org.apache.pulsar.io.jdbc;

import com.google.common.collect.Lists;
import com.mysql.cj.conf.PropertyDefinitions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.jdbc.JdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/jdbc/JdbcAbstractSink.class */
public abstract class JdbcAbstractSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) JdbcAbstractSink.class);
    private JdbcSinkConfig jdbcSinkConfig;
    private Connection connection;
    private String jdbcUrl;
    private String tableName;
    private JdbcUtils.TableId tableId;
    private PreparedStatement insertStatement;
    protected JdbcUtils.TableDefinition tableDefinition;
    private List<Record<T>> incomingList;
    private List<Record<T>> swapList;
    private AtomicBoolean isFlushing;
    private int timeoutMs;
    private int batchSize;
    private ScheduledExecutorService flushExecutor;

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.jdbcSinkConfig = JdbcSinkConfig.load(map);
        this.jdbcUrl = this.jdbcSinkConfig.getJdbcUrl();
        if (this.jdbcSinkConfig.getJdbcUrl() == null) {
            throw new IllegalArgumentException("Required jdbc Url not set.");
        }
        Properties properties = new Properties();
        String userName = this.jdbcSinkConfig.getUserName();
        String password = this.jdbcSinkConfig.getPassword();
        if (userName != null) {
            properties.setProperty(PropertyDefinitions.PNAME_user, userName);
        }
        if (password != null) {
            properties.setProperty(PropertyDefinitions.PNAME_password, password);
        }
        this.connection = JdbcUtils.getConnection(this.jdbcUrl, properties);
        this.connection.setAutoCommit(false);
        log.info("Opened jdbc connection: {}, autoCommit: {}", this.jdbcUrl, Boolean.valueOf(this.connection.getAutoCommit()));
        this.tableName = this.jdbcSinkConfig.getTableName();
        this.tableId = JdbcUtils.getTableId(this.connection, this.tableName);
        this.tableDefinition = JdbcUtils.getTableDefinition(this.connection, this.tableId);
        this.insertStatement = JdbcUtils.buildInsertStatement(this.connection, JdbcUtils.buildInsertSql(this.tableDefinition));
        this.timeoutMs = this.jdbcSinkConfig.getTimeoutMs();
        this.batchSize = this.jdbcSinkConfig.getBatchSize();
        this.incomingList = Lists.newArrayList();
        this.swapList = Lists.newArrayList();
        this.isFlushing = new AtomicBoolean(false);
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(() -> {
            flush();
        }, this.timeoutMs, this.timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (!this.connection.getAutoCommit()) {
            this.connection.commit();
        }
        this.flushExecutor.shutdown();
        if (this.connection != null) {
            this.connection.close();
        }
        log.info("Closed jdbc connection: {}", this.jdbcUrl);
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) throws Exception {
        int size;
        synchronized (this.incomingList) {
            this.incomingList.add(record);
            size = this.incomingList.size();
        }
        if (size == this.batchSize) {
            this.flushExecutor.schedule(() -> {
                flush();
            }, 0L, TimeUnit.MILLISECONDS);
        }
    }

    public abstract void bindValue(PreparedStatement preparedStatement, Record<T> record) throws Exception;

    private void flush() {
        if (this.incomingList.size() <= 0 || !this.isFlushing.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("Already in flushing state, will not flush, queue size: {}", Integer.valueOf(this.incomingList.size()));
                return;
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Starting flush, queue size: {}", Integer.valueOf(this.incomingList.size()));
        }
        if (!this.swapList.isEmpty()) {
            throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + this.swapList.size());
        }
        synchronized (this.incomingList) {
            this.swapList.clear();
            List<Record<T>> list = this.swapList;
            this.swapList = this.incomingList;
            this.incomingList = list;
        }
        int i = 0;
        try {
            for (Record<T> record : this.swapList) {
                bindValue(this.insertStatement, record);
                this.insertStatement.addBatch();
                record.ack();
            }
            for (int i2 : this.insertStatement.executeBatch()) {
                if (i2 != -2) {
                    i += i;
                }
            }
            this.connection.commit();
            this.swapList.forEach(record2 -> {
                record2.ack();
            });
        } catch (Exception e) {
            log.error("Got exception ", (Throwable) e);
            this.swapList.forEach(record3 -> {
                record3.fail();
            });
        }
        if (this.swapList.size() != i) {
            log.error("Update count {}  not match total number of records {}", Integer.valueOf(i), Integer.valueOf(this.swapList.size()));
        }
        if (log.isDebugEnabled()) {
            log.debug("Finish flush, queue size: {}", Integer.valueOf(this.swapList.size()));
        }
        this.isFlushing.set(false);
    }

    public Connection getConnection() {
        return this.connection;
    }
}
