package org.oracle.okafka.clients.consumer.internals;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import oracle.jms.AQjmsBytesMessage;
import org.oracle.okafka.clients.ClientRequest;
import org.oracle.okafka.clients.ClientResponse;
import org.oracle.okafka.clients.consumer.ConsumerConfig;
import org.oracle.okafka.clients.consumer.OffsetAndMetadata;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.TopicPartition;
import org.oracle.okafka.common.network.AQClient;
import org.oracle.okafka.common.protocol.ApiKeys;
import org.oracle.okafka.common.requests.CommitRequest;
import org.oracle.okafka.common.requests.CommitResponse;
import org.oracle.okafka.common.requests.FetchRequest;
import org.oracle.okafka.common.requests.FetchResponse;
import org.oracle.okafka.common.requests.OffsetResetRequest;
import org.oracle.okafka.common.requests.OffsetResetResponse;
import org.oracle.okafka.common.requests.SubscribeRequest;
import org.oracle.okafka.common.requests.SubscribeResponse;
import org.oracle.okafka.common.requests.UnsubscribeResponse;
import org.oracle.okafka.common.utils.ConnectionUtils;
import org.oracle.okafka.common.utils.LogContext;
import org.oracle.okafka.common.utils.MessageIdConverter;
import org.oracle.okafka.common.utils.Time;
import org.oracle.okafka.common.utils.Utils;

/* loaded from: input_file:org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.class */
public final class AQKafkaConsumer extends AQClient {
    /** JMS resources (connection, session, topic subscribers) held per node; entries are added on first use and dropped on close. */
    private final Map<Node, TopicConsumers> topicConsumersMap;
    /** Consumer configuration; read here for the group id, the max poll records setting and when creating JMS connections. */
    private final ConsumerConfig configs;
    /** Clock whose {@code milliseconds()} value is passed to every {@code ClientResponse} built by this class. */
    private final Time time;
    /** Message-id format handed to {@code MessageIdConverter.getMsgId}; initialised to "00" and replaced by {@code getMsgIdFormat} on the first seek. */
    private String msgIdFormat;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer$SeekInput.class */
    /**
     * Plain holder for one element of the {@code dbms_aq.seek_input_array_t}
     * that {@code seek} binds into its PL/SQL call: the shard (partition),
     * a priority, the kind of seek and, for message-id seeks, the target id.
     */
    public class SeekInput {
        /* Values used for {@link #seekType}. */
        static final int SEEK_BEGIN = 1;
        static final int SEEK_END = 2;
        static final int SEEK_MSGID = 3;

        /* Skip-option values. */
        static final int NO_DISCARD_SKIPPED = 1;
        static final int DISCARD_SKIPPED = 2;

        /** Partition number, bound as the {@code shard} argument. */
        int partition;

        /** Bound as the {@code priority} argument; defaults to 1. */
        int priority = 1;

        /** One of {@link #SEEK_BEGIN}, {@link #SEEK_END} or {@link #SEEK_MSGID}. */
        int seekType;

        /** Hex message id to seek to; only populated for {@link #SEEK_MSGID}. */
        String seekMsgId;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer$TopicConsumers.class */
    public final class TopicConsumers {
        private TopicConnection conn;
        private TopicSession sess;
        private Map<String, TopicSubscriber> topicSubscribers;
        private final Node node;

        public TopicConsumers(AQKafkaConsumer aQKafkaConsumer, Node node) throws JMSException {
            this(node, 1);
        }

        public TopicConsumers(Node node, int i) throws JMSException {
            this.conn = null;
            this.sess = null;
            this.topicSubscribers = null;
            this.node = node;
            this.conn = createTopicConnection(node);
            this.sess = createTopicSession(i);
            this.topicSubscribers = new HashMap();
        }

        public TopicConnection createTopicConnection(Node node) throws JMSException {
            if (this.conn == null) {
                this.conn = ConnectionUtils.createTopicConnection(node, AQKafkaConsumer.this.configs);
            }
            return this.conn;
        }

