package org.oracle.okafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import oracle.jdbc.internal.OracleConnection;
import oracle.jms.AQjmsBytesMessage;
import oracle.jms.AQjmsProducer;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.oracle.okafka.clients.Metadata;
import org.oracle.okafka.clients.NetworkClient;
import org.oracle.okafka.clients.producer.ProducerConfig;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.network.AQClient;
import org.oracle.okafka.common.protocol.ApiKeys;
import org.oracle.okafka.common.requests.ProduceRequest;
import org.oracle.okafka.common.requests.ProduceResponse;
import org.oracle.okafka.common.utils.ConnectionUtils;
import org.oracle.okafka.common.utils.MessageIdConverter;

/* loaded from: input_file:org/oracle/okafka/clients/producer/internals/AQKafkaProducer.class */
public final class AQKafkaProducer extends AQClient {
    private final Map<Node, TopicPublishers> topicPublishersMap;
    private final ProducerConfig configs;
    private final Time time;
    private Metadata metadata;
    private final int DLENGTH_SIZE = 4;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/oracle/okafka/clients/producer/internals/AQKafkaProducer$TopicPublishers.class */
    public final class TopicPublishers {
        private Node node;
        private TopicConnection conn;
        private TopicSession sess;
        private Map<String, TopicPublisher> topicPublishers;
        private int sessionAckMode;
        private boolean isAlive;
        PreparedStatement pingStmt;
        private final String PING_QUERY = "SELECT banner FROM v$version where 1<>1";

        public TopicPublishers(AQKafkaProducer aQKafkaProducer, Node node) throws JMSException {
            this(node, 1);
        }

        public TopicPublishers(Node node, int i) throws JMSException {
            this.topicPublishers = null;
            this.sessionAckMode = 1;
            this.isAlive = false;
            this.pingStmt = null;
            this.PING_QUERY = "SELECT banner FROM v$version where 1<>1";
            this.node = node;
            this.sessionAckMode = i;
            createPublishers();
            try {
                OracleConnection dBConnection = this.sess.getDBConnection();
                int parseInt = Integer.parseInt(dBConnection.getServerSessionInfo().getProperty("AUTH_INSTANCE_NO"));
                String property = dBConnection.getServerSessionInfo().getProperty("SERVICE_NAME");
                String property2 = dBConnection.getServerSessionInfo().getProperty("INSTANCE_NAME");
                String userName = dBConnection.getMetaData().getUserName();
                try {
                    AQKafkaProducer.this.log.info("Database Producer Session Info: " + dBConnection.getServerSessionInfo().getProperty("AUTH_SESSION_ID") + "," + dBConnection.getServerSessionInfo().getProperty("AUTH_SERIAL_NUM") + ". Process Id " + dBConnection.getServerSessionInfo().getProperty("AUTH_SERVER_PID") + " Instance Name " + property2);
                } catch (Exception e) {
                }
                this.node.setId(parseInt);
                this.node.setService(property);
                this.node.setInstanceName(property2);
                this.node.setUser(userName);
                this.node.updateHashCode();
                this.pingStmt = dBConnection.prepareStatement("SELECT banner FROM v$version where 1<>1");
                this.pingStmt.setQueryTimeout(1);
                this.isAlive = true;
            } catch (Exception e2) {
                AQKafkaProducer.this.log.error("Exception while getting instance id from conneciton " + e2, e2);
            }
            this.topicPublishers = new HashMap();
        }

        private boolean createPublishers() throws JMSException {
            this.conn = createTopicConnection(this.node);
            this.sess = createTopicSession(this.sessionAckMode);
            return true;
        }

        public TopicConnection createTopicConnection(Node node) throws JMSException {
            this.conn = ConnectionUtils.createTopicConnection(node, AQKafkaProducer.this.configs, AQKafkaProducer.this.log);
            return this.conn;
        }

        public TopicPublisher getTopicPublisher(String str) throws JMSException {
            TopicPublisher topicPublisher = this.topicPublishers.get(str);
            if (topicPublisher == null) {
                topicPublisher = createTopicPublisher(str);
                this.topicPublishers.put(str, topicPublisher);
            }
            return topicPublisher;
        }

