package org.oracle.okafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import javax.jms.JMSException;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import oracle.jdbc.internal.OracleConnection;
import oracle.jms.AQjmsBytesMessage;
import oracle.jms.AQjmsException;
import oracle.jms.AQjmsProducer;
import oracle.jms.AQjmsSession;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.oracle.okafka.clients.Metadata;
import org.oracle.okafka.clients.NetworkClient;
import org.oracle.okafka.clients.TopicTeqParameters;
import org.oracle.okafka.clients.producer.ProducerConfig;
import org.oracle.okafka.clients.producer.internals.OracleTransactionManager;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.network.AQClient;
import org.oracle.okafka.common.network.SelectorMetrics;
import org.oracle.okafka.common.protocol.ApiKeys;
import org.oracle.okafka.common.requests.MetadataResponse;
import org.oracle.okafka.common.requests.ProduceRequest;
import org.oracle.okafka.common.requests.ProduceResponse;
import org.oracle.okafka.common.utils.ConnectionUtils;
import org.oracle.okafka.common.utils.MessageIdConverter;

/* loaded from: input_file:org/oracle/okafka/clients/producer/internals/AQKafkaProducer.class */
public final class AQKafkaProducer extends AQClient {
    private final Map<Node, TopicPublishers> topicPublishersMap;
    private final ProducerConfig configs;
    private final Time time;
    private Metadata metadata;
    private final Metrics metrics;
    private final SelectorMetrics selectorMetrics;
    private final int DLENGTH_SIZE = 4;
    private boolean transactionalProducer;
    private boolean idempotentProducer;
    private int connectMode;
    private Connection dbConn;
    private Connection externalDbConn;
    private AQKafkaProducerStatus status;
    private OracleTransactionManager oTxm;
    static boolean forceRollback = false;
    static boolean forceRetry = false;
    static boolean forceDisconnect = false;
    static boolean stopReconnect = false;
    private HashMap<TopicPartition, MessageIdConverter.OKafkaOffset> currentOffsetMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/oracle/okafka/clients/producer/internals/AQKafkaProducer$AQKafkaProducerStatus.class */
    public enum AQKafkaProducerStatus {
        PRE_INIT,
        INIT,
        OPEN,
        CLOSE;

