package oracle.jdbc.txeventq.kafka.connect.sink.utils;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.CallableStatement;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleDriver;
import oracle.jdbc.aq.AQFactory;
import oracle.jdbc.aq.AQMessageProperties;
import oracle.jdbc.internal.JMSEnqueueOptions;
import oracle.jdbc.internal.JMSFactory;
import oracle.jdbc.internal.JMSMessage;
import oracle.jdbc.internal.JMSMessageProperties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oracle/jdbc/txeventq/kafka/connect/sink/utils/TxEventQProducer.class */
public class TxEventQProducer implements Closeable {
    protected static final Logger log = LoggerFactory.getLogger(TxEventQProducer.class);
    private PreparedStatement preparedMergeStatement;
    private PreparedStatement preparedSelectOffsetStatement;
    private OracleConnection conn;
    private TxEventQSinkConfig config;
    private static final String TXEVENTQ$_TRACK_OFFSETS = "TXEVENTQ$_TRACK_OFFSETS";
    private String jmsDeliveryModeStr = "JMSDeliveryMode";
    private String persistentStr = "PERSISTENT";
    private String aqInternalPartitionStr = "AQINTERNAL_PARTITION";
    private int numberOfProperties = 1;
    private int stringPropertyValueType = 27;
    private int numberPropertyValueType = 24;
    private String partialUserPropertiesStr = this.numberOfProperties + "," + this.aqInternalPartitionStr.length() + "," + this.aqInternalPartitionStr + "," + this.numberPropertyValueType + ",";
    private String mergeSqlStatement = "MERGE INTO TXEVENTQ$_TRACK_OFFSETS tab1 USING (SELECT ? kafka_topic_name, ? queue_name, ? queue_schema, ? partition) tab2 ON (tab1.kafka_topic_name = tab2.kafka_topic_name AND tab1.queue_name = tab2.queue_name AND tab1.queue_schema = tab2.queue_schema AND tab1.partition = tab2.partition) WHEN MATCHED THEN UPDATE set offset=? WHEN NOT MATCHED THEN INSERT (kafka_topic_name, queue_name, queue_schema, partition, offset) values (?,?,?,?,?)";
    private String selectOffsetSqlStatement = "SELECT offset FROM TXEVENTQ$_TRACK_OFFSETS where kafka_topic_name=? and queue_name = ? and queue_schema = ? and partition=?";
    private JMSMessageProperties jmsMesgProp = JMSFactory.createJMSMessageProperties();

    public TxEventQProducer(TxEventQSinkConfig txEventQSinkConfig) throws SQLException {
        this.config = null;
        this.config = txEventQSinkConfig;
        this.jmsMesgProp.setHeaderProperties(this.numberOfProperties + "," + this.jmsDeliveryModeStr.length() + "," + this.jmsDeliveryModeStr + "," + this.stringPropertyValueType + "," + this.persistentStr.length() + "," + this.persistentStr);
        this.jmsMesgProp.setJMSMessageType(JMSMessageProperties.JMSMessageType.BYTES_MESSAGE);
    }

    public OracleConnection connect() {
        try {
            System.setProperty("oracle.net.wallet_location", this.config.getString("wallet.path"));
            System.setProperty("oracle.net.tns_admin", this.config.getString("tnsnames.path"));
            DriverManager.registerDriver(new OracleDriver());
            this.conn = DriverManager.getConnection("jdbc:oracle:thin:@" + this.config.getString("db_tns_alias"));
            this.conn.setAutoCommit(false);
            this.preparedMergeStatement = this.conn.prepareStatement(this.mergeSqlStatement);
            this.preparedSelectOffsetStatement = this.conn.prepareStatement(this.selectOffsetSqlStatement);
            log.info("[{}:{}] Oracle TxEventQ connection opened!", Long.valueOf(Thread.currentThread().getId()), this.conn);
            return this.conn;
        } catch (SQLException e) {
            throw new ConnectException("Couldn't establish a connection to the database: " + e.toString());
        }
    }

