package oracle.jdbc.txeventq.kafka.connect.source.utils;

import java.io.Closeable;
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleDriver;
import oracle.jdbc.aq.AQDequeueOptions;
import oracle.jdbc.aq.AQMessage;
import oracle.jdbc.internal.JMSDequeueOptions;
import oracle.jdbc.internal.JMSMessage;
import oracle.jdbc.internal.JMSMessageProperties;
import oracle.jdbc.txeventq.kafka.connect.source.utils.TxEventQSourceRecord;
import oracle.sql.RAW;
import oracle.sql.json.OracleJsonDatum;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oracle/jdbc/txeventq/kafka/connect/source/utils/TxEventQConsumer.class */
public class TxEventQConsumer implements Closeable {
    protected static final Logger log = LoggerFactory.getLogger(TxEventQConsumer.class);
    private TxEventQConnectorConfig config;
    private OracleConnection conn;

    public TxEventQConsumer(TxEventQConnectorConfig txEventQConnectorConfig) {
        this.config = null;
        this.config = txEventQConnectorConfig;
    }

    public OracleConnection connect() {
        try {
            log.info("[{}] Attempting to open database connection.", Long.valueOf(Thread.currentThread().getId()));
            System.setProperty("oracle.net.wallet_location", this.config.getString("wallet.path"));
            System.setProperty("oracle.net.tns_admin", this.config.getString("tnsnames.path"));
            DriverManager.registerDriver(new OracleDriver());
            this.conn = DriverManager.getConnection("jdbc:oracle:thin:@" + this.config.getString("db_tns_alias"));
            this.conn.setAutoCommit(false);
            log.info("[{}] Oracle TxEventQ connection [{}] opened.", Long.valueOf(Thread.currentThread().getId()), this.conn);
            return this.conn;
        } catch (SQLException e) {
            throw new ConnectException("Couldn't establish a connection to the database: " + e.toString());
        }
    }

    public boolean isConnOpen(OracleConnection oracleConnection) throws SQLException {
        return (oracleConnection == null || oracleConnection.isClosed()) ? false : true;
    }