        /* renamed from: values, reason: to resolve conflict with enum method */
        public static AQKafkaProducerStatus[] valuesCustom() {
            AQKafkaProducerStatus[] valuesCustom = values();
            int length = valuesCustom.length;
            AQKafkaProducerStatus[] aQKafkaProducerStatusArr = new AQKafkaProducerStatus[length];
            System.arraycopy(valuesCustom, 0, aQKafkaProducerStatusArr, 0, length);
            return aQKafkaProducerStatusArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/oracle/okafka/clients/producer/internals/AQKafkaProducer$TopicPublishers.class */
    public final class TopicPublishers {
        private Connection externalConn;
        private Node node;
        private TopicConnection conn;
        private TopicSession sess;
        private Map<String, TopicPublisher> topicPublishers;
        private int sessionAckMode;
        private boolean isAlive;
        PreparedStatement pingStmt;
        private final String PING_QUERY = "SELECT banner FROM v$version where 1<>1";
        private String connInfo;

        public TopicPublishers(Node node, Connection connection) throws JMSException {
            this.topicPublishers = null;
            this.sessionAckMode = 1;
            this.isAlive = false;
            this.pingStmt = null;
            this.PING_QUERY = "SELECT banner FROM v$version where 1<>1";
            this.connInfo = "";
            this.node = node;
            this.externalConn = connection;
            this.sessionAckMode = 0;
            createPublishers(false);
            this.topicPublishers = new HashMap();
            AQKafkaProducer.this.log.debug("ExternalConnection " + connection);
        }

        public TopicPublishers(AQKafkaProducer aQKafkaProducer, Node node) throws JMSException {
            this(node, 1);
        }

        public TopicPublishers(Node node, int i) throws JMSException {
            this.topicPublishers = null;
            this.sessionAckMode = 1;
            this.isAlive = false;
            this.pingStmt = null;
            this.PING_QUERY = "SELECT banner FROM v$version where 1<>1";
            this.connInfo = "";
            this.node = node;
            this.sessionAckMode = i;
            try {
                createPublishers(false);
                this.topicPublishers = new HashMap();
            } catch (Exception e) {
                AQKafkaProducer.this.log.error("Exception while getting instance id from conneciton " + e, e);
                throw e;
            }
        }

        public String toString() {
            String str = String.valueOf(this.connInfo) + ". Acknowledge_mode:" + this.sessionAckMode + ".";
            if (this.topicPublishers != null && this.topicPublishers.size() > 0) {
                String str2 = "Topics:[";
                boolean z = true;
                for (String str3 : this.topicPublishers.keySet()) {
                    if (!z) {
                        str2 = String.valueOf(str2) + ",";
                    }
                    str2 = String.valueOf(str2) + str3;
                    z = false;
                }
                str = String.valueOf(str) + (String.valueOf(str2) + "].");
            }
            return str;
        }

        private boolean createPublishers(boolean z) throws JMSException {
            try {
                this.conn = createTopicConnection();
                this.sess = createTopicSession(this.sessionAckMode);
                OracleConnection dBConnection = this.sess.getDBConnection();
                int parseInt = Integer.parseInt(dBConnection.getServerSessionInfo().getProperty("AUTH_INSTANCE_NO"));
                String property = dBConnection.getServerSessionInfo().getProperty("SERVICE_NAME");
                String property2 = dBConnection.getServerSessionInfo().getProperty("INSTANCE_NAME");
                String userName = dBConnection.getMetaData().getUserName();
                this.connInfo = "Session_Info:" + dBConnection.getServerSessionInfo().getProperty("AUTH_SESSION_ID") + "," + dBConnection.getServerSessionInfo().getProperty("AUTH_SERIAL_NUM") + ". Process Id:" + dBConnection.getServerSessionInfo().getProperty("AUTH_SERVER_PID") + ". Instance Name:" + property2;
                AQKafkaProducer.this.log.info("Database Producer " + this.connInfo);
                if (z) {
                    this.node.setId(parseInt);
                    this.node.setService(property);
                    this.node.setInstanceName(property2);
                    this.node.setUser(userName);
                    this.node.updateHashCode();
                }
                this.pingStmt = dBConnection.prepareStatement("SELECT banner FROM v$version where 1<>1");
                this.pingStmt.setQueryTimeout(1);
                this.isAlive = true;
                return true;
            } catch (Exception e) {
                JMSException jMSException = new JMSException(e.getMessage());
                jMSException.setLinkedException(e);
                throw jMSException;
            }
        }

        public TopicConnection createTopicConnection() throws Exception {
            if (this.externalConn == null || this.externalConn.isClosed()) {
                this.conn = ConnectionUtils.createTopicConnection(this.node, AQKafkaProducer.this.configs, AQKafkaProducer.this.log);
            } else {
                AQKafkaProducer.this.log.debug("Using External Connection to setup TopicConnection");
                this.conn = ConnectionUtils.createTopicConnection(this.externalConn, AQKafkaProducer.this.configs, AQKafkaProducer.this.log);
            }
            return this.conn;
        }

        public TopicPublisher getTopicPublisher(String str) throws JMSException {
            TopicPublisher topicPublisher = this.topicPublishers.get(str);
            if (topicPublisher == null) {
                topicPublisher = createTopicPublisher(str);
                this.topicPublishers.put(str, topicPublisher);
            }
            return topicPublisher;
        }

        public TopicSession createTopicSession(int i) throws JMSException {
            if (this.sess != null && this.sess.isOpen()) {
                return this.sess;
            }
            boolean z = false;
            if (i == 0) {
                z = true;
                i = 1;
            }
            this.sess = ConnectionUtils.createTopicSession(this.conn, i, z);
            this.conn.start();
            return this.sess;
        }

        private TopicPublisher createTopicPublisher(String str) throws JMSException {
            return this.sess.createPublisher(this.sess.getTopic((this.node == null || this.node.user() == null) ? ConnectionUtils.getUsername(AQKafkaProducer.this.configs) : this.node.user(), str));
        }

        public TopicConnection getConnection() {
            return this.conn;
        }

        public TopicSession getSession() {
            return this.sess;
        }

        public Map<String, TopicPublisher> getTopicPublisherMap() {
            return this.topicPublishers;
        }

        public boolean isConnected() {
            if (this.isAlive) {
                try {
                    this.pingStmt.executeQuery();
                } catch (Exception e) {
                    AQKafkaProducer.this.log.error("Publishers to node {} Failed to connect.", this.node.toString());
                    this.isAlive = false;
                }
            }
            return this.isAlive;
        }

        public boolean reCreate() throws JMSException {
            AQKafkaProducer.this.log.debug("Recreating TopicPublisher " + toString());
            close();
            if (!createPublishers(true)) {
                AQKafkaProducer.this.log.debug("Recreation failed");
                return false;
            }
            AQKafkaProducer.this.log.debug("Successfully recreated " + toString());
            try {
                HashMap hashMap = new HashMap();
                for (String str : this.topicPublishers.keySet()) {
                    try {
                        hashMap.put(str, createTopicPublisher(str));
                    } catch (Exception e) {
                        AQKafkaProducer.this.log.error("Exception " + e + " while re-creating publishers for topic " + str + " for node" + this.node);
                    }
                }
                this.topicPublishers.clear();
                this.topicPublishers = hashMap;
            } catch (Exception e2) {
                AQKafkaProducer.this.log.error("Exception " + e2 + " while re-creating publishers for topic for node" + this.node);
            }
            this.isAlive = true;
            return this.isAlive;
        }

        public void close() {
            try {
                if (this.pingStmt != null) {
                    if (!this.pingStmt.isClosed()) {
                        this.pingStmt.close();
                    }
                    this.pingStmt = null;
                }
            } catch (Exception e) {
                AQKafkaProducer.this.log.error("Error while closing ping statement for " + this.node);
                this.pingStmt = null;
            }
            try {
                if (this.sess != null) {
                    if (this.sess.isOpen()) {
                        this.sess.close();
                    }
                    this.sess = null;
                }
            } catch (Exception e2) {
                AQKafkaProducer.this.log.error("Error while closing session for " + this.node);
                this.sess = null;
            }
            try {
                if (this.conn != null) {
                    if (this.conn.isOpen()) {
                        this.conn.close();
                    }
                    this.conn = null;
                }
            } catch (Exception e3) {
                AQKafkaProducer.this.log.error("Error while closing connection for " + this.node);
            }
            this.isAlive = false;
        }
    }

    public AQKafkaProducer(LogContext logContext, ProducerConfig producerConfig, Time time, Metadata metadata, Metrics metrics, OracleTransactionManager oracleTransactionManager) {
        super(logContext.logger(AQKafkaProducer.class), producerConfig);
        this.DLENGTH_SIZE = 4;
        this.transactionalProducer = false;
        this.idempotentProducer = false;
        this.connectMode = 1;
        this.dbConn = null;
        this.externalDbConn = null;
        this.status = AQKafkaProducerStatus.PRE_INIT;
        this.oTxm = null;
        this.currentOffsetMap = null;
        this.configs = producerConfig;
        this.time = time;
        this.topicPublishersMap = new HashMap();
        this.metadata = metadata;
        this.oTxm = oracleTransactionManager;
        this.metrics = metrics;
        this.selectorMetrics = new SelectorMetrics(this.metrics, "Selector", Collections.emptyMap(), true);
        this.selectorMetrics.recordConnectionCount(this.topicPublishersMap);
        try {
            this.transactionalProducer = producerConfig.getBoolean("oracle.transactional.producer").booleanValue();
        } catch (Exception e) {
            this.transactionalProducer = false;
        }
        try {
            this.idempotentProducer = producerConfig.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG).booleanValue();
            if (this.idempotentProducer) {
                this.connectMode = 0;
            }
        } catch (Exception e2) {
            this.idempotentProducer = false;
        }
        if (this.transactionalProducer) {
            this.connectMode = 0;
            this.currentOffsetMap = new HashMap<>();
        }
        this.status = AQKafkaProducerStatus.INIT;
    }

