/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.jdbc;

import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.jdbc.JdbcSinkConfig;
import org.apache.pulsar.io.jdbc.JdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JdbcAbstractSink<T>
implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(JdbcAbstractSink.class);
    private JdbcSinkConfig jdbcSinkConfig;
    private Connection connection;
    private String jdbcUrl;
    private String tableName;
    private JdbcUtils.TableId tableId;
    private PreparedStatement insertStatement;
    private PreparedStatement updateStatement;
    private PreparedStatement deleteStatement;
    protected static final String ACTION = "ACTION";
    protected static final String INSERT = "INSERT";
    protected static final String UPDATE = "UPDATE";
    protected static final String DELETE = "DELETE";
    protected JdbcUtils.TableDefinition tableDefinition;
    private List<Record<T>> incomingList;
    private List<Record<T>> swapList;
    private AtomicBoolean isFlushing;
    private int batchSize;
    private ScheduledExecutorService flushExecutor;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.jdbcSinkConfig = JdbcSinkConfig.load(config);
        this.jdbcUrl = this.jdbcSinkConfig.getJdbcUrl();
        if (this.jdbcSinkConfig.getJdbcUrl() == null) {
            throw new IllegalArgumentException("Required jdbc Url not set.");
        }
        Properties properties = new Properties();
        String username = this.jdbcSinkConfig.getUserName();
        String password = this.jdbcSinkConfig.getPassword();
        if (username != null) {
            properties.setProperty("user", username);
        }
        if (password != null) {
            properties.setProperty("password", password);
        }
        Class.forName(JdbcUtils.getDriverClassName(this.jdbcSinkConfig.getJdbcUrl()));
        this.connection = DriverManager.getConnection(this.jdbcSinkConfig.getJdbcUrl(), properties);
        this.connection.setAutoCommit(false);
        log.info("Opened jdbc connection: {}, autoCommit: {}", (Object)this.jdbcUrl, (Object)this.connection.getAutoCommit());
        this.tableName = this.jdbcSinkConfig.getTableName();
        this.tableId = JdbcUtils.getTableId(this.connection, this.tableName);
        this.initStatement();
        int timeoutMs = this.jdbcSinkConfig.getTimeoutMs();
        this.batchSize = this.jdbcSinkConfig.getBatchSize();
        this.incomingList = Lists.newArrayList();
        this.swapList = Lists.newArrayList();
        this.isFlushing = new AtomicBoolean(false);
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    private void initStatement() throws Exception {
        List<Object> keyList = Lists.newArrayList();
        String key = this.jdbcSinkConfig.getKey();
        if (key != null && !key.isEmpty()) {
            keyList = Arrays.asList(key.split(","));
        }
        List<Object> nonKeyList = Lists.newArrayList();
        String nonKey = this.jdbcSinkConfig.getNonKey();
        if (nonKey != null && !nonKey.isEmpty()) {
            nonKeyList = Arrays.asList(nonKey.split(","));
        }
        this.tableDefinition = JdbcUtils.getTableDefinition(this.connection, this.tableId, (List<String>)keyList, (List<String>)nonKeyList);
        this.insertStatement = JdbcUtils.buildInsertStatement(this.connection, JdbcUtils.buildInsertSql(this.tableDefinition));
        if (!nonKeyList.isEmpty()) {
            this.updateStatement = JdbcUtils.buildUpdateStatement(this.connection, JdbcUtils.buildUpdateSql(this.tableDefinition));
        }
        if (!keyList.isEmpty()) {
            this.deleteStatement = JdbcUtils.buildDeleteStatement(this.connection, JdbcUtils.buildDeleteSql(this.tableDefinition));
        }
    }

    public void close() throws Exception {
        if (!this.connection.getAutoCommit()) {
            this.connection.commit();
        }
        this.flushExecutor.shutdown();
        if (this.connection != null) {
            this.connection.close();
        }
        log.info("Closed jdbc connection: {}", (Object)this.jdbcUrl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(Record<T> record) throws Exception {
        int number;
        JdbcAbstractSink jdbcAbstractSink = this;
        synchronized (jdbcAbstractSink) {
            this.incomingList.add(record);
            number = this.incomingList.size();
        }
        if (number == this.batchSize) {
            this.flushExecutor.schedule(this::flush, 0L, TimeUnit.MILLISECONDS);
        }
    }

    public abstract void bindValue(PreparedStatement var1, Record<T> var2, String var3) throws Exception;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() {
        if (this.incomingList.size() > 0 && this.isFlushing.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("Starting flush, queue size: {}", (Object)this.incomingList.size());
            }
            if (!this.swapList.isEmpty()) {
                throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + this.swapList.size());
            }
            JdbcAbstractSink jdbcAbstractSink = this;
            synchronized (jdbcAbstractSink) {
                this.swapList.clear();
                List<Record<T>> tmpList = this.swapList;
                this.swapList = this.incomingList;
                this.incomingList = tmpList;
            }
            int count = 0;
            try {
                block15: for (Record record : this.swapList) {
                    String action = (String)record.getProperties().get(ACTION);
                    if (action == null) {
                        action = INSERT;
                    }
                    switch (action) {
                        case "DELETE": {
                            this.bindValue(this.deleteStatement, record, action);
                            ++count;
                            this.deleteStatement.execute();
                            continue block15;
                        }
                        case "UPDATE": {
                            this.bindValue(this.updateStatement, record, action);
                            ++count;
                            this.updateStatement.execute();
                            continue block15;
                        }
                        case "INSERT": {
                            this.bindValue(this.insertStatement, record, action);
                            ++count;
                            this.insertStatement.execute();
                            continue block15;
                        }
                    }
                    String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s", action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
                    throw new IllegalArgumentException(msg);
                }
                this.connection.commit();
                this.swapList.forEach(Record::ack);
            }
            catch (Exception e) {
                log.error("Got exception ", (Throwable)e);
                this.swapList.forEach(Record::fail);
            }
            if (this.swapList.size() != count) {
                log.error("Update count {}  not match total number of records {}", (Object)count, (Object)this.swapList.size());
            }
            if (log.isDebugEnabled()) {
                log.debug("Finish flush, queue size: {}", (Object)this.swapList.size());
            }
            this.swapList.clear();
            this.isFlushing.set(false);
        } else if (log.isDebugEnabled()) {
            log.debug("Already in flushing state, will not flush, queue size: {}", (Object)this.incomingList.size());
        }
    }

    public Connection getConnection() {
        return this.connection;
    }
}