        public TopicSubscriber getTopicSubscriber(String str) throws JMSException {
            TopicSubscriber topicSubscriber = this.topicSubscribers.get(str);
            if (topicSubscriber == null) {
                topicSubscriber = createTopicSubscriber(str);
            }
            return topicSubscriber;
        }

        public TopicSession createTopicSession(int i) throws JMSException {
            if (this.sess != null) {
                return this.sess;
            }
            this.sess = ConnectionUtils.createTopicSession(this.conn, i, true);
            this.conn.start();
            return this.sess;
        }

        private TopicSubscriber createTopicSubscriber(String str) throws JMSException {
            refresh(this.node);
            TopicSubscriber createDurableSubscriber = this.sess.createDurableSubscriber(this.sess.getTopic(ConnectionUtils.getUsername(AQKafkaConsumer.this.configs), str), AQKafkaConsumer.this.configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
            this.topicSubscribers.put(str, createDurableSubscriber);
            return createDurableSubscriber;
        }

        private void refresh(Node node) throws JMSException {
            this.conn = createTopicConnection(node);
            this.sess = createTopicSession(1);
        }

        public TopicConnection getConnection() {
            return this.conn;
        }

        public TopicSession getSession() {
            return this.sess;
        }

        public Map<String, TopicSubscriber> getTopicSubscriberMap() {
            return this.topicSubscribers;
        }

        public void setSession(TopicSession topicSession) {
            this.sess = topicSession;
        }

        public void setConnection(TopicConnection topicConnection) {
            this.conn = topicConnection;
        }

        public void remove(String str) {
            this.topicSubscribers.remove(str);
        }
    }

    /**
     * Creates a consumer client with no open connections.
     *
     * @param logContext     supplies the logger passed to the superclass
     * @param consumerConfig consumer configuration, also passed to the superclass
     * @param time           clock used when building responses
     */
    public AQKafkaConsumer(LogContext logContext, ConsumerConfig consumerConfig, Time time) {
        super(logContext.logger(AQKafkaConsumer.class), consumerConfig);
        this.configs = consumerConfig;
        this.time = time;
        this.topicConsumersMap = new HashMap<>();
        // "00" marks the message-id format as not yet looked up; seek() resolves it lazily.
        this.msgIdFormat = "00";
    }

    /**
     * Handles {@code clientRequest} by dispatching on its API key.
     *
     * @return the response from the matching handler, or {@code null} when the
     *         API key is not one this client handles
     */
    @Override // org.oracle.okafka.common.network.AQClient
    public ClientResponse send(ClientRequest clientRequest) {
        final ApiKeys requestedApi = clientRequest.apiKey();
        return parseRequest(clientRequest, requestedApi);
    }

    /**
     * Routes a request to the handler for its API key: FETCH, COMMIT,
     * SUBSCRIBE, OFFSETRESET or UNSUBSCRIBE.
     *
     * @return the handler's response, or {@code null} for any other key
     */
    private ClientResponse parseRequest(ClientRequest clientRequest, ApiKeys apiKeys) {
        final ClientResponse response;
        if (apiKeys == ApiKeys.FETCH) {
            response = receive(clientRequest);
        } else if (apiKeys == ApiKeys.COMMIT) {
            response = commit(clientRequest);
        } else if (apiKeys == ApiKeys.SUBSCRIBE) {
            response = subscribe(clientRequest);
        } else if (apiKeys == ApiKeys.OFFSETRESET) {
            response = seek(clientRequest);
        } else if (apiKeys == ApiKeys.UNSUBSCRIBE) {
            response = unsubscribe(clientRequest);
        } else {
            response = null;
        }
        return response;
    }