        public TopicSession createTopicSession(int i) throws JMSException {
            if (this.sess != null) {
                return this.sess;
            }
            this.sess = ConnectionUtils.createTopicSession(this.conn, i, false);
            this.conn.start();
            return this.sess;
        }

        private TopicPublisher createTopicPublisher(String str) throws JMSException {
            return this.sess.createPublisher(this.sess.getTopic((this.node == null || this.node.user() == null) ? ConnectionUtils.getUsername(AQKafkaProducer.this.configs) : this.node.user(), str));
        }

        public TopicConnection getConnection() {
            return this.conn;
        }

        public TopicSession getSession() {
            return this.sess;
        }

        public Map<String, TopicPublisher> getTopicPublisherMap() {
            return this.topicPublishers;
        }

        public boolean isConnected() {
            if (this.isAlive) {
                try {
                    this.pingStmt.executeQuery();
                } catch (Exception e) {
                    AQKafkaProducer.this.log.error("Publishers to node {} Failed to connect.", this.node.toString());
                    this.isAlive = false;
                }
            }
            return this.isAlive;
        }

        public boolean reCreate() throws JMSException {
            close();
            createPublishers();
            try {
                HashMap hashMap = new HashMap();
                for (String str : this.topicPublishers.keySet()) {
                    try {
                        hashMap.put(str, createTopicPublisher(str));
                    } catch (Exception e) {
                        AQKafkaProducer.this.log.error("Exception " + e + " while re-creating publishers for topic " + str + " for node" + this.node);
                    }
                }
                this.topicPublishers.clear();
                this.topicPublishers = hashMap;
            } catch (Exception e2) {
                AQKafkaProducer.this.log.error("Exception " + e2 + " while re-creating publishers for topic for node" + this.node);
            }
            this.isAlive = true;
            return this.isAlive;
        }

        public void close() {
            try {
                if (this.pingStmt != null) {
                    this.pingStmt.close();
                }
                if (this.sess != null) {
                    this.sess.close();
                }
                if (this.conn != null) {
                    this.conn.close();
                }
                this.isAlive = false;
            } catch (Exception e) {
                this.isAlive = false;
                AQKafkaProducer.this.log.error("Failed to Close publishers for node " + this.node);
            }
        }
    }