    private TxEventQSourceRecord processRawPayload(byte[] bArr, AQMessage aQMessage) throws SQLException {
        RAW rAWPayload = aQMessage.getRAWPayload();
        String string = this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC);
        String byteArrayToHex = byteArrayToHex(bArr);
        int shardId = getShardId(byteArrayToHex);
        log.debug("[{}] Processing RAW Type message:[msgId: {}, shardNum: {}, bytes: {}]", new Object[]{Long.valueOf(Thread.currentThread().getId()), byteArrayToHex, Integer.valueOf(shardId), rAWPayload.getBytes()});
        return new TxEventQSourceRecord(null, null, string, Integer.valueOf(shardId / 2), null, rAWPayload.getBytes(), TxEventQSourceRecord.PayloadType.RAW, bArr);
    }

    private TxEventQSourceRecord processJsonPayload(byte[] bArr, AQMessage aQMessage) throws SQLException {
        OracleJsonDatum jSONPayload = aQMessage.getJSONPayload();
        String byteArrayToHex = byteArrayToHex(bArr);
        int shardId = getShardId(byteArrayToHex);
        String string = this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC);
        log.debug("[{}] Processing JSON Type message:[msgId: {}, shardNum: {}, bytes: {}]", new Object[]{Long.valueOf(Thread.currentThread().getId()), byteArrayToHex, Integer.valueOf(shardId), jSONPayload.getBytes()});
        return new TxEventQSourceRecord(null, null, string, Integer.valueOf(shardId / 2), null, jSONPayload.getBytes(), TxEventQSourceRecord.PayloadType.JSON, bArr);
    }

    public SourceRecord receive(OracleConnection oracleConnection) {
        String queueTableType = getQueueTableType(oracleConnection, this.config.getString("txeventq.queue.name").toUpperCase());
        log.debug("[{}]:[{}] Queue table {} is a {} type table.", new Object[]{Long.valueOf(Thread.currentThread().getId()), oracleConnection, this.config.getString("txeventq.queue.name"), queueTableType});
        if (queueTableType.equalsIgnoreCase("JMS_BYTES")) {
            return receiveJmsMessage(oracleConnection);
        }
        if (queueTableType.equalsIgnoreCase("RAW") || queueTableType.equalsIgnoreCase("JSON")) {
            return receiveAQMessage(oracleConnection, queueTableType);
        }
        log.error("Supported queue types are: JMS_BYTES, RAW, and JSON");
        return null;
    }

    public SourceRecord receiveAQMessage(OracleConnection oracleConnection, String str) {
        log.info("[{}] Waiting for RAW messages....", Long.valueOf(Thread.currentThread().getId()));
        AQDequeueOptions aQDequeueOptions = new AQDequeueOptions();
        aQDequeueOptions.setRetrieveMessageId(true);
        try {
            aQDequeueOptions.setConsumerName(this.config.getString(TxEventQConnectorConfig.TXEVENTQ_SUBSCRIBER_CONFIG));
            aQDequeueOptions.setDequeueMode(AQDequeueOptions.DequeueMode.REMOVE);
            aQDequeueOptions.setVisibility(AQDequeueOptions.VisibilityOption.ON_COMMIT);
            aQDequeueOptions.setNavigation(AQDequeueOptions.NavigationOption.NEXT_MESSAGE);
            aQDequeueOptions.setDeliveryFilter(AQDequeueOptions.DeliveryFilter.PERSISTENT);
            byte[] bArr = new byte[0];
            try {
                AQMessage dequeue = oracleConnection.dequeue(this.config.getString("txeventq.queue.name"), aQDequeueOptions, str);
                byte[] messageId = dequeue.getMessageId();
                if (messageId == null) {
                    log.error("[{}] Message ID is null.", Long.valueOf(Thread.currentThread().getId()));
                    return null;
                }
                if (str.equalsIgnoreCase("RAW")) {
                    return processRawPayload(messageId, dequeue);
                }
                if (str.equalsIgnoreCase("JSON")) {
                    return processJsonPayload(messageId, dequeue);
                }
                return null;
            } catch (SQLException e) {
                log.error("Error occurred while attempting to dequeue message.");
                return null;
            }
        } catch (SQLException e2) {
            log.error("Error setting AQDequeueOptions: {}", e2.toString());
            return null;
        }
    }

    private TxEventQSourceRecord processJmsBytes(byte[] bArr, JMSMessage jMSMessage) {
        byte[] payload = jMSMessage.getPayload();
        String string = this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC);
        String byteArrayToHex = byteArrayToHex(bArr);
        int shardId = getShardId(byteArrayToHex);
        log.debug("[{}] Processing JMS_BYTES message:[msgId: {}, shardNum: {}, bytes: {}]", new Object[]{Long.valueOf(Thread.currentThread().getId()), byteArrayToHex, Integer.valueOf(shardId), payload});
        return new TxEventQSourceRecord(null, null, string, Integer.valueOf(shardId / 2), null, payload, TxEventQSourceRecord.PayloadType.JMS, bArr);
    }

    private static int getShardId(String str) {
        if (str == null || str.length() != 32) {
            return -1;
        }
        String substring = str.substring(16, 24);
        if (str.substring(26, 28).equals("66")) {
            char[] charArray = substring.toCharArray();
            int i = 0;
            while (true) {
                int i2 = i;
                if (i2 >= charArray.length) {
                    break;
                }
                char c = charArray[i2];
                charArray[i2] = charArray[i2 + 1];
                charArray[i2 + 1] = c;
                i = i2 + 2;
            }
            substring = new StringBuilder(new String(charArray)).reverse().toString();
        }
        return Integer.parseInt(substring, 16);
    }

    public int getNumOfShardsForQueue(OracleConnection oracleConnection, String str) throws SQLException {
        log.debug("[{}]:[{}]: Called getNumOfShardsForQueue", Long.valueOf(Thread.currentThread().getId()), oracleConnection);
        CallableStatement prepareCall = oracleConnection.prepareCall("{call dbms_aqadm.get_queue_parameter(?,?,?)}");
        prepareCall.setString(1, str);
        prepareCall.setString(2, "SHARD_NUM");
        prepareCall.registerOutParameter(3, 4);
        prepareCall.execute();
        int i = prepareCall.getInt(3);
        prepareCall.close();
        log.debug("Number of shards for {}: {}", str, Integer.valueOf(i));
        return i;
    }

    public int getKafkaTopicPartitionSize(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.getList("bootstrap.servers"));
        int i = 0;
        try {
            AdminClient create = AdminClient.create(properties);
            try {
                i = ((TopicDescription) ((Map) create.describeTopics(Collections.singletonList(str)).allTopicNames().get()).get(str)).partitions().size();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (InterruptedException e) {
            log.error("Unable to get Kafka partition size for topic {}: {}", str, e.toString());
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            log.error("Unable to get Kafka partition size for topic {}: {}", str, e2.toString());
        }
        return i;
    }

    public SourceRecord receiveJmsMessage(OracleConnection oracleConnection) {
        JMSDequeueOptions jMSDequeueOptions = new JMSDequeueOptions();
        jMSDequeueOptions.setConsumerName(this.config.getString(TxEventQConnectorConfig.TXEVENTQ_SUBSCRIBER_CONFIG));
        jMSDequeueOptions.setDequeueMode(JMSDequeueOptions.DequeueMode.REMOVE);
        jMSDequeueOptions.setWait(10);
        jMSDequeueOptions.setVisibility(JMSDequeueOptions.VisibilityOption.ON_COMMIT);
        jMSDequeueOptions.setRetrieveMessageId(true);
        jMSDequeueOptions.setDeliveryMode(JMSDequeueOptions.DeliveryFilter.PERSISTENT);
        try {
            JMSMessage jmsDequeue = ((oracle.jdbc.internal.OracleConnection) oracleConnection).jmsDequeue(this.config.getString("txeventq.queue.name"), jMSDequeueOptions);
            JMSMessageProperties jMSMessageProperties = jmsDequeue.getJMSMessageProperties();
            log.debug("[{}] JMSMessageProperties values: [Message Type = {}, Header Properties = {}, User Properties = {}]", new Object[]{Long.valueOf(Thread.currentThread().getId()), Integer.valueOf(jMSMessageProperties.getJMSMessageType().getCode()), jMSMessageProperties.getHeaderProperties(), jMSMessageProperties.getUserProperties()});
            byte[] messageId = jmsDequeue.getMessageId();
            if (messageId != null) {
                return processJmsBytes(messageId, jmsDequeue);
            }
            log.error("[{}] Message ID is null.", Long.valueOf(Thread.currentThread().getId()));
            return null;
        } catch (SQLException e) {
            log.error("Error SQLException: {}", e.toString());
            return null;
        }
    }

    public String getQueueTableType(OracleConnection oracleConnection, String str) {
        try {
            PreparedStatement prepareStatement = oracleConnection.prepareStatement("SELECT type, queue_table from user_queue_tables where queue_table = ?");
            try {
                prepareStatement.setString(1, str);
                ResultSet executeQuery = prepareStatement.executeQuery();
                try {
                    if (!executeQuery.next()) {
                        if (executeQuery != null) {
                            executeQuery.close();
                        }
                        if (prepareStatement != null) {
                            prepareStatement.close();
                        }
                        return null;
                    }
                    String string = executeQuery.getString("type");
                    if (executeQuery != null) {
                        executeQuery.close();
                    }
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    return string;
                } catch (Throwable th) {
                    if (executeQuery != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Exception e) {
            throw new ConnectException("Error unable to get the queue table type: " + e.toString());
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        log.info("[{}]:[{}] Close Oracle TxEventQ Connections.", Long.valueOf(Thread.currentThread().getId()), this.conn);
        try {
            this.conn.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    private static String byteArrayToHex(byte[] bArr) {
        StringBuilder sb = new StringBuilder(bArr.length * 2);
        for (byte b : bArr) {
            sb.append(String.format("%02x", Byte.valueOf(b)));
        }
        return sb.toString();
    }
}