    /**
     * Bulk-receives up to {@code max.poll.records} messages from the request's
     * topic on the request's destination node, connecting to that node first
     * if needed.
     *
     * <p>Messages that are not {@code AQjmsBytesMessage} instances are logged
     * and left out of the response.
     *
     * @param clientRequest a request whose builder is a {@code FetchRequest.Builder}
     * @return a fetch response with the received messages; on any failure an
     *         empty fetch response with the error flag set. A JMS failure whose
     *         error code is "120" keeps the node's connection open; every other
     *         failure closes it.
     */
    public ClientResponse receive(ClientRequest clientRequest) {
        Node node = clientRequest.destination();
        FetchRequest request = ((FetchRequest.Builder) clientRequest.requestBuilder()).build();
        String topic = request.topic();
        long pollTimeout = request.pollTimeout();
        try {
            TopicConsumers consumers = this.topicConsumersMap.get(node);
            if (consumers == null) {
                consumers = new TopicConsumers(this, node);
                this.topicConsumersMap.put(node, consumers);
            }
            Message[] received = consumers.getTopicSubscriber(topic)
                    .bulkReceive(this.configs.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG).intValue(), pollTimeout);
            if (received == null) {
                return createFetchResponse(clientRequest, topic, Collections.<AQjmsBytesMessage>emptyList(), false);
            }
            List<AQjmsBytesMessage> accepted = new ArrayList<>();
            for (Message message : received) {
                if (message instanceof AQjmsBytesMessage) {
                    accepted.add((AQjmsBytesMessage) message);
                    continue;
                }
                try {
                    this.log.error("Message is not an instance of AQjmsBytesMessage: Topic {} partition {} offset{}", new Object[]{topic, Integer.valueOf(message.getIntProperty("partition")), Long.valueOf(MessageIdConverter.getOffset(message.getJMSMessageID()))});
                } catch (JMSException e) {
                    // Partition/offset could not be read; still report that a message was skipped.
                    this.log.error("Message is not an instance of AQjmsBytesMessage: Topic {}", topic);
                }
            }
            return createFetchResponse(clientRequest, topic, accepted, false);
        } catch (JMSException e) {
            // Must come before the generic handler: JMSException is an Exception,
            // so the reverse order makes this clause unreachable.
            // Constant-first equals() tolerates a null error code.
            if (!"120".equals(e.getErrorCode())) {
                close(node);
                this.log.error("failed to receive messages from topic: {}", topic);
            }
            return createFetchResponse(clientRequest, topic, Collections.<AQjmsBytesMessage>emptyList(), true);
        } catch (Exception e) {
            close(node);
            this.log.error("failed to receive messages from topic: {}", topic);
            return createFetchResponse(clientRequest, topic, Collections.<AQjmsBytesMessage>emptyList(), true);
        }
    }

    /**
     * Wraps a {@code FetchResponse} for {@code topic} in a {@code ClientResponse}
     * that echoes the request's header, callback, destination and creation
     * time, stamped with the current clock value.
     *
     * @param clientRequest request being answered
     * @param topic         topic the messages were fetched from
     * @param messages      messages to return (may be empty)
     * @param errorFlag     boolean forwarded to the {@code ClientResponse}; callers
     *                      pass {@code true} on their failure paths
     */
    private ClientResponse createFetchResponse(
            ClientRequest clientRequest, String topic, List<AQjmsBytesMessage> messages, boolean errorFlag) {
        return new ClientResponse(
                clientRequest.makeHeader(),
                clientRequest.callback(),
                clientRequest.destination(),
                clientRequest.createdTimeMs(),
                this.time.milliseconds(),
                errorFlag,
                new FetchResponse(topic, messages));
    }

    /**
     * Commits the JMS session of every node that has at least one partition
     * listed in the commit request.
     *
     * @param clientRequest a request whose builder is a {@code CommitRequest.Builder}
     * @return a commit response mapping each attempted node to {@code null}
     *         (success) or the {@code JMSException} that the commit raised,
     *         plus a flag telling whether any node failed
     */
    public ClientResponse commit(ClientRequest clientRequest) {
        CommitRequest request = ((CommitRequest.Builder) clientRequest.requestBuilder()).build();
        Map<Node, List<TopicPartition>> partitionsByNode = request.nodes();
        Map<TopicPartition, OffsetAndMetadata> offsets = request.offsets();

        Map<Node, Exception> outcomeByNode = new HashMap<>();
        boolean anyFailed = false;
        for (Map.Entry<Node, List<TopicPartition>> nodeEntry : partitionsByNode.entrySet()) {
            if (nodeEntry.getValue().isEmpty()) {
                // Nothing assigned on this node, so there is nothing to commit.
                continue;
            }
            Node node = nodeEntry.getKey();
            try {
                this.topicConsumersMap.get(node).getSession().commit();
                outcomeByNode.put(node, null);
            } catch (JMSException e) {
                anyFailed = true;
                outcomeByNode.put(node, e);
            }
        }
        return createCommitResponse(clientRequest, partitionsByNode, offsets, outcomeByNode, anyFailed);
    }