    public AQKafkaProducer(LogContext logContext, ProducerConfig producerConfig, Time time, Metadata metadata) {
        super(logContext.logger(AQKafkaProducer.class), producerConfig);
        this.DLENGTH_SIZE = 4;
        this.configs = producerConfig;
        this.time = time;
        this.topicPublishersMap = new HashMap();
        this.metadata = metadata;
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void connect(Node node) throws JMSException {
        TopicPublishers topicPublishers = null;
        try {
            this.log.debug("Creating new connection for node " + node);
            topicPublishers = new TopicPublishers(this, node);
            this.topicPublishersMap.put(node, topicPublishers);
            this.log.debug("CONNECTED NODES: " + this.topicPublishersMap.keySet());
        } catch (JMSException e) {
            close(node, topicPublishers);
            throw e;
        }
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public boolean isChannelReady(Node node) {
        return this.topicPublishersMap.containsKey(node);
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public ClientResponse send(ClientRequest clientRequest) {
        return parseRequest(clientRequest, ApiKeys.convertToOracleApiKey(clientRequest.apiKey()));
    }

    private ClientResponse parseRequest(ClientRequest clientRequest, ApiKeys apiKeys) {
        if (apiKeys == ApiKeys.PRODUCE) {
            return publish(clientRequest);
        }
        if (apiKeys == ApiKeys.METADATA) {
            return getMetadata(clientRequest);
        }
        return null;
    }

    private ClientResponse publish(ClientRequest clientRequest) {
        boolean z;
        ProduceRequest m18build = ((ProduceRequest.Builder) clientRequest.requestBuilder()).m18build();
        Node nodeById = this.metadata.getNodeById(Integer.parseInt(clientRequest.destination()));
        TopicPartition topicpartition = m18build.getTopicpartition();
        MemoryRecords memoryRecords = m18build.getMemoryRecords();
        TopicPublishers topicPublishers = null;
        AQjmsBytesMessage[] aQjmsBytesMessageArr = null;
        ProduceResponse.PartitionResponse partitionResponse = null;
        int i = 2;
        while (true) {
            z = false;
            i--;
            try {
                topicPublishers = this.topicPublishersMap.get(nodeById);
            } catch (Exception e) {
                this.log.error("Exception while sending records for topic partition " + topicpartition + " no node " + nodeById, e);
                if (e instanceof JMSException) {
                    this.log.info(" Encountered AQJMS Exception with error code " + e.getErrorNumber());
                    if (e.getErrorNumber() == 25348) {
                        this.log.debug("Causing NotLeaderForPartitionException ");
                        partitionResponse = createResponses(topicpartition, new NotLeaderForPartitionException(e), aQjmsBytesMessageArr);
                        break;
                    }
                }
                if (topicPublishers != null) {
                    if (topicPublishers.isConnected()) {
                        this.log.info("Connection to node is fine. Retrying to publish = " + (i > 0 ? "TRUE" : "FALSE"));
                    } else {
                        try {
                            topicPublishers.close();
                            if (i > 0) {
                                this.log.info("Reconnecting to node " + nodeById);
                                topicPublishers.reCreate();
                            } else {
                                z = true;
                                this.log.info("Disconnected. Failing the batch");
                            }
                        } catch (Exception e2) {
                            this.log.error("Exception while reconnecting to node " + nodeById, e2);
                            z = true;
                            i = 0;
                            try {
                                topicPublishers.close();
                            } catch (Exception e3) {
                            }
                            this.topicPublishersMap.remove(nodeById);
                            this.log.trace("Connection with node {} is closed", clientRequest.destination());
                            partitionResponse = createResponses(topicpartition, new DisconnectException("Database instance not reachable: " + nodeById, e), aQjmsBytesMessageArr);
                        }
                    }
                }
            }
            if (topicPublishers == null) {
                throw new NullPointerException("No publishers created for node " + nodeById);
                break;
            }
            TopicSession session = topicPublishers.getSession();
            ArrayList arrayList = new ArrayList();
            AbstractIterator batchIterator = memoryRecords.batchIterator();
            while (batchIterator.hasNext()) {
                for (Record record : (MutableRecordBatch) batchIterator.next()) {
                    arrayList.add(createBytesMessage(session, topicpartition, record.key(), record.value(), record.headers()));
                }
            }
            TopicPublisher topicPublisher = this.topicPublishersMap.get(nodeById).getTopicPublisher(topicpartition.topic());
            aQjmsBytesMessageArr = (AQjmsBytesMessage[]) arrayList.toArray(new AQjmsBytesMessage[0]);
            this.log.trace("sending messages to topic : {} with partition: {}, number of messages: {}", new Object[]{topicpartition.topic(), Integer.valueOf(topicpartition.partition()), Integer.valueOf(aQjmsBytesMessageArr.length)});
            sendToAQ(aQjmsBytesMessageArr, topicPublisher);
            this.log.trace("Messages sent successfully to topic : {} with partition: {}, number of messages: {}", new Object[]{topicpartition.topic(), Integer.valueOf(topicpartition.partition()), Integer.valueOf(aQjmsBytesMessageArr.length)});
            i = 0;
            if (i <= 0) {
                break;
            }
        }
        if (partitionResponse == null) {
            partitionResponse = createResponses(topicpartition, null, aQjmsBytesMessageArr);
        }
        return createClientResponse(clientRequest, topicpartition, partitionResponse, z);
    }

    private ClientResponse createClientResponse(ClientRequest clientRequest, TopicPartition topicPartition, ProduceResponse.PartitionResponse partitionResponse, boolean z) {
        return new ClientResponse(clientRequest.makeHeader((short) 1), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), z, (UnsupportedVersionException) null, (AuthenticationException) null, new ProduceResponse(topicPartition, partitionResponse));
    }

    private void sendToAQ(AQjmsBytesMessage[] aQjmsBytesMessageArr, TopicPublisher topicPublisher) throws JMSException {
        this.log.info("In BulkSend: #messages = " + aQjmsBytesMessageArr.length);
        ((AQjmsProducer) topicPublisher).bulkSend(topicPublisher.getTopic(), aQjmsBytesMessageArr);
    }

    private AQjmsBytesMessage createBytesMessage(TopicSession topicSession, TopicPartition topicPartition, ByteBuffer byteBuffer, ByteBuffer byteBuffer2, Header[] headerArr, boolean z) throws JMSException {
        AQjmsBytesMessage createBytesMessage = topicSession.createBytesMessage();
        if (byteBuffer != null) {
            byte[] bArr = new byte[byteBuffer.limit()];
            byteBuffer.get(bArr);
            createBytesMessage.setJMSCorrelationID(new String(bArr));
        }
        byte[] bArr2 = new byte[byteBuffer2.limit()];
        byteBuffer2.get(bArr2);
        createBytesMessage.writeBytes(bArr2);
        createBytesMessage.setStringProperty("topic", topicPartition.topic());
        createBytesMessage.setStringProperty(AQClient.PARTITION_PROPERTY, Integer.toString(topicPartition.partition() * 2));
        return createBytesMessage;
    }

    private AQjmsBytesMessage createBytesMessage(TopicSession topicSession, TopicPartition topicPartition, ByteBuffer byteBuffer, ByteBuffer byteBuffer2, Header[] headerArr) throws JMSException {
        int i = 0;
        int i2 = 0;
        int[] iArr = null;
        int[] iArr2 = null;
        byte[] bArr = null;
        byte[] bArr2 = null;
        if (headerArr != null) {
            iArr = new int[headerArr.length];
            iArr2 = new int[headerArr.length];
        }
        AQjmsBytesMessage createBytesMessage = topicSession.createBytesMessage();
        if (byteBuffer != null) {
            bArr = new byte[byteBuffer.limit()];
            byteBuffer.get(bArr);
            i = bArr.length;
        }
        int i3 = 0 + i + 4;
        if (byteBuffer2 != null) {
            bArr2 = new byte[byteBuffer2.limit()];
            byteBuffer2.get(bArr2);
            i2 = bArr2.length;
        }
        int i4 = i3 + i2 + 4;
        if (headerArr != null) {
            int i5 = 0;
            for (Header header : headerArr) {
                int length = header.key().getBytes().length;
                int i6 = i4 + length + 4;
                iArr[i5] = length;
                int length2 = header.value().length;
                i4 = i6 + length2 + 4;
                int i7 = i5;
                i5++;
                iArr2[i7] = length2;
            }
        }
        ByteBuffer allocate = ByteBuffer.allocate(i4);
        allocate.put(ConnectionUtils.convertTo4Byte(i));
        if (i > 0) {
            allocate.put(bArr);
            createBytesMessage.setJMSCorrelationID(new String(bArr));
        }
        allocate.put(ConnectionUtils.convertTo4Byte(i2));
        if (i2 > 0) {
            allocate.put(bArr2);
        }
        if (headerArr != null) {
            int i8 = 0;
            for (Header header2 : headerArr) {
                allocate.put(ConnectionUtils.convertTo4Byte(iArr[i8]));
                allocate.put(header2.key().getBytes());
                int i9 = i8;
                i8++;
                allocate.put(ConnectionUtils.convertTo4Byte(iArr2[i9]));
                allocate.put(header2.value());
            }
        }
        allocate.rewind();
        byte[] bArr3 = new byte[allocate.limit()];
        allocate.get(bArr3);
        createBytesMessage.writeBytes(bArr3);
        createBytesMessage.setStringProperty(AQClient.PARTITION_PROPERTY, Integer.toString(topicPartition.partition() * 2));
        if (headerArr != null) {
            createBytesMessage.setIntProperty(AQClient.HEADERCOUNT_PROPERTY, headerArr.length);
        }
        createBytesMessage.setBooleanProperty(AQClient.PARSEPAYLOAD_PROPERTY, true);
        return createBytesMessage;
    }

    private ProduceResponse.PartitionResponse createResponses(TopicPartition topicPartition, RuntimeException runtimeException, AQjmsBytesMessage[] aQjmsBytesMessageArr) {
        String str;
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(runtimeException);
        if (runtimeException == null) {
            partitionResponse.msgIds = new ArrayList();
            long j = -1;
            long j2 = -1;
            for (int i = 0; i < aQjmsBytesMessageArr.length; i++) {
                try {
                    str = aQjmsBytesMessageArr[i].getJMSMessageID();
                    j = aQjmsBytesMessageArr[i].getJMSTimestamp();
                    if (j2 == -1) {
                        j2 = MessageIdConverter.getOKafkaOffset(str).subPartitionId();
                    }
                } catch (Exception e) {
                    str = null;
                    j = -1;
                }
                partitionResponse.msgIds.add(str);
            }
            partitionResponse.logAppendTime = j;
            partitionResponse.subPartitionId = j2;
        }
        return partitionResponse;
    }

    private ClientResponse getMetadata(ClientRequest clientRequest) {
        Connection connection = null;
        Node node = null;
        if (this.metadata.isBootstrap()) {
            List<Node> convertToOracleNodes = NetworkClient.convertToOracleNodes(this.metadata.fetch().nodes());
            Set<Node> keySet = this.topicPublishersMap.keySet();
            for (Node node2 : convertToOracleNodes) {
                Iterator<Node> it = keySet.iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (it.next().equals(node2)) {
                            node = node2;
                            break;
                        }
                    }
                }
            }
            if (node == null) {
                node = convertToOracleNodes.get(0);
                this.log.info("No Connected Node Found. Picked first of bootstrap nodes.: " + node);
            }
        } else {
            node = this.metadata.getNodeById(Integer.parseInt(clientRequest.destination()));
        }
        try {
            if (this.topicPublishersMap.get(node) != null) {
                connection = this.topicPublishersMap.get(node).getSession().getDBConnection();
            } else {
                for (TopicPublishers topicPublishers : this.topicPublishersMap.values()) {
                    if (topicPublishers.isConnected()) {
                        connection = topicPublishers.getSession().getDBConnection();
                    }
                }
                if (connection == null) {
                    this.log.info("Sender not connected to any node. Re-connecting.");
                    for (Node node3 : NetworkClient.convertToOracleNodes(this.metadata.fetch().nodes())) {
                        try {
                            connect(node3);
                            this.log.info("Attempting to connect to " + node3);
                            connection = this.topicPublishersMap.get(node3).getSession().getDBConnection();
                            this.log.info("Connected to node " + node3);
                            break;
                        } catch (Exception e) {
                            this.log.info(" Node {} not rechable", node3);
                        }
                    }
                }
            }
        } catch (JMSException e2) {
            try {
                this.log.trace("Unexcepted error occured with connection to node {}, closing the connection", clientRequest.destination());
                this.topicPublishersMap.get(this.metadata.getNodeById(Integer.parseInt(clientRequest.destination()))).getConnection().close();
                this.log.trace("Connection with node {} is closed", clientRequest.destination());
            } catch (JMSException e3) {
                this.log.trace("Failed to close connection with node {}", clientRequest.destination());
            }
        }
        ClientResponse metadataNow = getMetadataNow(clientRequest, connection);
        if (metadataNow.wasDisconnected()) {
            this.topicPublishersMap.remove(this.metadata.getNodeById(Integer.parseInt(clientRequest.destination())));
        }
        return metadataNow;
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void close() {
        for (Map.Entry<Node, TopicPublishers> entry : this.topicPublishersMap.entrySet()) {
            close(entry.getKey(), entry.getValue());
        }
        this.topicPublishersMap.clear();
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void close(Node node) {
    }

    private void close(Node node, TopicPublishers topicPublishers) {
        if (node == null || topicPublishers == null) {
            return;
        }
        for (Map.Entry<String, TopicPublisher> entry : topicPublishers.getTopicPublisherMap().entrySet()) {
            try {
                entry.getValue().close();
            } catch (JMSException e) {
                this.log.error("failed to close topic publisher for topic {} ", entry.getKey());
            }
        }
        try {
            topicPublishers.getSession().close();
        } catch (JMSException e2) {
            this.log.error("failed to close session {} associated with connection {} and node {}  ", new Object[]{topicPublishers.getSession(), topicPublishers.getConnection(), node});
        }
        try {
            topicPublishers.getConnection().close();
        } catch (JMSException e3) {
            this.log.error("failed to close connection {} associated with node {}  ", topicPublishers.getConnection(), node);
        }
    }
}
