package oracle.jdbc.txeventq.kafka.connect.sink.utils;

import java.io.Closeable;
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleDriver;
import oracle.jdbc.aq.AQFactory;
import oracle.jdbc.aq.AQMessageProperties;
import oracle.jdbc.internal.JMSEnqueueOptions;
import oracle.jdbc.internal.JMSFactory;
import oracle.jdbc.internal.JMSMessage;
import oracle.jdbc.internal.JMSMessageProperties;
import oracle.jdbc.txeventq.kafka.connect.common.utils.Constants;
import oracle.jms.AQjmsBytesMessage;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsProducer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oracle/jdbc/txeventq/kafka/connect/sink/utils/TxEventQProducer.class */
public class TxEventQProducer implements Closeable {
    protected static final Logger log = LoggerFactory.getLogger(TxEventQProducer.class);
    private PreparedStatement preparedMergeStatement;
    private PreparedStatement preparedSelectOffsetStatement;
    private JMSMessageProperties jmsMesgProp;
    private OracleConnection conn;
    private TopicConnectionFactory tcf;
    private TopicConnection tconn;
    private TopicSession tSess;
    private Topic topic;
    private MessageProducer tProducer;
    private TxEventQSinkConfig config;
    private boolean isDatabaseRac;
    private static final long RECONNECT_DELAY_MILLIS_MIN = 64;
    private static final long RECONNECT_DELAY_MILLIS_MAX = 8192;
    private static final String TXEVENTQ$_TRACK_OFFSETS = "TXEVENTQ$_TRACK_OFFSETS";
    private static final int MINIMUM_VERSION = 21;
    private boolean connected = false;
    private String jmsDeliveryModeStr = "JMSDeliveryMode";
    private String persistentStr = "PERSISTENT";
    private String aqInternalPartitionStr = "AQINTERNAL_PARTITION";
    private int numberOfProperties = 1;
    private int stringPropertyValueType = 27;
    private int numberPropertyValueType = 24;
    private String partialUserPropertiesStr = this.numberOfProperties + "," + this.aqInternalPartitionStr.length() + "," + this.aqInternalPartitionStr + "," + this.numberPropertyValueType + ",";
    private String mergeSqlStatement = "MERGE INTO TXEVENTQ$_TRACK_OFFSETS tab1 USING (SELECT ? kafka_topic_name, ? queue_name, ? queue_schema, ? partition) tab2 ON (tab1.kafka_topic_name = tab2.kafka_topic_name AND tab1.queue_name = tab2.queue_name AND tab1.queue_schema = tab2.queue_schema AND tab1.partition = tab2.partition) WHEN MATCHED THEN UPDATE SET offset=? WHEN NOT MATCHED THEN INSERT (kafka_topic_name, queue_name, queue_schema, partition, offset) VALUES (?,?,?,?,?)";
    private String selectOffsetSqlStatement = "SELECT offset FROM TXEVENTQ$_TRACK_OFFSETS WHERE kafka_topic_name=? AND queue_name = ? AND queue_schema = ? AND partition=?";
    private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;

    public TxEventQProducer(TxEventQSinkConfig txEventQSinkConfig) {
        this.config = null;
        this.config = txEventQSinkConfig;
        try {
            this.jmsMesgProp = JMSFactory.createJMSMessageProperties();
            this.jmsMesgProp.setHeaderProperties(String.format("%1$d,%2$d,%3$s,%4$d,%5$d,%6$s", Integer.valueOf(this.numberOfProperties), Integer.valueOf(this.jmsDeliveryModeStr.length()), this.jmsDeliveryModeStr, Integer.valueOf(this.stringPropertyValueType), Integer.valueOf(this.persistentStr.length()), this.persistentStr));
            this.jmsMesgProp.setJMSMessageType(JMSMessageProperties.JMSMessageType.BYTES_MESSAGE);
        } catch (SQLException e) {
            throw new ConnectException("Unable to create JMS message properties: " + e.getMessage());
        }
    }