    /**
     * Wraps a {@code CommitResponse} in a {@code ClientResponse} that echoes the
     * request's header, callback, destination and creation time. The
     * {@code ClientResponse} boolean is always {@code false}; failures are
     * carried inside the {@code CommitResponse}.
     *
     * @param clientRequest    request being answered
     * @param partitionsByNode partitions per node, as given in the request
     * @param offsets          offsets given in the request
     * @param outcomeByNode    per-node result: {@code null} or the exception raised
     * @param anyFailed        whether at least one node failed to commit
     */
    private ClientResponse createCommitResponse(
            ClientRequest clientRequest,
            Map<Node, List<TopicPartition>> partitionsByNode,
            Map<TopicPartition, OffsetAndMetadata> offsets,
            Map<Node, Exception> outcomeByNode,
            boolean anyFailed) {
        return new ClientResponse(
                clientRequest.makeHeader(),
                clientRequest.callback(),
                clientRequest.destination(),
                clientRequest.createdTimeMs(),
                this.time.milliseconds(),
                false,
                new CommitResponse(outcomeByNode, partitionsByNode, offsets, anyFailed));
    }

    /**
     * Reads one {@code msgid} from the table named by {@code str} and returns
     * characters 26-27 of it, which the caller uses as the message-id format.
     *
     * <p>This is a best-effort lookup: if the table is empty, the query fails,
     * or the value is too short, the default {@code "66"} is returned.
     *
     * @param connection open database connection; not closed here
     * @param str        table name; enquoted via {@code Utils.enquote} before
     *                   being spliced into the SQL text
     * @return the two-character format, or {@code "66"} when it cannot be read
     */
    private String getMsgIdFormat(Connection connection, String str) {
        String format = "66";
        // try-with-resources closes the result set and then the statement on every path.
        try (PreparedStatement statement =
                connection.prepareStatement("select msgid from " + Utils.enquote(str) + " where rownum = ?")) {
            statement.setInt(1, 1);
            try (ResultSet rows = statement.executeQuery()) {
                if (rows.next()) {
                    format = rows.getString(1).substring(26, 28);
                }
            }
        } catch (Exception ignored) {
            // Deliberately ignored: any failure falls back to the value held in 'format'.
        }
        return format;
    }