    public int getKafkaTopicPartitionSize(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.getList("bootstrap.servers"));
        int i = 0;
        try {
            AdminClient create = AdminClient.create(properties);
            try {
                i = ((TopicDescription) ((Map) create.describeTopics(Collections.singletonList(str)).allTopicNames().get()).get(str)).partitions().size();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (InterruptedException e) {
            log.error("Unable to get Kafka partition size for topic {}: {}", str, e.toString());
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            log.error("Unable to get Kafka partition size for topic {}: {}", str, e2.toString());
        }
        return i;
    }

    public boolean kafkaTopicExists(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.getList("bootstrap.servers"));
        Set set = null;
        try {
            AdminClient create = AdminClient.create(properties);
            try {
                ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
                listTopicsOptions.listInternal(true);
                set = (Set) create.listTopics(listTopicsOptions).names().get();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (InterruptedException e) {
            log.error("Unable to validate if Kafka topic {} exist: {}", str, e.toString());
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            log.error("Unable to validate if Kafka topic {} exist: {}", str, e2.toString());
        }
        return set != null && set.contains(str);
    }

    public boolean txEventQueueExists(OracleConnection oracleConnection, String str) throws SQLException {
        return this.conn.getMetaData().getTables(null, null, str, new String[]{"TABLE"}).next();
    }

    public boolean createOffsetInfoTable(OracleConnection oracleConnection) {
        boolean z = false;
        try {
            PreparedStatement prepareStatement = oracleConnection.prepareStatement("Create table if not exists TXEVENTQ$_TRACK_OFFSETS(kafka_topic_name varchar2(128) NOT NULL, queue_name varchar2(128) NOT NULL, queue_schema varchar2(128) NOT NULL, partition int NOT NULL, offset number NOT NULL, primary key(kafka_topic_name, queue_name, queue_schema, partition))");
            try {
                ResultSet executeQuery = prepareStatement.executeQuery();
                try {
                    if (oracleConnection.getMetaData().getTables(null, null, TXEVENTQ$_TRACK_OFFSETS, new String[]{"TABLE"}).next()) {
                        log.info("The TXEVENTQ$_TRACK_OFFSETS table successfully created.");
                        z = true;
                    }
                    if (executeQuery != null) {
                        executeQuery.close();
                    }
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    return z;
                } catch (Throwable th) {
                    if (executeQuery != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new ConnectException("Error attempting to create TXEVENTQ$_TRACK_OFFSETS table: " + e.toString());
        }
    }

    public int getNumOfShardsForQueue(OracleConnection oracleConnection, String str) throws SQLException {
        CallableStatement prepareCall = oracleConnection.prepareCall("{call dbms_aqadm.get_queue_parameter(?,?,?)}");
        prepareCall.setString(1, str);
        prepareCall.setString(2, "SHARD_NUM");
        prepareCall.registerOutParameter(3, 4);
        prepareCall.execute();
        int i = prepareCall.getInt(3);
        prepareCall.close();
        return i;
    }

    public void enqueueMessage(OracleConnection oracleConnection, String str, SinkRecord sinkRecord) throws SQLException {
        if (sinkRecord.kafkaPartition() != null) {
            this.jmsMesgProp.setUserProperties(this.partialUserPropertiesStr + ((2 * sinkRecord.kafkaPartition().intValue())).length() + "," + (2 * sinkRecord.kafkaPartition().intValue()));
        }
        JMSMessage createJMSMessage = JMSFactory.createJMSMessage(this.jmsMesgProp);
        if (sinkRecord.value() != null) {
            createJMSMessage.setPayload(sinkRecord.value().toString().getBytes());
        } else {
            createJMSMessage.setPayload((byte[]) null);
        }
        JMSEnqueueOptions jMSEnqueueOptions = new JMSEnqueueOptions();
        jMSEnqueueOptions.setRetrieveMessageId(true);
        jMSEnqueueOptions.setVisibility(JMSEnqueueOptions.VisibilityOption.ON_COMMIT);
        jMSEnqueueOptions.setDeliveryMode(JMSEnqueueOptions.DeliveryMode.PERSISTENT);
        AQMessageProperties createAQMessageProperties = AQFactory.createAQMessageProperties();
        createAQMessageProperties.setPriority(4);
        if (sinkRecord.key() != null) {
            createAQMessageProperties.setCorrelation(sinkRecord.key().toString());
        }
        createJMSMessage.setAQMessageProperties(createAQMessageProperties);
        ((oracle.jdbc.internal.OracleConnection) oracleConnection).jmsEnqueue(str, jMSEnqueueOptions, createJMSMessage, createAQMessageProperties);
    }

    public void put(Collection<SinkRecord> collection) {
        try {
            HashMap hashMap = new HashMap();
            for (SinkRecord sinkRecord : collection) {
                log.debug("[{}:{}] Enqueuing record from partition {} at offset {} with timestamp of {}.", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()), sinkRecord.timestamp()});
                enqueueMessage(this.conn, this.config.getString("txeventq.queue.name"), sinkRecord);
                if (hashMap.containsKey(sinkRecord.topic())) {
                    Map map = (Map) hashMap.get(sinkRecord.topic());
                    if (map.containsKey(sinkRecord.kafkaPartition())) {
                        map.replace(sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()));
                    } else {
                        map.put(sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()));
                    }
                } else {
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put(sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()));
                    hashMap.put(sinkRecord.topic(), hashMap2);
                }
            }
            for (Map.Entry entry : hashMap.entrySet()) {
                String str = (String) entry.getKey();
                for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                    setOffsetInfoInDatabase(this.conn, this.preparedMergeStatement, str, this.config.getString("txeventq.queue.name"), this.config.getString("txeventq.queue.schema"), ((Integer) entry2.getKey()).intValue(), ((Long) entry2.getValue()).longValue());
                }
            }
            this.conn.commit();
        } catch (SQLException e) {
            throw new ConnectException("Error putting records into TxEventQ: " + e.toString());
        }
    }

    private void setOffsetInfoInDatabase(OracleConnection oracleConnection, PreparedStatement preparedStatement, String str, String str2, String str3, int i, long j) {
        try {
            preparedStatement.setString(1, str);
            preparedStatement.setString(2, str2);
            preparedStatement.setString(3, str3);
            preparedStatement.setInt(4, i);
            preparedStatement.setLong(5, j + 1);
            preparedStatement.setString(6, str);
            preparedStatement.setString(7, str2);
            preparedStatement.setString(8, str3);
            preparedStatement.setInt(9, i);
            preparedStatement.setLong(10, j + 1);
            preparedStatement.execute();
        } catch (SQLException e) {
            throw new ConnectException("Error attempting to insert or update offset information: " + e.toString());
        }
    }

    public long getOffsetInDatabase(OracleConnection oracleConnection, String str, String str2, String str3, int i) {
        long j = 0;
        try {
            this.preparedSelectOffsetStatement.setString(1, str);
            this.preparedSelectOffsetStatement.setString(2, str2);
            this.preparedSelectOffsetStatement.setString(3, str3);
            this.preparedSelectOffsetStatement.setInt(4, i);
            ResultSet executeQuery = this.preparedSelectOffsetStatement.executeQuery();
            try {
                if (executeQuery.next()) {
                    j = executeQuery.getLong("offset");
                }
                if (executeQuery != null) {
                    executeQuery.close();
                }
                return j;
            } finally {
            }
        } catch (SQLException e) {
            throw new ConnectException("Error getting the offset value: " + e.toString());
        }
    }

    public OracleConnection getConnection() {
        return this.conn;
    }

    public boolean isConnOpen(OracleConnection oracleConnection) throws SQLException {
        return (oracleConnection == null || oracleConnection.isClosed()) ? false : true;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        log.info("[{}]:[{}] Close Oracle TxEventQ Connections.", Long.valueOf(Thread.currentThread().getId()), this.conn);
        try {
            this.preparedMergeStatement.close();
            this.preparedSelectOffsetStatement.close();
            this.conn.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