    public synchronized void setExternalDbConnection(Connection connection) {
        this.log.debug("Setting externally supplied db connection " + connection);
        if (this.oTxm.getTransactionState() == OracleTransactionManager.TransactionState.BEGIN) {
            throw new KafkaException("A transaction with another oracle connection already active.");
        }
        this.oTxm.setDBConnection(this.externalDbConn);
        this.externalDbConn = connection;
    }

    private void addToMap(Node node, TopicPublishers topicPublishers) throws JMSException {
        this.topicPublishersMap.put(node, topicPublishers);
        this.log.info("Connected nodes: " + this.topicPublishersMap.keySet());
        this.status = AQKafkaProducerStatus.OPEN;
        this.selectorMetrics.maybeRegisterConnectionMetrics(node);
        this.selectorMetrics.connectionCreated.record();
        this.log.debug("CONNECTED NODES: " + this.topicPublishersMap.keySet());
    }

    private void connect(Node node, Connection connection) throws JMSException {
        TopicPublishers topicPublishers = null;
        try {
            this.log.debug("Creating new Topic connection for node " + node);
            topicPublishers = new TopicPublishers(node, connection);
            addToMap(node, topicPublishers);
        } catch (JMSException e) {
            close(node, topicPublishers);
            throw e;
        }
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void connect(Node node) throws JMSException {
        TopicPublishers topicPublishers = null;
        try {
            this.log.debug("Creating new connection for node " + node);
            topicPublishers = new TopicPublishers(node, this.connectMode);
            addToMap(node, topicPublishers);
        } catch (JMSException e) {
            close(node, topicPublishers);
            throw e;
        }
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public boolean isChannelReady(Node node) {
        return this.topicPublishersMap.containsKey(node);
    }

    public Connection getDBConnection(boolean z) throws JMSException {
        if (this.externalDbConn != null) {
            this.log.debug("Returning externally supplied db connection. " + this.externalDbConn);
            return this.externalDbConn;
        }
        if (this.dbConn != null) {
            this.log.debug("Returning already created db connection. " + this.dbConn);
            return this.dbConn;
        }
        if (!z) {
            this.log.debug(" Database connection not established yet. Not forced to create one. Returning null.");
            return null;
        }
        try {
            if (this.topicPublishersMap == null || this.topicPublishersMap.isEmpty()) {
                org.apache.kafka.common.Node node = (org.apache.kafka.common.Node) this.metadata.fetch().nodes().get(0);
                this.log.debug("Transactional producer trying to connect to BootstrapNode. " + node);
                connect((Node) node);
            }
            Cluster fetch = this.metadata.fetch();
            this.log.debug("Leader node is " + this.metadata.getLeader());
            Node node2 = (Node) fetch.controller();
            this.log.debug("Controller Node " + node2);
            if (node2 == null) {
                node2 = this.metadata.fetch().isBootstrapConfigured() ? (Node) this.metadata.fetch().controller() : (Node) this.metadata.fetch().nodes().get(0);
                this.metadata.setLeader(node2);
                this.log.debug("getDBConnection: Controller Node and LeaderNode set to " + node2);
            }
            this.dbConn = this.topicPublishersMap.get(node2).sess.getDBConnection();
            return this.dbConn;
        } catch (Exception e) {
            this.log.info("Faiiled to create database connection for transactional producer. Exception: " + e);
            throw e;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v26, types: [org.apache.kafka.common.Node] */
    /* JADX WARN: Type inference failed for: r0v38, types: [org.apache.kafka.common.Node] */
    /* JADX WARN: Type inference failed for: r0v48, types: [org.apache.kafka.common.Node] */
    public Future<RecordMetadata> transactionalSend(TopicPartition topicPartition, byte[] bArr, byte[] bArr2, Header[] headerArr, Callback callback) {
        FutureRecordMetadata futureRecordMetadata;
        MessageIdConverter.OKafkaOffset oKafkaOffset;
        RuntimeException runtimeException = null;
        this.log.debug("Message for TopicPartition " + topicPartition);
        try {
            if (this.topicPublishersMap == null || this.topicPublishersMap.isEmpty()) {
                Node node = null;
                if (this.externalDbConn != null) {
                    this.log.debug("");
                    int instanceId = ConnectionUtils.getInstanceId(this.externalDbConn);
                    node = this.metadata.fetch().nodeById(instanceId);
                    if (node == null) {
                        node = (org.apache.kafka.common.Node) this.metadata.fetch().nodes().get(0);
                        node.setId(instanceId);
                        this.log.debug("External DB + BOotStrap Node " + node);
                        this.log.warn("Connection setup to instance " + instanceId + ". Which is not found in current cluster");
                    }
                    this.log.debug("Created publisher using externally supplied database connection to instance " + instanceId);
                    connect(node, this.externalDbConn);
                }
                if (node == null) {
                    node = (org.apache.kafka.common.Node) this.metadata.fetch().nodes().get(0);
                    this.log.debug("Creating using bootstrapnode " + node);
                    connect(node);
                }
                this.metadata.setLeader(node);
            }
            Node node2 = (Node) this.metadata.fetch().controller();
            this.log.debug("Controller " + node2);
            Node leader = this.metadata.getLeader();
            this.log.debug("Leader Node " + leader);
            TopicPublishers topicPublishers = leader != null ? this.topicPublishersMap.get(leader) : this.topicPublishersMap.get(node2);
            this.log.debug("Available Topic Publishers " + topicPublishers);
            if (topicPublishers == null && this.topicPublishersMap.size() > 0) {
                Iterator<Map.Entry<Node, TopicPublishers>> it = this.topicPublishersMap.entrySet().iterator();
                if (it.hasNext()) {
                    Map.Entry<Node, TopicPublishers> next = it.next();
                    this.metadata.setLeader(next.getKey());
                    topicPublishers = next.getValue();
                }
            }
            TopicPublisher topicPublisher = topicPublishers.getTopicPublisher(topicPartition.topic());
            Connection dBConnection = topicPublishers.getSession().getDBConnection();
            this.oTxm.setDBConnection(dBConnection);
            TopicTeqParameters topicTeqParameters = this.metadata.topicParaMap.get(topicPartition.topic());
            if (topicTeqParameters == null) {
                try {
                    super.fetchQueueParameters(topicPartition.topic(), dBConnection, this.metadata.topicParaMap);
                } catch (SQLException e) {
                    this.log.error("Exception while fetching TEQ parameters and updating metadata " + e.getMessage());
                }
            }
            AQjmsBytesMessage createBytesMessage = createBytesMessage(topicPublishers.sess, topicPartition, ByteBuffer.wrap(bArr), ByteBuffer.wrap(bArr2), headerArr, topicTeqParameters.getMsgVersion());
            try {
                topicPublisher.publish(createBytesMessage, 2, 0, -1L);
            } catch (JMSException e2) {
                this.log.error("Exception while producing transactionl message " + e2.getMessage());
                runtimeException = new RuntimeException((Throwable) e2);
            }
            if (runtimeException == null) {
                oKafkaOffset = MessageIdConverter.computeOffset(this.currentOffsetMap.get(topicPartition), createBytesMessage.getJMSMessageID());
                this.currentOffsetMap.remove(topicPartition);
                this.currentOffsetMap.put(topicPartition, oKafkaOffset);
            } else {
                oKafkaOffset = MessageIdConverter.getOKafkaOffset("", false, false);
            }
            ProduceRequestResult produceRequestResult = new ProduceRequestResult(topicPartition);
            produceRequestResult.set(oKafkaOffset.subPartitionId(), runtimeException == null ? createBytesMessage.getJMSTimestamp() : -1L, Collections.singletonList(oKafkaOffset), runtimeException);
            futureRecordMetadata = new FutureRecordMetadata(produceRequestResult, 0L, System.currentTimeMillis(), -1L, bArr.length, bArr2.length, this.time);
            produceRequestResult.done();
            this.oTxm.addRecordToTransaction(futureRecordMetadata);
        } catch (Exception e3) {
            this.log.error("Error while publishing records within a transaction." + e3.getMessage(), e3);
            ProduceRequestResult produceRequestResult2 = new ProduceRequestResult(topicPartition);
            produceRequestResult2.set(-1L, -1L, null, new RuntimeException(e3));
            futureRecordMetadata = new FutureRecordMetadata(produceRequestResult2, -1L, System.currentTimeMillis(), -1L, bArr.length, bArr2.length, this.time);
            produceRequestResult2.done();
        }
        return futureRecordMetadata;
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public ClientResponse send(ClientRequest clientRequest) {
        ClientResponse parseRequest = parseRequest(clientRequest, ApiKeys.convertToOracleApiKey(clientRequest.apiKey()));
        this.selectorMetrics.recordCompletedReceive(parseRequest.destination(), parseRequest.requestLatencyMs());
        return parseRequest;
    }

    private ClientResponse parseRequest(ClientRequest clientRequest, ApiKeys apiKeys) {
        if (apiKeys == ApiKeys.PRODUCE) {
            return publish(clientRequest);
        }
        if (apiKeys == ApiKeys.METADATA) {
            return getMetadata(clientRequest);
        }
        return null;
    }

    private ClientResponse publish(ClientRequest clientRequest) {
        ProduceRequest m21build = ((ProduceRequest.Builder) clientRequest.requestBuilder()).m21build();
        Node nodeById = this.metadata.getNodeById(Integer.parseInt(clientRequest.destination()));
        TopicPartition topicpartition = m21build.getTopicpartition();
        MemoryRecords memoryRecords = m21build.getMemoryRecords();
        TopicPublishers topicPublishers = null;
        AQjmsBytesMessage[] aQjmsBytesMessageArr = null;
        ProduceResponse.PartitionResponse partitionResponse = null;
        int i = 2;
        TopicTeqParameters topicTeqParameters = this.metadata.topicParaMap.get(topicpartition.topic());
        long sizeInBytes = memoryRecords.sizeInBytes();
        int msgVersion = topicTeqParameters.getMsgVersion();
        this.log.debug("Publish request for node " + nodeById);
        try {
            if (topicTeqParameters.getKeyBased() != 2) {
                throw new InvalidTopicException("Topic " + topicpartition.topic() + " is not an Oracle kafka topic, Please drop and re-create topic using Admin.createTopics() or dbms_aqadm.create_database_kafka_topic procedure");
            }
            while (true) {
                boolean z = false;
                boolean z2 = false;
                boolean z3 = false;
                AQjmsException aQjmsException = null;
                i--;
                try {
                    topicPublishers = this.topicPublishersMap.get(nodeById);
                } catch (Exception e) {
                    aQjmsException = e;
                    if (0 == 0) {
                        this.log.error("Exception while sending records for topic partition " + topicpartition + " no node " + nodeById, e);
                    } else {
                        this.log.error("Exception while committing records for topic partition " + topicpartition + " no node " + nodeById, e);
                    }
                    if (e instanceof JMSException) {
                        this.log.info(" Encountered JMS Exception:" + e.getMessage());
                        if ((e instanceof AQjmsException) && e.getErrorNumber() == 25348) {
                            z3 = true;
                            if (aQjmsException != null) {
                                if (z3) {
                                    this.log.info("Node " + nodeById + " is not a Leader for partition " + topicpartition);
                                    partitionResponse = createResponses(topicpartition, new NotLeaderForPartitionException(aQjmsException), aQjmsBytesMessageArr);
                                    this.metadata.requestUpdate();
                                }
                                if (z) {
                                    this.topicPublishersMap.remove(nodeById);
                                    this.log.trace("Connection with node {} is closed", clientRequest.destination());
                                    partitionResponse = createResponses(topicpartition, new DisconnectException("Database instance not reachable: " + nodeById, aQjmsException), aQjmsBytesMessageArr);
                                }
                            } else {
                                partitionResponse = createResponses(topicpartition, null, aQjmsBytesMessageArr);
                            }
                            partitionResponse.setCheckDuplicate(z2);
                            return createClientResponse(clientRequest, topicpartition, partitionResponse, z);
                        }
                    }
                    if (topicPublishers != null) {
                        boolean isConnected = topicPublishers.isConnected();
                        this.log.info("KafkaProducer is connected to the broker? " + isConnected);
                        if (!isConnected) {
                            try {
                                topicPublishers.close();
                                if (stopReconnect || i <= 0) {
                                    z = true;
                                    this.log.info("Failed to reconnect to  " + nodeById + " . Failing this batch for " + topicpartition);
                                } else {
                                    this.log.info("Reconnecting to node " + nodeById);
                                    if (!topicPublishers.reCreate()) {
                                        this.log.info("Failed to reconnect to  " + nodeById + " . Failing this batch for " + topicpartition);
                                        z = true;
                                    }
                                }
                                stopReconnect = false;
                            } catch (Exception e2) {
                                this.log.error("Exception while reconnecting to node " + nodeById, e2);
                                z = true;
                                i = 0;
                                try {
                                    topicPublishers.close();
                                } catch (Exception e3) {
                                }
                            }
                        }
                        if (0 != 0) {
                            if (z) {
                                this.log.info("Node " + nodeById + " is not reachable. Batch will be reprocessed after checking for duplicates.");
                                i = 0;
                            } else {
                                try {
                                    this.log.debug("Connection to node is fine. Checking if previous publish was successfull or not.");
                                    topicPublishers = this.topicPublishersMap.get(nodeById);
                                    Connection dBConnection = topicPublishers.sess.getDBConnection();
                                    String substring = aQjmsBytesMessageArr[0].getJMSMessageID().substring(3);
                                    z2 = false;
                                    if (checkIfMsgIdExist(dBConnection, topicpartition.topic(), substring)) {
                                        this.log.debug("Message Id " + substring + " already present in for " + topicpartition + ". No need to retry.");
                                        i = 0;
                                        aQjmsException = null;
                                    }
                                } catch (Exception e4) {
                                    this.log.info("Exception while checking if message id exists or not " + e4);
                                    this.log.info("Batch will be processed again after checking for duplicates.");
                                    z2 = true;
                                    i = 0;
                                }
                            }
                        }
                    }
                }
                if (topicPublishers == null) {
                    throw new NullPointerException("No publishers created for node " + nodeById);
                }
                this.log.debug("Found a publisher " + topicPublishers + " for node " + nodeById);
                AQjmsSession session = topicPublishers.getSession();
                if (this.idempotentProducer) {
                    String str = null;
                    try {
                        if (m21build.checkForDups()) {
                            Connection dBConnection2 = session.getDBConnection();
                            List<MessageIdConverter.OKafkaOffset> retryMsgList = m21build.retryMsgList();
                            if (retryMsgList != null && retryMsgList.size() > 0) {
                                str = retryMsgList.get(0).getMsgId().substring(3);
                                this.log.debug("Duplicate Check for parition " + topicpartition + "for msgId  " + str);
                            }
                            if (checkIfMsgIdExist(dBConnection2, topicpartition.topic(), str)) {
                                this.log.info("Message Id " + str + " exists for topic partition " + topicpartition + ". Records were succesfully produced.");
                                partitionResponse = createResponses(topicpartition, null, null);
                                partitionResponse.setCheckDuplicate(false);
                                partitionResponse.setOffsets(retryMsgList);
                                return createClientResponse(clientRequest, topicpartition, partitionResponse, false);
                            }
                            this.log.info("Message Id " + str + " exists for topic partition " + topicpartition + " does not exist. Retrying to publish");
                        }
                    } catch (Exception e5) {
                        this.log.error("Exception while checking for duplicates for topic partition " + topicpartition + " message id " + str + " Exception : " + e5, e5);
                        m21build.checkForDups();
                        throw e5;
                    }
                }
                ArrayList arrayList = new ArrayList();
                AbstractIterator batchIterator = memoryRecords.batchIterator();
                while (batchIterator.hasNext()) {
                    for (Record record : (MutableRecordBatch) batchIterator.next()) {
                        arrayList.add(createBytesMessage(session, topicpartition, record.key(), record.value(), record.headers(), msgVersion));
                    }
                }
                TopicPublisher topicPublisher = topicPublishers.getTopicPublisher(topicpartition.topic());
                aQjmsBytesMessageArr = (AQjmsBytesMessage[]) arrayList.toArray(new AQjmsBytesMessage[0]);
                this.log.trace("sending messages to topic : {} with partition: {}, number of messages: {}", new Object[]{topicpartition.topic(), Integer.valueOf(topicpartition.partition()), Integer.valueOf(aQjmsBytesMessageArr.length)});
                sendToAQ(aQjmsBytesMessageArr, topicPublisher);
                if (this.idempotentProducer) {
                    try {
                        this.log.trace("Idempotent Producer. Committing with node " + nodeById);
                        if (forceRollback) {
                            topicPublishers.sess.rollback();
                            forceRetry = true;
                            forceRollback = false;
                        } else {
                            topicPublishers.sess.commit();
                        }
                        if (forceDisconnect) {
                            topicPublishers.sess.close();
                            forceRetry = true;
                            forceDisconnect = false;
                        }
                        if (forceRetry) {
                            forceRetry = false;
                            throw new KafkaException("Dummy Exception");
                            break;
                        }
                    } catch (Exception e6) {
                        this.log.error("Exception while committing records " + e6.getMessage());
                        throw e6;
                    }
                }
                this.selectorMetrics.recordCompletedSend(clientRequest.destination(), sizeInBytes, System.currentTimeMillis());
                this.log.trace("Messages sent successfully to topic : {} with partition: {}, number of messages: {}", new Object[]{topicpartition.topic(), Integer.valueOf(topicpartition.partition()), Integer.valueOf(aQjmsBytesMessageArr.length)});
                i = 0;
                if (i <= 0) {
                    break;
                }
            }
        } catch (InvalidTopicException e7) {
            this.log.error("Cannot send messages to topic " + topicpartition.topic() + ". Not a kafka topic");
            return createClientResponse(clientRequest, topicpartition, createResponses(topicpartition, e7, null), false);
        }
    }

    private void dumpTopicPublishers() {
        if (this.topicPublishersMap == null) {
            this.log.info("TopicPublisherMap is null");
        } else {
            this.log.info("TopicPublisherMap size " + this.topicPublishersMap.size());
        }
        for (Node node : this.topicPublishersMap.keySet()) {
            this.log.info("Publihsers for Node " + node);
            this.log.info(this.topicPublishersMap.get(node).toString());
        }
    }

    private boolean checkIfMsgIdExist(Connection connection, String str, String str2) {
        CallableStatement prepareCall;
        boolean z = false;
        String str3 = " Select count(*) from " + ConnectionUtils.enquote(str) + " where msgid = '" + str2 + "'";
        this.log.debug("Executing " + str3);
        ResultSet resultSet = null;
        Throwable th = null;
        try {
            try {
                prepareCall = connection.prepareCall(str3);
            } catch (Exception e) {
                this.log.info("Exception while checking if msgId Exists or not. " + e, e);
                if (0 != 0) {
                    try {
                        resultSet.close();
                    } catch (Exception e2) {
                    }
                }
            }
            try {
                prepareCall.execute(str3);
                ResultSet resultSet2 = prepareCall.getResultSet();
                z = resultSet2.next() ? resultSet2.getInt(1) != 0 : false;
                resultSet2.close();
                if (prepareCall != null) {
                    prepareCall.close();
                }
                this.log.debug("Message Id " + str2 + " Exists?: " + z);
                return z;
            } catch (Throwable th2) {
                if (prepareCall != null) {
                    prepareCall.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    private ClientResponse createClientResponse(ClientRequest clientRequest, TopicPartition topicPartition, ProduceResponse.PartitionResponse partitionResponse, boolean z) {
        return new ClientResponse(clientRequest.makeHeader((short) 1), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), z, (UnsupportedVersionException) null, (AuthenticationException) null, new ProduceResponse(topicPartition, partitionResponse));
    }

    private void sendToAQ(AQjmsBytesMessage[] aQjmsBytesMessageArr, TopicPublisher topicPublisher) throws JMSException {
        this.log.info("In BulkSend: #messages = " + aQjmsBytesMessageArr.length);
        ((AQjmsProducer) topicPublisher).bulkSend(topicPublisher.getTopic(), aQjmsBytesMessageArr);
    }

    private AQjmsBytesMessage createBytesMessage(TopicSession topicSession, TopicPartition topicPartition, ByteBuffer byteBuffer, ByteBuffer byteBuffer2, Header[] headerArr, int i) throws JMSException {
        return i == 2 ? createBytesMessageV2(topicSession, topicPartition, byteBuffer, byteBuffer2, headerArr) : createBytesMessageV1(topicSession, topicPartition, byteBuffer, byteBuffer2, headerArr);
    }

    private AQjmsBytesMessage createBytesMessageV1(TopicSession topicSession, TopicPartition topicPartition, ByteBuffer byteBuffer, ByteBuffer byteBuffer2, Header[] headerArr) throws JMSException {
        AQjmsBytesMessage createBytesMessage = topicSession.createBytesMessage();
        if (byteBuffer != null) {
            byte[] bArr = new byte[byteBuffer.limit()];
            byteBuffer.get(bArr);
            createBytesMessage.setJMSCorrelationID(new String(bArr));
        }
        byte[] bArr2 = new byte[byteBuffer2.limit()];
        byteBuffer2.get(bArr2);
        createBytesMessage.writeBytes(bArr2);
        createBytesMessage.setStringProperty("topic", topicPartition.topic());
        createBytesMessage.setStringProperty(AQClient.PARTITION_PROPERTY, Integer.toString(topicPartition.partition() * 2));
        createBytesMessage.setIntProperty(AQClient.MESSAGE_VERSION, 1);
        return createBytesMessage;
    }

    private AQjmsBytesMessage createBytesMessageV2(TopicSession topicSession, TopicPartition topicPartition, ByteBuffer byteBuffer, ByteBuffer byteBuffer2, Header[] headerArr) throws JMSException {
        int i = 0;
        int i2 = 0;
        int[] iArr = null;
        int[] iArr2 = null;
        byte[] bArr = null;
        byte[] bArr2 = null;
        if (headerArr != null) {
            iArr = new int[headerArr.length];
            iArr2 = new int[headerArr.length];
        }
        AQjmsBytesMessage createBytesMessage = topicSession.createBytesMessage();
        if (byteBuffer != null) {
            bArr = new byte[byteBuffer.limit()];
            byteBuffer.get(bArr);
            i = bArr.length;
        }
        int i3 = 0 + i + 4;
        if (byteBuffer2 != null) {
            bArr2 = new byte[byteBuffer2.limit()];
            byteBuffer2.get(bArr2);
            i2 = bArr2.length;
        }
        int i4 = i3 + i2 + 4;
        if (headerArr != null) {
            int i5 = 0;
            for (Header header : headerArr) {
                int length = header.key().getBytes().length;
                int i6 = i4 + length + 4;
                iArr[i5] = length;
                int length2 = header.value().length;
                i4 = i6 + length2 + 4;
                int i7 = i5;
                i5++;
                iArr2[i7] = length2;
            }
        }
        ByteBuffer allocate = ByteBuffer.allocate(i4);
        allocate.put(ConnectionUtils.convertTo4Byte(i));
        if (i > 0) {
            allocate.put(bArr);
            createBytesMessage.setJMSCorrelationID(new String(bArr));
        }
        allocate.put(ConnectionUtils.convertTo4Byte(i2));
        if (i2 > 0) {
            allocate.put(bArr2);
        }
        if (headerArr != null) {
            int i8 = 0;
            for (Header header2 : headerArr) {
                allocate.put(ConnectionUtils.convertTo4Byte(iArr[i8]));
                allocate.put(header2.key().getBytes());
                int i9 = i8;
                i8++;
                allocate.put(ConnectionUtils.convertTo4Byte(iArr2[i9]));
                allocate.put(header2.value());
            }
        }
        allocate.rewind();
        byte[] bArr3 = new byte[allocate.limit()];
        allocate.get(bArr3);
        createBytesMessage.writeBytes(bArr3);
        createBytesMessage.setStringProperty(AQClient.PARTITION_PROPERTY, Integer.toString(topicPartition.partition() * 2));
        if (headerArr != null) {
            createBytesMessage.setIntProperty(AQClient.HEADERCOUNT_PROPERTY, headerArr.length);
        }
        createBytesMessage.setIntProperty(AQClient.MESSAGE_VERSION, 2);
        return createBytesMessage;
    }

    private ProduceResponse.PartitionResponse createResponses(TopicPartition topicPartition, RuntimeException runtimeException, AQjmsBytesMessage[] aQjmsBytesMessageArr) {
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(runtimeException);
        if (aQjmsBytesMessageArr != null) {
            partitionResponse.msgIds = new ArrayList();
            long j = -1;
            MessageIdConverter.OKafkaOffset oKafkaOffset = null;
            for (int i = 0; i < aQjmsBytesMessageArr.length; i++) {
                try {
                    String jMSMessageID = aQjmsBytesMessageArr[i].getJMSMessageID();
                    j = aQjmsBytesMessageArr[i].getJMSTimestamp();
                    oKafkaOffset = MessageIdConverter.computeOffset(oKafkaOffset, jMSMessageID);
                } catch (Exception e) {
                    j = -1;
                }
                partitionResponse.msgIds.add(oKafkaOffset);
            }
            partitionResponse.logAppendTime = j;
            partitionResponse.subPartitionId = -1L;
        }
        return partitionResponse;
    }

    private ClientResponse getMetadata(ClientRequest clientRequest) {
        Connection connection = null;
        Node node = null;
        if (this.metadata.isBootstrap()) {
            List<Node> convertToOracleNodes = NetworkClient.convertToOracleNodes(this.metadata.fetch().nodes());
            Set<Node> keySet = this.topicPublishersMap.keySet();
            for (Node node2 : convertToOracleNodes) {
                Iterator<Node> it = keySet.iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (it.next().equals(node2)) {
                            node = node2;
                            break;
                        }
                    }
                }
            }
            if (node == null) {
                node = convertToOracleNodes.get(0);
                this.log.info("No Connected Node Found. Picked first of bootstrap nodes.: " + node);
            }
        } else {
            node = this.metadata.getNodeById(Integer.parseInt(clientRequest.destination()));
        }
        try {
            if (this.topicPublishersMap.get(node) != null) {
                connection = this.topicPublishersMap.get(node).getSession().getDBConnection();
            } else {
                for (TopicPublishers topicPublishers : this.topicPublishersMap.values()) {
                    if (topicPublishers.isConnected()) {
                        connection = topicPublishers.getSession().getDBConnection();
                    }
                }
                if (connection == null) {
                    this.log.info("Sender not connected to any node. Re-connecting.");
                    for (Node node3 : NetworkClient.convertToOracleNodes(this.metadata.fetch().nodes())) {
                        try {
                            connect(node3);
                            this.log.info("Attempting to connect to " + node3);
                            connection = this.topicPublishersMap.get(node3).getSession().getDBConnection();
                            this.log.info("Connected to node " + node3);
                            node = node3;
                            break;
                        } catch (Exception e) {
                            this.log.info(" Node {} not rechable", node3);
                        }
                    }
                }
            }
        } catch (JMSException e2) {
            try {
                this.log.trace("Unexcepted error occured with connection to node {}, closing the connection", clientRequest.destination());
                this.topicPublishersMap.get(this.metadata.getNodeById(Integer.parseInt(clientRequest.destination()))).getConnection().close();
                this.log.trace("Connection with node {} is closed", clientRequest.destination());
            } catch (JMSException e3) {
                this.log.trace("Failed to close connection with node {}", clientRequest.destination());
            }
        }
        ClientResponse metadataNow = getMetadataNow(clientRequest, connection, node, this.metadata.updateRequested());
        Iterator it2 = ((MetadataResponse) metadataNow.responseBody()).cluster().topics().iterator();
        while (it2.hasNext()) {
            try {
                super.fetchQueueParameters((String) it2.next(), connection, this.metadata.topicParaMap);
            } catch (SQLException e4) {
                this.log.error("Exception while fetching TEQ parameters and updating metadata " + e4.getMessage());
            }
        }
        if (metadataNow.wasDisconnected()) {
            this.topicPublishersMap.remove(this.metadata.getNodeById(Integer.parseInt(clientRequest.destination())));
            this.metadata.requestUpdate();
        }
        return metadataNow;
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void close() {
        for (Map.Entry<Node, TopicPublishers> entry : this.topicPublishersMap.entrySet()) {
            close(entry.getKey(), entry.getValue());
        }
        this.topicPublishersMap.clear();
        this.status = AQKafkaProducerStatus.CLOSE;
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void close(Node node) {
        close(node, this.topicPublishersMap.get(node));
    }

    public boolean isClosed() {
        return this.status == AQKafkaProducerStatus.CLOSE;
    }

    private void close(Node node, TopicPublishers topicPublishers) {
        if (node == null || topicPublishers == null) {
            return;
        }
        for (Map.Entry<String, TopicPublisher> entry : topicPublishers.getTopicPublisherMap().entrySet()) {
            try {
                entry.getValue().close();
            } catch (JMSException e) {
                this.log.error("failed to close topic publisher for topic {} ", entry.getKey());
            }
        }
        try {
            topicPublishers.getSession().close();
        } catch (JMSException e2) {
            this.log.error("failed to close session {} associated with connection {} and node {}  ", new Object[]{topicPublishers.getSession(), topicPublishers.getConnection(), node});
        }
        try {
            topicPublishers.getConnection().close();
            this.selectorMetrics.connectionClosed.record();
        } catch (JMSException e3) {
            this.log.error("failed to close connection {} associated with node {}  ", topicPublishers.getConnection(), node);
        }
    }
}