    /**
     * Checks that {@code str} is a message id made of exactly 32 hexadecimal
     * characters ({@code 0-9}, {@code a-f}, {@code A-F}).
     *
     * <p>Each character is checked individually. Parsing four-character groups
     * with {@code Long.parseLong(…, 16)} would also accept a leading
     * {@code '+'} or {@code '-'}, letting non-hex ids through.
     *
     * @param str candidate message id, may be {@code null}
     * @throws IllegalArgumentException if {@code str} is {@code null}, is not
     *         32 characters long, or contains a non-hexadecimal character
     */
    private static void validateMsgId(String str) throws IllegalArgumentException {
        if (str == null || str.length() != 32) {
            throw new IllegalArgumentException("Invalid Message Id " + (str == null ? "null" : str));
        }
        for (int i = 0; i < str.length(); i++) {
            char c = str.charAt(i);
            boolean hexDigit = (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F');
            if (!hexDigit) {
                throw new IllegalArgumentException("Invalid Mesage Id " + str);
            }
        }
    }

    /**
     * Repositions the consumer group on the partitions named in an offset-reset
     * request by calling {@code dbms_aq.seek} once per topic.
     *
     * <p>For each partition the requested value selects the seek type:
     * {@code -2} maps to {@link SeekInput#SEEK_BEGIN}, {@code -1} to
     * {@link SeekInput#SEEK_END}, and any other value is treated as an offset
     * that is converted to a message id ({@link SeekInput#SEEK_MSGID}).
     *
     * <p>A failure while handling one topic is recorded against every
     * partition of that topic and the remaining topics are still processed.
     *
     * @param clientRequest a request whose builder is an {@code OffsetResetRequest.Builder}
     * @return a response carrying the per-partition failures (empty when all
     *         seeks succeeded). If the request cannot be processed at all, e.g.
     *         no session exists for the destination node, the response has its
     *         error flag set and carries that exception.
     */
    public ClientResponse seek(ClientRequest clientRequest) {
        Map<TopicPartition, Exception> failures = new HashMap<>();
        try {
            OffsetResetRequest request = ((OffsetResetRequest.Builder) clientRequest.requestBuilder()).build();
            Node node = clientRequest.destination();

            // Group the requested positions by topic: one PL/SQL call is issued per topic.
            Map<String, Map<TopicPartition, Long>> offsetsByTopic = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> entry : request.offsetResetTimestamps().entrySet()) {
                String topic = entry.getKey().topic();
                Map<TopicPartition, Long> topicOffsets = offsetsByTopic.get(topic);
                if (topicOffsets == null) {
                    topicOffsets = new HashMap<>();
                    offsetsByTopic.put(topic, topicOffsets);
                }
                topicOffsets.put(entry.getKey(), entry.getValue());
            }

            Connection dbConnection = this.topicConsumersMap.get(node).getSession().getDBConnection();
            for (Map.Entry<String, Map<TopicPartition, Long>> topicEntry : offsetsByTopic.entrySet()) {
                String topic = topicEntry.getKey();
                Map<TopicPartition, Long> topicOffsets = topicEntry.getValue();
                try {
                    // "00" means the format has not been looked up yet.
                    if (this.msgIdFormat.equals("00")) {
                        this.msgIdFormat = getMsgIdFormat(dbConnection, topic);
                    }
                    executeSeek(dbConnection, topic, buildSeekInputs(topic, topicOffsets));
                } catch (Exception e) {
                    for (TopicPartition failedPartition : topicOffsets.keySet()) {
                        failures.put(failedPartition, e);
                    }
                }
            }
            return new ClientResponse(clientRequest.makeHeader(), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), false, new OffsetResetResponse(failures, null));
        } catch (Exception e) {
            return new ClientResponse(clientRequest.makeHeader(), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), true, new OffsetResetResponse(failures, e));
        }
    }

    /**
     * Builds one {@link SeekInput} per partition of {@code topic}.
     *
     * @throws IllegalArgumentException if a message id derived from an offset is
     *         invalid; the message lists the topic, partition, seek type, offset
     *         and message id gathered for that partition so far
     */
    private SeekInput[] buildSeekInputs(String topic, Map<TopicPartition, Long> topicOffsets) {
        SeekInput[] seekInputs = new SeekInput[topicOffsets.size()];
        int index = 0;
        for (Map.Entry<TopicPartition, Long> entry : topicOffsets.entrySet()) {
            SeekInput input = new SeekInput();
            // Every partition gets its own slot; writing them all to slot 0 would leave the rest null.
            seekInputs[index++] = input;

            // Diagnostic context is rebuilt per partition so it never carries a previous partition's values.
            StringBuilder context = new StringBuilder();
            context.append(OffsetAndMetadata.NO_METADATA).append("Topic: ").append(topic).append(" ");
            try {
                TopicPartition topicPartition = entry.getKey();
                input.partition = topicPartition.partition();
                context.append("Partrition: ").append(input.partition);

                long requested = entry.getValue().longValue();
                if (requested == -2L) {
                    input.seekType = SeekInput.SEEK_BEGIN;
                    context.append("Seek Type: ").append(input.seekType);
                } else if (requested == -1L) {
                    input.seekType = SeekInput.SEEK_END;
                    context.append("Seek Type: ").append(input.seekType);
                } else {
                    input.seekType = SeekInput.SEEK_MSGID;
                    context.append("Seek Type: ").append(input.seekType);
                    context.append("Seek to Offset: ").append(requested);
                    input.seekMsgId = MessageIdConverter.getMsgId(topicPartition, requested, this.msgIdFormat);
                    context.append("Seek To MsgId: ").append(input.seekMsgId);
                    validateMsgId(input.seekMsgId);
                }
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException(context.toString(), e);
            }
        }
        return seekInputs;
    }

    /**
     * Runs the anonymous PL/SQL block that calls {@code dbms_aq.seek} for
     * {@code topic} with the configured group id as consumer name, one
     * {@code seek_input_t} per element of {@code seekInputs}, and a skip option
     * of {@link SeekInput#DISCARD_SKIPPED}.
     *
     * @throws java.sql.SQLException if preparing, binding or executing the call fails
     */
    private void executeSeek(Connection dbConnection, String topic, SeekInput[] seekInputs) throws java.sql.SQLException {
        StringBuilder sql = new StringBuilder();
        sql.append("declare \n seek_output_array  dbms_aq.seek_output_array_t; \n ");
        sql.append("begin \n dbms_aq.seek(queue_name => ?,");
        sql.append("consumer_name=>?,");
        sql.append("seek_input_array =>  dbms_aq.seek_input_array_t( ");
        for (int i = 0; i < seekInputs.length; i++) {
            sql.append("dbms_aq.seek_input_t( shard=> ?, priority => ?, seek_type => ?, seek_pos => dbms_aq.seek_pos_t( msgid => hextoraw(?))) ");
            if (i != seekInputs.length - 1) {
                sql.append(", ");
            }
        }
        sql.append("), ");
        sql.append("skip_option => ?, seek_output_array => seek_output_array);\n");
        sql.append("end;\n");

        CallableStatement statement = null;
        try {
            statement = dbConnection.prepareCall(sql.toString());
            // Placeholders are bound strictly in the order they appear in the SQL text.
            int param = 1;
            statement.setString(param++, Utils.enquote(topic));
            statement.setString(param++, this.configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
            for (SeekInput input : seekInputs) {
                statement.setInt(param++, input.partition);
                statement.setInt(param++, input.priority);
                statement.setInt(param++, input.seekType);
                if (input.seekType == SeekInput.SEEK_MSGID) {
                    statement.setString(param++, input.seekMsgId);
                } else {
                    // No message id applies to begin/end seeks.
                    statement.setNull(param++, java.sql.Types.CHAR);
                }
            }
            statement.setInt(param, SeekInput.DISCARD_SKIPPED);
            statement.execute();
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (Exception ignored) {
                    // Best-effort close: a close failure must not turn a completed seek into an error.
                }
            }
        }
    }

    /**
     * Closes every topic subscriber on every known node, then closes each
     * node's session and clears it so that a later subscribe creates a new one.
     * Connections are left open.
     *
     * @param clientRequest request being answered
     * @return a response mapping each topic to {@code null} when its subscriber
     *         closed cleanly, or to the {@code JMSException} raised while
     *         closing it; a subscriber that failed to close stays registered
     */
    private ClientResponse unsubscribe(ClientRequest clientRequest) {
        Map<String, Exception> resultByTopic = new HashMap<>();
        for (Map.Entry<Node, TopicConsumers> nodeEntry : this.topicConsumersMap.entrySet()) {
            TopicConsumers consumers = nodeEntry.getValue();

            // Remove through the iterator: removing from the map directly while
            // iterating it throws ConcurrentModificationException.
            Iterator<Map.Entry<String, TopicSubscriber>> subscribers =
                    consumers.getTopicSubscriberMap().entrySet().iterator();
            while (subscribers.hasNext()) {
                Map.Entry<String, TopicSubscriber> subscriber = subscribers.next();
                String topic = subscriber.getKey();
                try {
                    subscriber.getValue().close();
                    resultByTopic.put(topic, null);
                    subscribers.remove();
                } catch (JMSException e) {
                    resultByTopic.put(topic, e);
                }
            }

            // The session is already null after an earlier unsubscribe.
            TopicSession session = consumers.getSession();
            if (session != null) {
                try {
                    session.close();
                    consumers.setSession(null);
                } catch (JMSException e) {
                    this.log.error("Failed to close session: {} associated with connection: {} and node: {}  ", new Object[]{session, consumers.getConnection(), nodeEntry.getKey()});
                }
            }
        }
        return new ClientResponse(clientRequest.makeHeader(), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), false, new UnsubscribeResponse(resultByTopic));
    }

    /**
     * Opens the JMS connection and session for {@code node} unless they are
     * already registered.
     *
     * @throws JMSException if the connection or session cannot be created; the
     *         node is not registered in that case
     */
    @Override // org.oracle.okafka.common.network.AQClient
    public void connect(Node node) throws JMSException {
        if (!this.topicConsumersMap.containsKey(node)) {
            TopicConsumers created = null;
            try {
                created = new TopicConsumers(this, node);
                this.topicConsumersMap.put(node, created);
            } catch (JMSException e) {
                close(node, created);
                throw e;
            }
        }
    }

    /**
     * Reports whether JMS resources are currently registered for {@code node}.
     *
     * @return {@code true} if {@code node} has an entry in the per-node map
     */
    @Override // org.oracle.okafka.common.network.AQClient
    public boolean isChannelReady(Node node) {
        final boolean registered = this.topicConsumersMap.containsKey(node);
        return registered;
    }

    /**
     * Closes and forgets the JMS resources registered for {@code node}; does
     * nothing when the node is unknown.
     */
    @Override // org.oracle.okafka.common.network.AQClient
    public void close(Node node) {
        TopicConsumers consumers = this.topicConsumersMap.get(node);
        if (consumers != null) {
            close(node, consumers);
            this.topicConsumersMap.remove(node);
        }
    }

    /**
     * Closes the subscribers, session and connection of every known node and
     * empties the per-node map.
     */
    @Override // org.oracle.okafka.common.network.AQClient
    public void close() {
        this.log.trace("Closing AQ kafka consumer");
        // Iterate over a snapshot: close(node, consumers) removes the node from
        // topicConsumersMap, which would otherwise invalidate the live iterator
        // (ConcurrentModificationException as soon as two nodes are registered).
        Map<Node, TopicConsumers> snapshot = new HashMap<>(this.topicConsumersMap);
        for (Map.Entry<Node, TopicConsumers> entry : snapshot.entrySet()) {
            close(entry.getKey(), entry.getValue());
        }
        this.log.trace("Closed AQ kafka consumer");
        this.topicConsumersMap.clear();
    }

    /**
     * Closes, in order, every topic subscriber, the session and the connection
     * held by {@code topicConsumers}, then removes {@code node} from the
     * per-node map. Each close failure is logged and the remaining resources
     * are still closed.
     *
     * <p>Does nothing when either argument is {@code null}.
     *
     * @param node           node the resources belong to
     * @param topicConsumers resources to release
     */
    private void close(Node node, TopicConsumers topicConsumers) {
        if (node == null || topicConsumers == null) {
            return;
        }
        for (Map.Entry<String, TopicSubscriber> entry : topicConsumers.getTopicSubscriberMap().entrySet()) {
            try {
                entry.getValue().close();
            } catch (JMSException e) {
                this.log.error("Failed to close topic consumer for topic: {} ", entry.getKey());
            }
        }
        // The session is null after unsubscribe()/subscribe() cleared it via setSession(null).
        TopicSession session = topicConsumers.getSession();
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                this.log.error("Failed to close session: {} associated with connection: {} and node: {}  ", new Object[]{session, topicConsumers.getConnection(), node});
            }
        }
        // setConnection(null) is possible through the public setter, so guard here too.
        TopicConnection connection = topicConsumers.getConnection();
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                this.log.error("Failed to close connection: {} associated with node: {}  ", connection, node);
            }
        }
        this.topicConsumersMap.remove(node);
    }

    /**
     * Registers JMS resources for {@code node} using a mode value of 1, unless
     * the node is already registered.
     *
     * @throws JMSException if the connection or session cannot be created
     */
    private void createTopicConnection(Node node) throws JMSException {
        final int mode = 1;
        createTopicConnection(node, mode);
    }

    /**
     * Registers JMS resources for {@code node}, unless the node is already
     * registered.
     *
     * @param node node to connect to
     * @param i    mode value passed to the {@code TopicConsumers} constructor
     * @throws JMSException if the connection or session cannot be created
     */
    private void createTopicConnection(Node node, int i) throws JMSException {
        if (!this.topicConsumersMap.containsKey(node)) {
            TopicConsumers created = new TopicConsumers(node, i);
            this.topicConsumersMap.put(node, created);
        }
    }

    /**
     * Replaces the current subscription: closes every existing subscriber and
     * session on all known nodes, then creates a durable subscriber for the
     * requested topic on the request's destination node, connecting to that
     * node first if needed.
     *
     * @param clientRequest a request whose builder is a {@code SubscribeRequest.Builder}
     * @return a subscribe response for the topic carrying {@code null} on
     *         success, or the {@code JMSException} that prevented the
     *         subscription, in which case the destination node is closed
     */
    public ClientResponse subscribe(ClientRequest clientRequest) {
        for (Map.Entry<Node, TopicConsumers> nodeEntry : this.topicConsumersMap.entrySet()) {
            TopicConsumers consumers = nodeEntry.getValue();

            // Remove through the iterator: removing from the map directly while
            // iterating it throws ConcurrentModificationException.
            Iterator<Map.Entry<String, TopicSubscriber>> subscribers =
                    consumers.getTopicSubscriberMap().entrySet().iterator();
            while (subscribers.hasNext()) {
                Map.Entry<String, TopicSubscriber> subscriber = subscribers.next();
                try {
                    subscriber.getValue().close();
                    subscribers.remove();
                } catch (JMSException e) {
                    this.log.error("Failed to close topic consumer for topic: {} ", subscriber.getKey());
                }
            }

            // The session is already null after an earlier unsubscribe/subscribe.
            TopicSession session = consumers.getSession();
            if (session != null) {
                try {
                    session.close();
                    consumers.setSession(null);
                } catch (JMSException e) {
                    this.log.error("Failed to close session: {} associated with connection: {} and node: {}  ", new Object[]{session, consumers.getConnection(), nodeEntry.getKey()});
                }
            }
        }

        String topic = ((SubscribeRequest.Builder) clientRequest.requestBuilder()).build().getTopic();
        Node destination = clientRequest.destination();
        try {
            TopicConsumers target = this.topicConsumersMap.get(destination);
            if (target == null) {
                target = new TopicConsumers(this, destination);
                this.topicConsumersMap.put(destination, target);
            }
            // Creates the session again if it was cleared above, then the durable subscriber.
            target.getTopicSubscriber(topic);
            return createSubscribeResponse(clientRequest, topic, null, false);
        } catch (JMSException e) {
            close(destination);
            return createSubscribeResponse(clientRequest, topic, e, false);
        }
    }

    /**
     * Wraps a {@code SubscribeResponse} in a {@code ClientResponse} that echoes
     * the request's header, callback, destination and creation time, stamped
     * with the current clock value.
     *
     * @param clientRequest request being answered
     * @param topic         topic that was subscribed to
     * @param failure       exception raised while subscribing, or {@code null}
     * @param errorFlag     boolean forwarded to the {@code ClientResponse}
     */
    private ClientResponse createSubscribeResponse(
            ClientRequest clientRequest, String topic, JMSException failure, boolean errorFlag) {
        return new ClientResponse(
                clientRequest.makeHeader(),
                clientRequest.callback(),
                clientRequest.destination(),
                clientRequest.createdTimeMs(),
                this.time.milliseconds(),
                errorFlag,
                new SubscribeResponse(topic, failure));
    }
}