    public void connect() {
        log.trace("[{}] Entry {}.connect", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
        try {
            System.setProperty("oracle.net.wallet_location", this.config.getString("wallet.path"));
            System.setProperty("oracle.net.tns_admin", this.config.getString("tnsnames.path"));
            DriverManager.registerDriver(new OracleDriver());
            this.tcf = AQjmsFactory.getTopicConnectionFactory("jdbc:oracle:thin:@" + this.config.getString("db_tns_alias"), (Properties) null);
            this.tconn = this.tcf.createTopicConnection();
            this.tSess = this.tconn.createTopicSession(true, 2);
            this.conn = this.tSess.getDBConnection();
            this.tconn.start();
            versionCheck();
            this.topic = this.tSess.getTopic(this.conn.getUserName().toUpperCase(), this.config.getString("txeventq.queue.name").toUpperCase());
            this.tProducer = this.tSess.createProducer(this.topic);
            this.conn.setAutoCommit(false);
            this.preparedMergeStatement = this.conn.prepareStatement(this.mergeSqlStatement);
            this.preparedSelectOffsetStatement = this.conn.prepareStatement(this.selectOffsetSqlStatement);
            this.connected = true;
            this.isDatabaseRac = isClusterDatabase();
            log.debug("[{}:{}] Oracle TxEventQ connection opened!", Long.valueOf(Thread.currentThread().getId()), this.conn);
        } catch (SQLException | JMSException e) {
            log.debug("[{}] Connection to TxEventQ could not be established", Long.valueOf(Thread.currentThread().getId()));
            handleException(e);
        }
        log.trace("[{}]:[{}]  Exit {}.connect", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
    }

    private void connectConnectionInternal() {
        log.trace("[{}] Entry {}.connectConnectionInternal", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
        if (this.connected) {
            return;
        }
        try {
            System.setProperty("oracle.net.wallet_location", this.config.getString("wallet.path"));
            System.setProperty("oracle.net.tns_admin", this.config.getString("tnsnames.path"));
            DriverManager.registerDriver(new OracleDriver());
            this.tcf = AQjmsFactory.getTopicConnectionFactory("jdbc:oracle:thin:@" + this.config.getString("db_tns_alias"), (Properties) null);
            this.tconn = this.tcf.createTopicConnection();
            this.tSess = this.tconn.createTopicSession(true, 2);
            this.conn = this.tSess.getDBConnection();
            this.tconn.start();
            versionCheck();
            this.topic = this.tSess.getTopic(this.conn.getUserName().toUpperCase(), this.config.getString("txeventq.queue.name").toUpperCase());
            this.tProducer = this.tSess.createProducer(this.topic);
            this.conn.setAutoCommit(false);
            this.preparedMergeStatement = this.conn.prepareStatement(this.mergeSqlStatement);
            this.preparedSelectOffsetStatement = this.conn.prepareStatement(this.selectOffsetSqlStatement);
            this.reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
            this.connected = true;
            this.isDatabaseRac = isClusterDatabase();
            log.debug("[{}:{}] Oracle TxEventQ connection opened!", Long.valueOf(Thread.currentThread().getId()), this.conn);
            log.trace("[{}] Exit {}.connectConnectionInternal", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
        } catch (SQLException | JMSException e) {
            try {
                Thread.sleep(this.reconnectDelayMillis);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
            if (this.reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
                this.reconnectDelayMillis *= 2;
            }
            log.trace("[{}]  Exit {}.connectConnectionInternal, retval=false", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
            throw handleException(e);
        }
    }

    private ConnectException handleException(Throwable th) {
        boolean z;
        log.trace("[{}]:[{}]  Entry {}.handleException", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        boolean z2 = true;
        switch (getErrorCode(th)) {
            case Constants.JMS_131 /* 131 */:
            case Constants.ORA_01033 /* 1033 */:
            case Constants.ORA_01034 /* 1034 */:
            case Constants.ORA_01089 /* 1089 */:
            case Constants.ORA_01109 /* 1109 */:
            case Constants.ORA_12541 /* 12541 */:
            case Constants.ORA_17002 /* 17002 */:
            case Constants.ORA_17008 /* 17008 */:
            case Constants.ORA_17868 /* 17868 */:
            case Constants.ORA_24221 /* 24221 */:
            case Constants.ORA_25348 /* 25348 */:
                z = true;
                break;
            case Constants.ORA_17410 /* 17410 */:
            case Constants.ORA_25228 /* 25228 */:
                z = true;
                z2 = false;
                break;
            default:
                z = false;
                z2 = false;
                break;
        }
        if (z2) {
            try {
                close();
            } catch (IOException e) {
                log.error("Unable to close connections.");
            }
        }
        if (z) {
            log.trace("[{}]:[{}]  Exit {}.handleException", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
            return new RetriableException(th);
        }
        log.trace("[{}]:[{}]  Exit {}.handleException", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        return new ConnectException(th);
    }

    private int getErrorCode(Throwable th) {
        int i = -1;
        if (th instanceof SQLException) {
            SQLException sQLException = (SQLException) th;
            log.error("{}:[{}] {}", new Object[]{sQLException.getClass().getName(), Integer.valueOf(sQLException.getErrorCode()), sQLException.getMessage()});
            i = sQLException.getErrorCode();
        } else if (th instanceof JMSException) {
            JMSException jMSException = (JMSException) th;
            Throwable cause = jMSException.getCause();
            if (cause == null) {
                log.error("{}:[{}] {}", new Object[]{jMSException.getClass().getName(), jMSException.getErrorCode(), jMSException.getMessage()});
                if (jMSException.getErrorCode() != null) {
                    i = Integer.parseInt(jMSException.getErrorCode());
                }
            } else if (cause instanceof SQLRecoverableException) {
                SQLRecoverableException sQLRecoverableException = (SQLRecoverableException) cause;
                log.error("{} caused by {}: [{}] {}", new Object[]{jMSException.getClass().getName(), sQLRecoverableException.getClass().getName(), Integer.valueOf(sQLRecoverableException.getErrorCode()), sQLRecoverableException.getMessage()});
                i = sQLRecoverableException.getErrorCode();
            } else if (cause instanceof SQLException) {
                SQLException sQLException2 = (SQLException) cause;
                log.error("{} caused by {}: [{}] {}", new Object[]{jMSException.getClass().getName(), sQLException2.getClass().getName(), Integer.valueOf(sQLException2.getErrorCode()), sQLException2.getMessage()});
                i = sQLException2.getErrorCode();
            }
        }
        return i;
    }

    private void versionCheck() {
        try {
            int databaseMajorVersion = this.conn.getMetaData().getDatabaseMajorVersion();
            log.debug("DB Version: {}", Integer.valueOf(databaseMajorVersion));
            if (databaseMajorVersion < MINIMUM_VERSION) {
                throw new ConnectException("TxEventQ Connector requires Oracle Database 21c or greater");
            }
        } catch (SQLException e) {
            throw new ConnectException("Unable to obtain a database connection");
        }
    }

    public OracleConnection getDatabaseConnection() {
        log.trace("[{}]:[{}]: Entry {}.getDatabaseConnection", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        return this.conn;
    }

    public int getKafkaTopicPartitionSize(String str) {
        log.trace("[{}]:[{}] Entry {}.getKafkaTopicPartitionSize,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.getList("bootstrap.servers"));
        int i = 0;
        try {
            AdminClient create = AdminClient.create(properties);
            try {
                i = ((TopicDescription) ((Map) create.describeTopics(Collections.singletonList(str)).allTopicNames().get()).get(str)).partitions().size();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (InterruptedException e) {
            log.error("An InterruptedException occurred, unable to get Kafka partition size for topic {}: {}", str, e);
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            throw new ConnectException("Unable to get Kafka partition size: " + e2.getMessage());
        }
        log.trace("[{}]:[{}] Exit {}.getKafkaTopicPartitionSize,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        return i;
    }

    public boolean kafkaTopicExists(String str) {
        log.trace("[{}]:[{}] Entry {}.kafkaTopicExists,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.getList("bootstrap.servers"));
        Set set = null;
        try {
            AdminClient create = AdminClient.create(properties);
            try {
                ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
                listTopicsOptions.listInternal(true);
                set = (Set) create.listTopics(listTopicsOptions).names().get();
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (InterruptedException e) {
            log.error("An InterruptedException occurred, unable to validate if Kafka topic {} exist: {}", str, e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            log.error("An ExecutionException occurred, unable to validate if Kafka topic {} exist: {}", str, e2);
        }
        log.trace("[{}]:[{}] Exit {}.kafkaTopicExists,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        return set != null && set.contains(str);
    }

    public boolean txEventQueueExists(String str) throws SQLException {
        log.trace("[{}]:[{}] Entry {}.txEventQueueExists,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        ResultSet tables = this.conn.getMetaData().getTables(null, null, str, new String[]{"TABLE"});
        try {
            log.trace("[{}] Exit {}.txEventQueueExists,", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
            boolean next = tables.next();
            if (tables != null) {
                tables.close();
            }
            return next;
        } catch (Throwable th) {
            if (tables != null) {
                try {
                    tables.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private boolean isClusterDatabase() throws SQLException {
        log.trace("[{}]:[{}] Entry {}.isClusterDatabase,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        Statement createStatement = this.conn.createStatement();
        try {
            ResultSet executeQuery = createStatement.executeQuery("SELECT VALUE FROM V$PARAMETER WHERE UPPER(NAME) = 'CLUSTER_DATABASE'");
            try {
                executeQuery.next();
                boolean z = executeQuery.getBoolean("VALUE");
                if (executeQuery != null) {
                    executeQuery.close();
                }
                if (createStatement != null) {
                    createStatement.close();
                }
                log.trace("[{}]:[{}] Exit {}.isClusterDatabase, isRac=[{}]", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName(), Boolean.valueOf(z)});
                return z;
            } finally {
            }
        } catch (Throwable th) {
            if (createStatement != null) {
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public boolean createOffsetInfoTable() {
        log.trace("[{}]:[{}] Entry {}.createOffsetInfoTable,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        boolean z = false;
        try {
            PreparedStatement prepareStatement = this.conn.prepareStatement("CREATE TABLE IF NOT EXISTS TXEVENTQ$_TRACK_OFFSETS(kafka_topic_name varchar2(128) NOT NULL, queue_name varchar2(128) NOT NULL, queue_schema varchar2(128) NOT NULL, partition int NOT NULL, offset number NOT NULL, primary key(kafka_topic_name, queue_name, queue_schema, partition))");
            try {
                ResultSet executeQuery = prepareStatement.executeQuery();
                try {
                    ResultSet tables = this.conn.getMetaData().getTables(null, null, TXEVENTQ$_TRACK_OFFSETS, new String[]{"TABLE"});
                    if (tables.next()) {
                        log.debug("The TXEVENTQ$_TRACK_OFFSETS table successfully created.");
                        z = true;
                    }
                    tables.close();
                    if (executeQuery != null) {
                        executeQuery.close();
                    }
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    log.trace("[{}]:[{}] Exit {}.createOffsetInfoTable,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
                    return z;
                } catch (Throwable th) {
                    if (executeQuery != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (SQLException e) {
            throw handleException(e);
        }
    }

    public int getNumOfShardsForQueue(String str) {
        log.trace("[{}]:[{}] Entry {}.getNumOfShardsForQueue,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        try {
            CallableStatement prepareCall = this.conn.prepareCall("{call dbms_aqadm.get_queue_parameter(?,?,?)}");
            try {
                prepareCall.setString(1, str);
                prepareCall.setString(2, "SHARD_NUM");
                prepareCall.registerOutParameter(3, 4);
                prepareCall.execute();
                int i = prepareCall.getInt(3);
                if (prepareCall != null) {
                    prepareCall.close();
                }
                log.trace("[{}]:[{}] Exit {}.getNumOfShardsForQueue,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
                return i;
            } finally {
            }
        } catch (SQLException e) {
            throw new ConnectException("Error attempting to get number of shards for the specified queue: " + e.getMessage());
        }
    }

    public void enqueueMessage(String str, SinkRecord sinkRecord) throws SQLException {
        log.trace("[{}]:[{}] Entry {}.enqueueMessage,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        if (sinkRecord.kafkaPartition() != null) {
            this.jmsMesgProp.setUserProperties(this.partialUserPropertiesStr + ((2 * sinkRecord.kafkaPartition().intValue())).length() + "," + (2 * sinkRecord.kafkaPartition().intValue()));
        }
        JMSMessage createJMSMessage = JMSFactory.createJMSMessage(this.jmsMesgProp);
        if (sinkRecord.value() != null) {
            createJMSMessage.setPayload(sinkRecord.value().toString().getBytes());
        } else {
            createJMSMessage.setPayload((byte[]) null);
        }
        JMSEnqueueOptions jMSEnqueueOptions = new JMSEnqueueOptions();
        jMSEnqueueOptions.setRetrieveMessageId(true);
        jMSEnqueueOptions.setVisibility(JMSEnqueueOptions.VisibilityOption.ON_COMMIT);
        jMSEnqueueOptions.setDeliveryMode(JMSEnqueueOptions.DeliveryMode.PERSISTENT);
        AQMessageProperties createAQMessageProperties = AQFactory.createAQMessageProperties();
        createAQMessageProperties.setPriority(4);
        if (sinkRecord.key() != null) {
            createAQMessageProperties.setCorrelation(sinkRecord.key().toString());
        }
        createJMSMessage.setAQMessageProperties(createAQMessageProperties);
        this.conn.jmsEnqueue(str, jMSEnqueueOptions, createJMSMessage, createAQMessageProperties);
        log.trace("[{}]:[{}] Exit {}.enqueueMessage,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
    }

    public void enqueueBulkMessage(String str, Collection<SinkRecord> collection, MessageProducer messageProducer) throws JMSException {
        ArrayList arrayList = new ArrayList();
        int[] iArr = new int[collection.size()];
        int[] iArr2 = new int[collection.size()];
        log.trace("[{}]:[{}] Entry {}.enqueueBulkMessage,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        connectConnectionInternal();
        int i = 0;
        for (SinkRecord sinkRecord : collection) {
            log.debug("[{}:{}] Enqueuing record from partition {} at offset {} with timestamp of {}.", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()), sinkRecord.timestamp()});
            AQjmsBytesMessage createBytesMessage = createBytesMessage(this.tSess, sinkRecord);
            createBytesMessage.setJMSDeliveryMode(2);
            createBytesMessage.setJMSPriority(4);
            iArr[i] = createBytesMessage.getJMSDeliveryMode();
            iArr2[i] = createBytesMessage.getJMSPriority();
            arrayList.add(createBytesMessage);
            i++;
        }
        log.debug("Total number of messages to enqueue: {}", Integer.valueOf(arrayList.size()));
        ((AQjmsProducer) messageProducer).bulkSend((AQjmsBytesMessage[]) arrayList.toArray(new AQjmsBytesMessage[0]), iArr, iArr2, (long[]) null);
        log.trace("[{}]:[{}] Exit {}.enqueueBulkMessage,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
    }

    private AQjmsBytesMessage createBytesMessage(TopicSession topicSession, SinkRecord sinkRecord) throws JMSException {
        AQjmsBytesMessage createBytesMessage = topicSession.createBytesMessage();
        if (sinkRecord.value() != null) {
            createBytesMessage.writeBytes(sinkRecord.value().toString().getBytes());
        } else {
            createBytesMessage.writeBytes((byte[]) null);
        }
        if (sinkRecord.key() != null) {
            createBytesMessage.setJMSCorrelationID(sinkRecord.key().toString());
        }
        createBytesMessage.setStringProperty("AQINTERNAL_PARTITION", Integer.toString(sinkRecord.kafkaPartition().intValue() * 2));
        return createBytesMessage;
    }

    public void put(Collection<SinkRecord> collection) {
        log.trace("[{}]:[{}] Entry {}.put,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        connectConnectionInternal();
        try {
            if (!this.isDatabaseRac) {
                log.debug("Performing bulk enqueue because not RAC database");
                enqueueBulkMessage(this.config.getString("txeventq.queue.name"), collection, this.tProducer);
            }
            HashMap hashMap = new HashMap();
            for (SinkRecord sinkRecord : collection) {
                if (this.isDatabaseRac) {
                    log.debug("Performing single enqueue because RAC database");
                    enqueueMessage(this.config.getString("txeventq.queue.name"), sinkRecord);
                }
                hashMap.computeIfPresent(sinkRecord.topic(), (str, map) -> {
                    map.computeIfPresent(sinkRecord.kafkaPartition(), (num, l) -> {
                        return Long.valueOf(sinkRecord.kafkaOffset());
                    });
                    return map;
                });
                ((Map) hashMap.computeIfAbsent(sinkRecord.topic(), str2 -> {
                    return new HashMap();
                })).put(sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()));
            }
            for (Map.Entry entry : hashMap.entrySet()) {
                String str3 = (String) entry.getKey();
                for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                    setOffsetInfoInDatabase(this.preparedMergeStatement, str3, this.config.getString("txeventq.queue.name"), this.config.getString("txeventq.queue.schema"), ((Integer) entry2.getKey()).intValue(), ((Long) entry2.getValue()).longValue());
                }
            }
            this.conn.commit();
            log.trace("[{}]:[{}] Exit {}.put,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        } catch (SQLException | JMSException e) {
            throw handleException(e);
        }
    }

    private void setOffsetInfoInDatabase(PreparedStatement preparedStatement, String str, String str2, String str3, int i, long j) throws SQLException {
        log.trace("[{}]:[{}] Entry {}.setOffsetInfoInDatabase,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        preparedStatement.setString(1, str);
        preparedStatement.setString(2, str2);
        preparedStatement.setString(3, str3);
        preparedStatement.setInt(4, i);
        preparedStatement.setLong(5, j + 1);
        preparedStatement.setString(6, str);
        preparedStatement.setString(7, str2);
        preparedStatement.setString(8, str3);
        preparedStatement.setInt(9, i);
        preparedStatement.setLong(10, j + 1);
        preparedStatement.execute();
        log.trace("[{}]:[{}] Exit {}.setOffsetInfoInDatabase,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
    }

    public long getOffsetInDatabase(String str, String str2, String str3, int i) {
        log.trace("[{}]:[{}] Entry {}.getOffsetInDatabase,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
        long j = 0;
        try {
            this.preparedSelectOffsetStatement.setString(1, str);
            this.preparedSelectOffsetStatement.setString(2, str2);
            this.preparedSelectOffsetStatement.setString(3, str3);
            this.preparedSelectOffsetStatement.setInt(4, i);
            ResultSet executeQuery = this.preparedSelectOffsetStatement.executeQuery();
            try {
                if (executeQuery.next()) {
                    j = executeQuery.getLong("offset");
                }
                if (executeQuery != null) {
                    executeQuery.close();
                }
                log.trace("[{}]:[{}] Exit {}.getOffsetInDatabase,", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, getClass().getName()});
                return j;
            } finally {
            }
        } catch (SQLException e) {
            throw new ConnectException("Error getting the offset value: " + e.getMessage());
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        log.trace("[{}] Entry {}.close,", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
        try {
            if (this.tSess != null) {
                this.tSess.rollback();
            }
        } catch (JMSException e) {
            log.error("{}: {}", e.getClass().getName(), e);
        }
        try {
            if (this.preparedMergeStatement != null) {
                log.debug("preparedMergeStatement will be closed.");
                this.preparedMergeStatement.close();
            }
        } catch (SQLException e2) {
            log.error("{}: {}", e2.getClass().getName(), e2);
        }
        try {
            if (this.preparedSelectOffsetStatement != null) {
                log.debug("preparedSelectOffsetStatement will be closed.");
                this.preparedSelectOffsetStatement.close();
            }
        } catch (SQLException e3) {
            log.error("{}: {}", e3.getClass().getName(), e3);
        }
        try {
            this.connected = false;
            if (this.tSess != null) {
                log.debug("Session will be closed.");
                this.tSess.close();
            }
            if (this.conn != null) {
                log.debug("Connection will be closed.");
                this.conn.close();
            }
            if (this.tconn != null) {
                log.debug("Topic Connection will be closed.");
                this.tconn.close();
            }
        } catch (SQLException | JMSException e4) {
            log.error("{}: {}", e4.getClass().getName(), e4);
        } finally {
            this.conn = null;
            this.tSess = null;
            this.tconn = null;
            log.debug("Connection to TxEventQ closed.");
        }
        log.trace("[{}] Exit {}.close,", Long.valueOf(Thread.currentThread().getId()), getClass().getName());
    }
}
